TDengine TMQ 内部机制 — 与 WAL 的耦合、过滤执行、协调者

发布时间:2026/7/5 3:28:42

TDengine TMQ 内部机制 — 与 WAL 的耦合、过滤执行、协调者 分类6.数据订阅 TMQ |篇章03 TMQ 内部机制适用版本TDengine v3.xv3.3.x / v3.4.x | 最后更新2026-07-04TMQ 本质上是WAL 之上的过滤式订阅。本文深入剖析 TMQ 如何复用 WAL 数据、Topic SQL 如何被推到 VNode 端执行、Consumer Group 协调者如何工作、以及位点管理的实现细节。核心概念速查表概念说明WAL 复用TMQ 直接从 WAL 读取不额外存储Topic SQL Pushdown过滤在 VNode 端执行CoordinatorMNode 上的消费组协调者Offset Map(group, topic, vgroup) → offsetLong Poll长轮询拉取WAL RetentionWAL 保留期决定可订阅范围详细解析1. TMQ 与 WAL 的关系TMQ 数据来源 写入数据 → WAL持久化日志→ MemTable → TSDB 文件 ↑ TMQ 从这里读取 优势 - 不需要单独的消息存储 - 数据天然按写入顺序排列 - WAL 已经保证持久性 WAL 中包含 - 数据写入INSERT - Schema 变更ALTER TABLE - 子表创建/删除 - 删除/更新操作 可订阅范围 - 受 WAL_RETENTION_PERIOD 限制 - WAL 被清理 数据无法订阅 - 调大保留期 → 消费历史能力增强但占磁盘2. Topic SQL 下推执行Topic SQL 执行位置 CREATE TOPIC t_high AS SELECT * FROM meters WHERE current 100; 执行流程 Consumer.poll() ↓ Fetch Request (vgroup3, offsetN) ↓ VNode 3 收到请求 ↓ 从 WAL 读取 offset N 之后的 Record ↓ 对每个 Record 应用 SQL - 解析为内存数据结构 - 应用 WHERE 过滤 - 投影 SELECT 列 ↓ 打包过滤后数据 ↓ 返回 Consumer 关键优化 - SQL 计划缓存避免每次重 Parser - 列投影减少传输 - 谓词早期过滤3. Coordinator 角色MNode 上的 Consumer Group 协调者 职责 - 维护 Consumer Group 成员列表 - 处理 Subscribe / Unsubscribe - 触发 Rebalance - 持久化 Offset - 监控 Consumer 心跳 - 故障检测 协调消息 - JoinGroup: Consumer 加入 - SyncGroup: 获取分配方案 - Heartbeat: 保活 - CommitOffset: 提交位点 - LeaveGroup: 离开 协调者状态 - Empty: 无活跃成员 - Preparing: Rebalance 准备中 - Stable: 稳定消费 - Dead: 组已删除4. Offset 管理Offset 元数据结构 存储位置MNode 内部表 Schema group_id (varchar) topic_name (varchar) vgroup_id (int) offset (int64) commit_time (timestamp) Commit 持久化 ① Consumer 发送 Commit RPC ② MNode 写入 Offset 元数据 ③ 多副本同步 ④ 返回成功 读取 Offset ① Consumer 启动时拉取自己的 Offset ② Rebalance 后从 Offset 继续 ③ Offset 不存在 → 用 auto.offset.reset 策略5. Long Poll 机制长轮询拉取 Consumer.poll(timeout1.0) ↓ Fetch Request → VNode ↓ VNode 检查 if 有未读消息: 立即返回 else: 等待最长 timeout 期间有新数据则立即返回 超时则返回空 优势 - 减少空轮询 - 实时性好新数据即时推送 - 服务端负载可控 对比短轮询 短轮询客户端每 100ms 询问一次 → 9 成是空回 长轮询等待新数据或超时 → 几乎不浪费6. 多 VGroup 并行消费单 Consumer 处理多个 VGroup Consumer-1 被分配 VG1, VG2, VG3 内部实现 Thread / async: - 与 VG1 维持 Fetch 连接 - 与 VG2 维持 Fetch 连接 - 与 VG3 维持 Fetch 连接 并发拉取合并消息队列 Poll 返回 合并后的消息批 各 VGroup 独立 Offset 负载均衡 - 各 VGroup 数据不均时 → 处理快的优先 - 不会因为某 VG 慢导致整体卡住7. Schema 变更的传递Topic 数据流中遇到 ALTER TABLE ① 服务端 ALTER STABLE meters ADD COLUMN x ② WAL 中记录该 Schema 变更 ③ Consumer Fetch 时遇到 Schema 变更记录 ④ 返回特殊事件给 Consumer ⑤ Consumer 检测到事件 → 更新本地 Schema 缓存 ⑥ 后续数据按新 Schema 解析 应用处理 - 监听 Schema 变更事件 - 动态适应新列 - 或忽略新列继续处理已知列8. 重启与恢复Consumer 重启场景 ① 关闭 Consumer 时 - 发送 LeaveGroup - 触发 Rebalance - 其他 Consumer 接管分区 ② 重启 Consumer - 重新 JoinGroup - 触发 Rebalance - 从 Committed Offset 继续 ③ Consumer 崩溃无 LeaveGroup - 心跳超时默认 30s - MNode 标记失联 - 触发 Rebalance - 其他成员接管 会话超时 vs 重平衡耗时 - session.timeout.ms 过小 → 误判失联频繁 Rebalance - 过大 → 故障感知慢 - 推荐 30~60 秒代码示例查看 TMQ 内部状态-- 所有订阅SELECT*FROMinformation_schema.ins_subscriptions;-- Consumer 详情SELECT*FROMperformance_schema.perf_consumers;-- WAL 保留期SELECTname,wal_retention_period,wal_retention_sizeFROMinformation_schema.ins_databases;配置 WAL 保留-- 创建数据库时配置保留 30 天 WALCREATEDATABASEdb WAL_RETENTION_PERIOD2592000WAL_RETENTION_SIZE10000;-- 修改现有数据库ALTERDATABASEdb WAL_RETENTION_PERIOD2592000;Rebalance 日志监控# taosd.log 中查找greprebalance/var/log/taos/taosdlog.0grepconsumer.*joined/var/log/taos/taosdlog.0grepconsumer.*left/var/log/taos/taosdlog.0性能考量TMQ 性能特点维度表现写入侧无额外开销WAL 本就要写Fetch 侧受 WAL 读速度限制过滤计算取决于 SQL 复杂度元数据Commit 频率影响 MNode 负载影响 TMQ 性能的关键因素影响Topic SQL 复杂度高WAL 保留期影响磁盘IOConsumer 数与并行度直接相关批量大小摊薄 RPC 开销FAQQ1: TMQ 性能上限单 VGroup 消费可达几十万行/秒。总吞吐与 VGroup 数线性扩展。Q2: Topic SQL 能用所有 SQL 语法吗主要支持SELECT 投影、WHERE 过滤。不支持 JOIN、聚合、窗口、子查询。复杂分析推荐用流计算。Q3: 删除 Topic 影响 Consumer 吗Consumer 的 Poll 会失败。需要订阅其他 Topic 或关闭。Q4: Consumer Group 怎么管理SHOWCONSUMERS;SHOWSUBSCRIPTIONS;DROPCONSUMERGROUPgroupONtopic;Q5: TMQ 和流计算的关系流计算可以订阅 Topic 作为输入流计算的输出可以再创建 Topic 给下游消费。两者形成完整数据流水线。参考系统构架篇01-《TDengine 整体架构全景》02-《集群拓扑深度解析》03-《MNode 内部机制深度解析》04-《RPC 通信层深度解析》05-《VNode 生命周期》06-《RAFT 共识协议》07-《端到端的消息流》数据模型01-《数据库创建与参数详解》02-《超级表/子表/普通表》03-《支持数据类型深度解析》04-《TDengine Tag 设计哲学与 Schema 变更机制》05-《TDengine 虚拟表实现原理》存储引擎01-《TDengine 存储引擎概览》02-《TDengine MemTable 深度解析》03-《TDengine WAL 预写日志机制》04-《TDengine 数据文件格式》05-《TDengine Commit 与 Flush 机制 》06-《TDengine Compaction 合并策略 》07-《TDengine 数据保留与 TTL》08-《TDengine 压缩编码机制》09-《TDengine Cache 与 Last 查询加速》10-《TDengine 逻辑计划生成》查询引擎01-《TDengine 查询引擎概览》02-《TDengine SQL 解析与词法分析》03-《TDengine 语义分析与 AST 重写》04-《TDengine 逻辑计划生成》05-《TDengine 物理计划生成》06-《TDengine 扫描算子》07-《TDengine 聚合算子》08-《TDengine 聚合算子》09-《TDengine 连接算子》10-《TDengine 排序、填充与投影》11-《TDengine 分布式查询执行》12-《TDengine EXPLAIN 与查询优化》数据写入01-《TDengine SQL INSERT》02-《TDengine 无模式写入》03-《TDengine STMT 写入》04-《TDengine 写入内部流程》05-《TDengine 数据更新删除》数据订阅01-《TDengine 数据订阅》02-《TDengine 订阅 vs Kafka》03-《TDengine TMQ 消费流程》关于 TDengineTDengine 专为物联网IoT平台、工业大数据平台设计。其中TDengine TSDB 是一款高性能、分布式的时序数据库Time Series Database同时它还带有内建的缓存、流式计算、数据订阅等系统功能TDengine IDMP 是一款AI原生工业数据管理平台它通过树状层次结构建立数据目录对数据进行标准化、情景化并通过 AI 提供实时分析、可视化、事件管理与报警等功能。

相关新闻