万亿级数据迁移工程:从全量校验到增量同步的零丢失实践

发布时间:2026/6/17 22:51:34

万亿级数据迁移工程：从全量校验到增量同步的零丢失实践

一、数据迁移的恐惧：不是“迁不过去”，是“迁完了数据不对”

做过大表迁移的人都知道，最可怕的不是迁移过程中断——那可以重来。最可怕的是迁移完成后数据看起来正常，但某些行丢了、某些列偏移了、某些时间戳被截断了。这种“静默错误”在生产环境中比迁移失败更危险，因为你在用错误的数据做业务决策。

万亿级数据迁移的核心挑战不是吞吐量（可以加机器），而是一致性验证。全量迁移 10 万亿行数据后，如何确认源端和目标端的数据完全一致？逐行对比不可行（耗时数天）；哈希校验需要解决排序和类型差异问题；增量同步需要保证不丢不重。本文从工程实践角度给出可落地的方案。

二、数据迁移全链路与一致性保障

```mermaid
flowchart TB
    A[迁移规划] --> B[全量迁移]
    A --> C[增量同步启动]
    B --> B1[分片读取<br/>按主键范围切片]
    B1 --> B2[类型转换与校验<br/>源端/目标端类型映射]
    B2 --> B3[批量写入目标端]
    B3 --> B4[分片级校验<br/>行数 + 哈希]
    C --> C1[Binlog 订阅<br/>从全量开始位点]
    C1 --> C2[事件解析与转换]
    C2 --> C3[幂等写入目标端]
    B4 --> D[全量一致性校验]
    C3 --> D
    D --> D1[行数对比<br/>分片级统计]
    D --> D2[哈希对比<br/>按主键分组聚合]
    D --> D3[抽样逐行对比<br/>1% 随机样本]
    D1 --> E{校验通过?}
    D2 --> E
    D3 --> E
    E -->|是| F[切换流量]
    E -->|否| G[差异修复<br/>定位不一致行]
    style D fill:#e3f2fd
    style D2 fill:#fff3e0
    style E fill:#e8f5e9
```

数据迁移分为三个阶段：
- 全量迁移：按主键分片批量迁移。
- 增量同步：通过 Binlog 订阅保证不丢数据。
- 一致性校验：行数 + 哈希 + 抽样逐行。

关键点是：增量同步必须从全量迁移的开始位点启动，确保全量迁移期间的新增数据不丢失。由于全量读取与增量回放之间存在重叠窗口，同一行的变更可能被应用两次，因此增量写入必须是幂等的。

三、代码实现与分析

3.1 分片全量迁移与校验

from __future__ import annotations import hashlib import struct from dataclasses import dataclass from typing import Any, Callable from concurrent.futures import ThreadPoolExecutor, as_completed dataclass class MigrationShard: 迁移分片 shard_id: int min_key: int max_key: int row_count: int 0 checksum: str status: str pending # pending / migrating / done / failed class ShardMigrator: 分片迁移器 def __init__( self, source_reader: Callable[[int, int], list[dict]], target_writer: Callable[[list[dict]], int], shard_size: int 100000, concurrency: int 8, ): self.source_reader source_reader self.target_writer target_writer self.shard_size shard_size self.concurrency concurrency def plan_shards(self, min_key: int, max_key: int) - list[MigrationShard]: 规划迁移分片 shards [] shard_id 0 current min_key while current max_key: end min(current self.shard_size - 1, max_key) shards.append(MigrationShard( shard_idshard_id, min_keycurrent, max_keyend, )) current end 1 shard_id 1 return shards def migrate_shard(self, shard: MigrationShard) - MigrationShard: 迁移单个分片 shard.status migrating try: # 读取源数据 rows self.source_reader(shard.min_key, 
shard.max_key) shard.row_count len(rows) if rows: # 计算源端校验和 shard.checksum self._compute_checksum(rows) # 写入目标端 written self.target_writer(rows) if written ! shard.row_count: raise RuntimeError( f写入行数不匹配: 期望 {shard.row_count}, 实际 {written} ) shard.status done except Exception as e: shard.status failed raise RuntimeError(f分片 {shard.shard_id} 迁移失败: {e}) from e return shard def migrate_all( self, min_key: int, max_key: int ) - list[MigrationShard]: 并发迁移所有分片 shards self.plan_shards(min_key, max_key) completed [] with ThreadPoolExecutor(max_workersself.concurrency) as executor: futures { executor.submit(self.migrate_shard, shard): shard for shard in shards } for future in as_completed(futures): result future.result() completed.append(result) return sorted(completed, keylambda s: s.shard_id) staticmethod def _compute_checksum(rows: list[dict]) - str: 计算行集合的校验和 hasher hashlib.md5() for row in rows: # 按列名排序确保一致性 for key in sorted(row.keys()): value row[key] if value is None: hasher.update(b\x00) elif isinstance(value, int): hasher.update(struct.pack(q, value)) elif isinstance(value, float): hasher.update(struct.pack(d, value)) elif isinstance(value, bytes): hasher.update(value) else: hasher.update(str(value).encode(utf-8)) return hasher.hexdigest()3.2 增量同步与幂等写入from enum import Enum from datetime import datetime class BinlogEventType(Enum): INSERT insert UPDATE update DELETE delete dataclass class BinlogEvent: Binlog 事件 event_type: BinlogEventType table: str primary_key: Any before: dict | None None # UPDATE/DELETE 的前镜像 after: dict | None None # INSERT/UPDATE 的后镜像 timestamp: datetime | None None binlog_position: str # 位点信息 class IncrementalSyncer: 增量同步器基于 Binlog 的 CDC def __init__( self, target_writer: Callable[[list[dict]], int], position_store: Callable[[str], None], ): self.target_writer target_writer self.position_store position_store self._last_position def process_event(self, event: BinlogEvent) - None: 处理单个 Binlog 事件幂等 if event.event_type BinlogEventType.INSERT: 
self._idempotent_insert(event) elif event.event_type BinlogEventType.UPDATE: self._idempotent_update(event) elif event.event_type BinlogEventType.DELETE: self._idempotent_delete(event) # 记录位点 self._last_position event.binlog_position self.position_store(event.binlog_position) def _idempotent_insert(self, event: BinlogEvent) - None: 幂等 INSERT先 DELETE 再 INSERT if event.after is None: return # 目标端先删除可能存在的旧数据重复消费场景 self._delete_by_key(event.table, event.primary_key) self.target_writer([event.after]) def _idempotent_update(self, event: BinlogEvent) - None: 幂等 UPDATE基于主键的全量替换 if event.after is None: return # 使用 REPLACE INTO 或 DELETE INSERT 保证幂等 self._delete_by_key(event.table, event.primary_key) self.target_writer([event.after]) def _idempotent_delete(self, event: BinlogEvent) - None: 幂等 DELETE删除不存在也不报错 self._delete_by_key(event.table, event.primary_key) def _delete_by_key(self, table: str, primary_key: Any) - None: 按主键删除目标端数据 # 实际实现DELETE FROM target_table WHERE pk ? pass def batch_process(self, events: list[BinlogEvent]) - dict: 批量处理 Binlog 事件 results {processed: 0, failed: 0, last_position: } # 按主键去重同一行的多次变更只保留最后一次 deduped self._deduplicate(events) for event in deduped: try: self.process_event(event) results[processed] 1 except Exception as e: results[failed] 1 # 记录失败事件后续重试 print(f处理失败: {event}, 错误: {e}) results[last_position] self._last_position return results staticmethod def _deduplicate(events: list[BinlogEvent]) - list[BinlogEvent]: 按主键去重保留每个主键的最后一次事件 key_to_event: dict[tuple, BinlogEvent] {} for event in events: key (event.table, event.primary_key) key_to_event[key] event # 后出现的覆盖前面的 return list(key_to_event.values())3.3 一致性校验框架dataclass class ConsistencyReport: 一致性校验报告 total_shards: int consistent_shards: int inconsistent_shards: list[dict] row_count_diff: int checksum_mismatches: int sample_mismatches: int property def is_consistent(self) - bool: return ( self.inconsistent_shards [] and self.row_count_diff 0 and self.checksum_mismatches 0 and 
self.sample_mismatches 0 ) class ConsistencyValidator: 一致性校验器 def validate( self, source_shards: list[MigrationShard], target_shards: list[MigrationShard], ) - ConsistencyReport: 分片级一致性校验 inconsistent [] row_diff 0 checksum_mismatches 0 for src, tgt in zip(source_shards, target_shards): if src.row_count ! tgt.row_count: inconsistent.append({ shard_id: src.shard_id, issue: row_count_mismatch, source_count: src.row_count, target_count: tgt.row_count, }) row_diff abs(src.row_count - tgt.row_count) elif src.checksum ! tgt.checksum: inconsistent.append({ shard_id: src.shard_id, issue: checksum_mismatch, source_checksum: src.checksum, target_checksum: tgt.checksum, }) checksum_mismatches 1 return ConsistencyReport( total_shardslen(source_shards), consistent_shardslen(source_shards) - len(inconsistent), inconsistent_shardsinconsistent, row_count_diffrow_diff, checksum_mismatcheschecksum_mismatches, sample_mismatches0, # 抽样校验单独执行 ) def sample_validate( self, source_reader: Callable[[int, int], list[dict]], target_reader: Callable[[int, int], list[dict]], total_rows: int, sample_rate: float 0.01, ) - list[dict]: 抽样逐行对比 import random sample_size max(int(total_rows * sample_rate), 100) sample_keys random.sample(range(1, total_rows 1), min(sample_size, total_rows)) mismatches [] for key in sample_keys: src_rows source_reader(key, key) tgt_rows target_reader(key, key) if src_rows ! 
tgt_rows: mismatches.append({ primary_key: key, source: src_rows[0] if src_rows else None, target: tgt_rows[0] if tgt_rows else None, }) return mismatches

四、数据迁移的边界与架构权衡

全量迁移的时间窗口：万亿级数据全量迁移可能需要数天，期间源端持续写入，增量同步必须覆盖全量迁移期间的所有变更。关键点是记录全量迁移开始时的 Binlog 位点，增量同步从这个位点开始消费。位点记录必须在全量迁移开始前完成，否则可能丢失数据。

类型映射的隐式风险：
- 时间精度：MySQL 的 DATETIME 与 ClickHouse 的 DateTime 精度不同。MySQL 的 DATETIME(6) 支持微秒，而 ClickHouse 的 DateTime 只精确到秒；需要保留微秒时应映射为 DateTime64(6)。
- 字符编码：MySQL 的 VARCHAR 与 ClickHouse 的 String 编码可能不同。
- 数值精度：MySQL 的 DECIMAL 映射到 ClickHouse 的 Float64 会有精度损失，应优先映射为 ClickHouse 的 Decimal 类型。

类型映射必须显式定义，并在校验阶段验证。

哈希校验的排序敏感性：源端和目标端的数据如果按不同顺序计算哈希，结果就会不同。必须确保两端的排序规则一致（按主键升序），且 NULL 值的处理方式一致。

增量同步的断点续传：增量同步进程可能因网络抖动或目标端压力而中断。位点信息必须持久化存储（数据库或文件），重启后从上次位点继续消费。位点提交必须是原子操作——数据写入和位点更新在同一个事务中。

如果目标端无法与位点存储共用事务，就退而依赖幂等写入：先写数据、后提交位点。这样重启时重放少量已写入的事件也不会产生错误结果。

五、总结

万亿级数据迁移的核心挑战是一致性验证，而非吞吐量。本文的关键实践有三点：
- 按主键分片批量迁移，并计算分片级校验和。
- 增量同步从全量开始位点启动，并保证幂等写入。
- 用“行数 + 哈希 + 抽样逐行”三级校验确保一致性。

类型映射、排序一致性和位点管理是迁移中最容易出错的环节。迁移完成后不要急于切换流量：先用双读模式验证一段时间，确认数据一致后再正式切换。

补充落地建议：围绕“万亿级数据迁移工程：从全量校验到增量同步的零丢失实践”继续推进时，应把验证标准写成可执行清单，而不是停留在经验判断。性能类方案要给出基准数据，架构类方案要给出故障隔离方式，AI 类方案要给出输出质量和人工兜底策略。

每一次迭代都应回答三个问题：收益是否可量化？失败是否可回滚？维护成本是否被团队接受？

如果短期资源有限，可以先保留最关键的观测指标，包括处理耗时、失败率、资源占用和人工介入次数。等这些指标稳定后，再扩展自动化能力。这样的节奏更慢，但风险更低，也更符合生产级技术文章强调的工程可验证性。

相关新闻