
上一篇【第47篇】ClickHouse数据分片——集群配置与分布式DDL下一篇【第49篇】ClickHouse分布式集群最佳实践与性能调优摘要本文是《Clickhouse从入门到精通》系列博客的第48篇文章深入剖析ClickHouse Distributed表引擎的内部工作原理。Distributed引擎是ClickHouse分布式架构的核心组件它作为数据分片的路由层本身不存储任何数据而是通过定义分片规则和路由策略将读写请求透明地分发到集群中的各个分片节点。本文将详细讲解Distributed引擎的定义语法与参数含义、分布式写入的完整流程含同步/异步模式、分布式查询的两阶段执行机制、IN vs GLOBAL IN以及JOIN vs GLOBAL JOIN的深度对比与性能分析、Distributed引擎的监控指标与常见问题排查方法。文章包含大量实战SQL示例和性能对比测试帮助读者深入理解ClickHouse分布式查询的执行原理与优化技巧。关键词ClickHouse、Distributed引擎、分布式写入、两阶段聚合、GLOBAL JOIN、数据路由1. 引言在ClickHouse的分布式架构中本地表Local Table和分布式表Distributed Table是两个核心概念理解它们的区别是掌握ClickHouse分布式机制的前提。本地表实际存储数据的表通常使用MergeTree系列引擎如ReplicatedMergeTree。每个分片只存储属于该分片的数据子集。分布式表使用Distributed引擎的逻辑表本身不存储数据而是作为数据分片的路由层。对分布式表的读写操作会被自动转发到对应的分片本地表上执行。可以把这种架构类比为本地表是仓库分布式表是调度中心。所有的货物数据实际存放在各个仓库中而调度中心负责决定每件货物应该送往哪个仓库以及从哪些仓库调取货物。Distributed引擎的设计哲学是轻量级路由——它尽可能地减少中间环节的开销让数据直接在客户端和分片之间流动从而实现高效的分布式读写。2. Distributed表定义详解2.1 语法与参数ENGINEDistributed(cluster,database,table,sharding_key)参数类型必填说明clusterString是集群名称对应config.xml中remote_servers的定义databaseString是远端分片上的数据库名tableString是远端分片上的本地表名sharding_keyExpression否分片键表达式决定数据写入哪个分片示例-- 最基本的Distributed表定义CREATETABLEuser_eventsONCLUSTERonline_clusterENGINEDistributed(online_cluster,default,user_events_local,cityHash64(user_id));2.2 参数详解cluster参数cluster参数指定了数据分布的目标集群。ClickHouse会根据system.clusters中该集群的定义获取所有分片的信息。-- 查看集群的分片拓扑SELECTcluster,shard_num,host_name,portFROMsystem.clustersWHEREclusteronline_clusterORDERBYshard_num;database和table参数这两个参数指定了远端分片上的本地表位置。注意所有分片上的本地表必须具有相同的库名和表名如果不同分片上的本地表名不同需要创建多个Distributed表-- 错误示例分片1的本地表叫 user_events_01分片2的叫 user_events_02-- 这种情况下无法用单个Distributed表覆盖需要分别定义-- 正确做法所有分片使用相同的本地表名-- 分片1: default.user_events_local-- 分片2: default.user_events_local-- 分片3: default.user_events_localsharding_key参数核心sharding_key是Distributed引擎最重要的参数它决定了数据如何路由到各个分片。分片键的计算逻辑shard_index hash(sharding_key) % number_of_shards 1常见分片键表达式-- 1. 随机分片数据随机分布到各分片Distributed(cluster,db,local_table,rand())-- 2. 哈希分片相同user_id的数据总是在同一分片Distributed(cluster,db,local_table,cityHash64(user_id))-- 3. 取模分片简单的取模分布Distributed(cluster,db,local_table,user_id%3)-- 4. 复合分片键多列组合Distributed(cluster,db,local_table,cityHash64(toString(user_id)||event_type))-- 5. 加权分片结合shard_weight配置-- 如果集群配置了shard_weightClickHouse会考虑权重进行分片不带sharding_key的Distributed表如果省略sharding_key参数Distributed表将只能用于查询不能用于写入。-- 只用于查询的Distributed表无sharding_keyCREATETABLEuser_events_readonlyENGINEDistributed(online_cluster,default,user_events_local);-- 可以查询SELECTcount()FROMuser_events_readonly;-- 写入会报错No sharding keyINSERTINTOuser_events_readonly...;-- ERROR3. 分布式写入流程深度解析3.1 写入流程概览当客户端向Distributed表写入数据时ClickHouse的执行流程如下客户端 │ │ INSERT INTO distributed_table VALUES (...) ▼ 执行节点接收写入的节点 │ │ Step 1: 接收数据 │ Step 2: 按sharding_key计算每行数据的目标分片 │ Step 3: 将数据按目标分片分组 │ ├──► 本分片数据 ──────────────────► 直接写入本地表 │ ├──► 分片1的数据 ──► 网络传输 ──► 分片1的本地表 │ ├──► 分片2的数据 ──► 网络传输 ──► 分片2的本地表 │ └──► 分片N的数据 ──► 网络传输 ──► 分片N的本地表3.2 详细步骤解析Step 1接收数据客户端通过TCP协议端口9000或HTTP协议端口8123向Distributed表发起写入请求。接收请求的节点称为协调节点Coordinator Node。Step 2计算分片路由协调节点对批次中的每一行数据计算其sharding_key的值然后根据集群配置确定目标分片。// ClickHouse源码中的分片路由逻辑简化版size_t shard_index0;if(sharding_key_is_set){UInt64 hashapply_sharding_key(row,sharding_key_expr);shard_indexhash%cluster.shard_count();}Step 3数据分组与转发协调节点将数据集按照目标分片进行分组然后本分片的数据直接写入本地表避免网络传输其他分片的数据暂时缓存在本地磁盘的data/[database]/[table]/.shard/目录下然后由后台线程异步转发写入方式同步 vs 异步Distributed表的写入行为由insert_distributed_sync参数控制模式配置值行为优点缺点异步写入默认insert_distributed_sync0数据先缓存在本地后台线程异步转发写入延迟低客户端快速返回可能丢数据节点崩溃时同步写入insert_distributed_sync1数据立即发送到所有目标分片全部写入成功才返回数据不丢失强一致写入延迟高受最慢分片影响配置方式-- 会话级别设置SETinsert_distributed_sync1;-- 全局配置config.xmldistributed_product_modedeny/distributed_product_modeinsert_distributed_sync1/insert_distributed_sync3.3 异步写入的底层机制当使用默认的异步写入模式时Distributed表的处理流程更加复杂客户端写入 │ ▼ 协调节点 │ │ 1. 按sharding_key分组 │ ├──► 本地分片数据 ──────► 立即写入本地表 │ └──► 远程分片数据 │ │ 2. 写入临时目录 │ /data/default/dist_table/.shard/cluster_shard_1/ │ ▼ 后台线程StorageDistributed::flushAllTables │ │ 3. 读取临时目录发送到目标分片 │ │ 4. 发送成功后删除临时文件 │ ▼ 目标分片本地表关键配置参数参数默认值说明distributed_directory_monitor_sleep_time_ms100后台线程检查间隔毫秒distributed_directory_monitor_max_sleep_time_ms30000最大检查间隔毫秒max_distributed_connections1024最大分布式连接数prefer_localhost_replica1优先写入本地副本3.4 写入失败重试机制当异步写入过程中目标分片不可用时数据会保留在.shard/临时目录中后台线程会定期重试发送如果目标分片长时间不可用临时目录会不断积累数据监控写入积压SELECTtable,count()ASpending_files,sum(bytes_on_disk)ASpending_bytesFROMsystem.distribution_queueGROUPBYtable;清理积压数据谨慎操作# 查看积压目录ls-la/var/lib/clickhouse/data/default/user_events/.shard/# 如果确认目标分片已无法恢复可以手动清理# 但这样做会导致数据丢失4. 分布式查询流程深度解析4.1 查询流程概览与写入不同分布式查询的流程更加复杂涉及多分片并行执行和结果汇总。客户端 │ │ SELECT ... FROM distributed_table WHERE ... ▼ 协调节点接收查询的节点 │ │ Step 1: 解析查询确定需要访问哪些分片 │ Step 2: 将查询发送到所有相关分片 │ ├──► 分片1执行子查询返回中间结果 ├──► 分片2执行子查询返回中间结果 ├──► 分片3执行子查询返回中间结果 │ ▼ 协调节点 │ │ Step 3: 接收各分片的中间结果 │ Step 4: 合并结果排序、聚合、Limit等 │ ▼ 返回最终结果给客户端4.2 两阶段聚合Two-Stage AggregationClickHouse分布式查询的核心优化是两阶段聚合它极大地减少了网络传输的数据量。传统方式无两阶段聚合分片1: 扫描1亿行计算完整聚合 → 传输1亿行中间结果 分片2: 扫描1亿行计算完整聚合 → 传输1亿行中间结果 分片3: 扫描1亿行计算完整聚合 → 传输1亿行中间结果 协调节点: 合并3亿行中间结果 → 最终聚合→ 网络传输量巨大性能极差ClickHouse两阶段聚合方式分片1: 扫描1亿行 → 本地预聚合减少99%→ 传输100万行中间结果 分片2: 扫描1亿行 → 本地预聚合减少99%→ 传输100万行中间结果 分片3: 扫描1亿行 → 本地预聚合减少99%→ 传输100万行中间结果 协调节点: 合并300万行中间结果 → 最终聚合→ 网络传输量减少99%性能提升显著两阶段聚合的SQL体现-- 原始查询SELECTcount(),avg(price)FROMdistributed_ordersWHEREorder_date2026-01-01;-- ClickHouse实际执行的逻辑概念上-- Stage 1 各分片SELECTcount()ASpartial_count,avg(price)ASpartial_avg,...FROMlocal_ordersWHEREorder_date2026-01-01GROUPBY...-- 如果有GROUP BY这里会先聚合-- Stage 2 协调节点SELECTsum(partial_count),avgMerge(partial_avg),...FROM(-- 各分片返回的中间结果)4.3 查询类型与优化全分片查询当查询条件不包含分片键或者使用了聚合函数时ClickHouse需要将查询发送到所有分片。-- 全分片查询需要聚合所有分片的数据SELECTcount()FROMdistributed_user_events;-- 执行所有分片分别执行 COUNT()协调节点求和SELECTevent_type,count()FROMdistributed_user_eventsGROUPBYevent_type;-- 执行各分片执行局部GROUP BY协调节点执行最终GROUP BY分片剪枝Pruning当查询条件包含分片键且分片键的计算结果是确定的ClickHouse可以只查询部分分片。-- 假设按 cityHash64(user_id) 分片-- 以下查询可以剪枝到特定分片理论上实际取决于优化器SELECT*FROMdistributed_user_eventsWHEREuser_id12345;-- ClickHouse会计算 cityHash64(12345) % shard_count-- 然后只向目标分片发送查询注意分片剪枝在ClickHouse中是有限支持的。目前主要依赖应用层的设计如已知分片键值时直接查询对应的本地表。5. IN vs GLOBAL IN 深度对比5.1 普通IN子查询的问题当在分布式表上使用IN子查询时会出现语义错误-- 假设数据分布-- 分片1有 user_id: 1, 3, 5-- 分片2有 user_id: 2, 4, 6-- 在分片1上执行SELECT*FROMdistributed_user_eventsWHEREuser_idIN(SELECTuser_idFROMdistributed_user_eventsWHEREevent_typepurchase);问题分片1只知道自己本地的user_id列表不知道分片2上有哪些user_id购买了商品。导致结果不完整5.2 GLOBAL IN的原理GLOBAL IN通过将子查询结果广播到所有分片解决了上述问题。SELECT*FROMdistributed_user_eventsWHEREuser_idGLOBALIN(SELECTuser_idFROMdistributed_user_eventsWHEREevent_typepurchase);执行流程协调节点 │ │ Step 1: 执行子查询收集所有分片的 user_id 列表 │ 结果集{1, 2, 3, 4, 5, 6} │ │ Step 2: 将结果集广播到所有分片 │ 分片1收到{1,2,3,4,5,6} │ 分片2收到{1,2,3,4,5,6} │ │ Step 3: 各分片用完整的结果集执行IN过滤 │ ▼ 协调节点汇总最终结果5.3 IN vs GLOBAL IN 对比特性INGLOBAL IN子查询结果范围仅协调节点本地所有分片汇总后广播结果正确性❌ 可能不完整✅ 完整网络开销低高广播整个结果集适用场景子查询表是本地表子查询表是分布式表内存消耗低高结果集需要在各节点存储5.4 性能对比测试-- 测试数据准备CREATETABLEtest_localONCLUSTERtest_cluster(id UInt64,val String)ENGINEReplicatedMergeTree(...)ORDERBYid;CREATETABLEtest_distONCLUSTERtest_clusterENGINEDistributed(test_cluster,default,test_local,id);INSERTINTOtest_distSELECTnumber,toString(number)FROMnumbers(1000000);-- 测试1IN可能结果不正确SELECTcount()FROMtest_distWHEREidIN(SELECTidFROMtest_distWHEREid%1000);-- 执行时间~200ms但结果可能不完整-- 测试2GLOBAL IN结果正确但网络开销大SELECTcount()FROMtest_distWHEREidGLOBALIN(SELECTidFROMtest_distWHEREid%1000);-- 执行时间~800ms结果正确-- 测试3优化方案——将子查询结果物化CREATETABLEfilter_idsONCLUSTERtest_cluster(id UInt64)ENGINEDistributed(test_cluster,default,filter_ids_local,id);INSERTINTOfilter_idsSELECTidFROMtest_distWHEREid%1000;SELECTcount()FROMtest_distWHEREidIN(SELECTidFROMfilter_ids);-- 执行时间~300ms结果正确性能介于IN和GLOBAL IN之间6. JOIN vs GLOBAL JOIN 深度对比6.1 普通JOIN的问题与IN子查询类似分布式表上的普通JOIN也会导致结果不完整。-- 事实表分布式表按user_id分片-- 维度表分布式表或本地表SELECTf.user_id,d.user_nameFROMdistributed_fact fLEFTJOINdistributed_dim dONf.user_idd.user_id;问题如果f和d按照不同的分片键分布或d是分布式表各分片上的JOIN可能丢失匹配行。6.2 GLOBAL JOIN的原理GLOBAL JOIN通过将右表数据广播到所有分片确保JOIN的完整性。SELECTf.user_id,d.user_nameFROMdistributed_fact fGLOBALLEFTJOINdistributed_dim dONf.user_idd.user_id;执行流程协调节点 │ │ Step 1: 执行右表查询收集完整右表数据 │ │ Step 2: 将右表数据广播到所有分片 │ │ Step 3: 各分片用完整的右表数据执行JOIN │ ▼ 协调节点汇总最终结果6.3 JOIN vs GLOBAL JOIN 对比特性JOINGLOBAL JOIN右表数据范围仅协调节点本地所有分片汇总后广播结果正确性❌ 可能不完整✅ 完整网络开销低高右表大小限制无但结果可能不对受内存限制广播到所有节点推荐用法右表是字典表/小表右表是分布式表且无法避免6.4 分布式JOIN的最佳实践方案1小表广播GLOBAL JOIN当右表较小时 100万行使用GLOBAL JOINSELECT*FROMdistributed_big_tableGLOBALLEFTJOINsmall_dictON...方案2维度表使用字典引擎将维度表存储为字典彻底避免分布式JOIN问题-- 创建字典CREATEDICTIONARY user_dict(user_id UInt64,user_name String,user_age UInt8)PRIMARYKEYuser_id SOURCE(CLICKHOUSE(...))LAYOUT(HASHED())LIFETIME(300);-- 使用字典查询无JOINSELECTuser_id,dictGet(user_dict,user_name,user_id)ASuser_nameFROMdistributed_fact;方案3Colocate JOIN共分片JOIN如果两张表使用相同的分片键可以将JOIN下推到分片本地执行-- 表A和表B都按 user_id 分片-- 这样相同的 user_id 总是在同一个分片-- 在应用层分别查询每个分片的本地表然后合并结果-- 或者确保查询的过滤条件能路由到正确的分片7. Distributed引擎的监控7.1 system.distribution_queuesystem.distribution_queue表记录了异步写入的待发送数据SELECTtable,count()ASpending_files,formatReadableSize(sum(bytes_on_disk))ASpending_size,min(file_creation_time)ASoldest_file_timeFROMsystem.distribution_queueGROUPBYtableORDERBYpending_filesDESC;关键字段字段说明table分布式表名bytes_on_disk待发送数据大小file_creation_time文件创建时间积压时间newest_modification_time最近修改时间7.2 写入延迟监控-- 监控各分片的写入延迟SELECTshard_num,host_name,table,count()ASpending_batches,avg(dateDiff(second,file_creation_time,now()))ASavg_delay_secFROMcluster(online_cluster,system,distribution_queue)GROUPBYshard_num,host_name,tableORDERBYavg_delay_secDESC;7.3 分布式查询性能监控-- 查看最近查询的分布式执行详情SELECTquery_id,is_initial_query,elapsed,read_rows,read_bytes,result_rowsFROMsystem.query_logWHEREevent_datetoday()ANDqueryLIKE%FROM distributed_%ANDtypeQueryFinishORDERBYelapsedDESCLIMIT20;8. Distributed引擎的注意事项与限制8.1 不支持的DDL操作Distributed表本身不存储数据因此对Distributed表执行某些DDL操作是不支持或语义不同的-- ❌ 不支持ALTER TABLE on Distributed表-- 需要通过ON CLUSTER修改本地表-- ❌ 不支持OPTIMIZE TABLE on Distributed表-- 需要分别对本地表执行OPTIMIZE-- ✅ 支持TRUNCATE TABLE on Distributed表-- 会截断所有分片的本地表-- ✅ 支持DROP TABLE on Distributed表-- 只删除分布式表定义不删除本地表8.2 跨分片事务问题ClickHouse的分布式写入不支持跨分片事务。如果写入过程中部分分片成功、部分分片失败会导致数据不一致。解决方案使用异步写入 监控积压允许最终一致应用层重试写入失败时应用层进行重试需要幂等性设计使用Atmospheric模式先写本地表再通过后台任务同步8.3 数据倾斜检测-- 检测各分片的数据量是否均衡SELECTshard_num,host_name,sum(rows)AStotal_rows,round(sum(rows)*100.0/sum(sum(rows))OVER(),2)ASpctFROMcluster(online_cluster,system,parts)WHEREtableuser_events_localGROUPBYshard_num,host_nameORDERBYshard_num;如果某分片的数据量占比超过平均值的20%说明存在数据倾斜需要检查分片键的选择是否合理。9. 实战案例GLOBAL IN vs IN 性能对比9.1 测试环境集群3分片 × 2副本数据量每分片约1亿行网络万兆网卡分片间延迟 1ms9.2 测试数据-- 创建本地表CREATETABLEevents_localONCLUSTERtest_cluster(user_id UInt64,event_timeDateTime,event_type LowCardinality(String))ENGINEReplicatedMergeTree(...)PARTITIONBYtoYYYYMM(event_time)ORDERBY(user_id,event_time);-- 创建分布式表CREATETABLEeventsONCLUSTERtest_clusterENGINEDistributed(test_cluster,default,events_local,cityHash64(user_id));-- 插入测试数据1亿行/分片INSERTINTOeventsSELECTrand()%10000000ASuser_id,toDateTime(2026-01-01 00:00:00)rand()%(30*86400)ASevent_time,[click,view,purchase][rand()%31]ASevent_typeFROMnumbers(100000000);9.3 性能测试-- 测试1IN子查询结果可能不正确SELECTcount()FROMeventsWHEREuser_idIN(SELECTuser_idFROMeventsWHEREevent_typepurchase);-- 结果9,876,543不完整-- 耗时2.3秒-- 测试2GLOBAL IN结果正确SELECTcount()FROMeventsWHEREuser_idGLOBALIN(SELECTuser_idFROMeventsWHEREevent_typepurchase);-- 结果29,543,210完整-- 耗时8.7秒-- 测试3优化方案——使用物化表CREATETABLEpurchase_usersONCLUSTERtest_clusterENGINEDistributed(test_cluster,default,purchase_users_local,cityHash64(user_id));INSERTINTOpurchase_usersSELECTDISTINCTuser_idFROMeventsWHEREevent_typepurchase;SELECTcount()FROMeventsWHEREuser_idIN(SELECTuser_idFROMpurchase_users);-- 结果29,543,210完整-- 耗时3.1秒最优9.4 结论普通IN可能返回不完整结果生产环境慎用GLOBAL IN保证结果正确但网络开销大适合右表较小的场景最优方案是预计算并物化子查询结果然后用普通IN查询10. 总结与最佳实践本文深入解析了ClickHouse Distributed引擎的工作原理以下是关键要点总结写入方面优先使用异步写入insert_distributed_sync0性能更好适合大多数场景监控.shard/目录积压避免节点故障导致数据丢失写入本地表查询分布式表这是官方推荐的最佳实践避免通过Distributed表写入带来的额外网络开销查询方面避免在大结果集上使用GLOBAL IN/JOIN广播开销巨大优先使用字典替代JOINdictGet()函数性能远优于分布式JOIN利用两阶段聚合确保GROUP BY和聚合函数的性能监控方面定期检查system.distribution_queue发现写入积压监控各分片数据均衡度及时发现数据倾斜慢查询日志分析找出分布式查询的性能瓶颈架构方面Distributed表不是银弹它增加了网络开销和复杂度对于确定性查询知道分片键值时直接查询本地表性能更好考虑使用ClickHouse Keeper替代ZooKeeper减少分布式协调的延迟在下一篇文章中我们将讨论ClickHouse分布式集群的最佳实践与性能调优包括副本数与分片数的选择、ZooKeeper优化、分布式JOIN优化等高级主题。上一篇【第47篇】ClickHouse数据分片——集群配置与分布式DDL下一篇【第49篇】ClickHouse分布式集群最佳实践与性能调优