
一、引言在数据湖场景中写入操作并非简单的append不同业务场景对数据写入有截然不同的需求CDC 增量同步需要高效的 upsert 能力同时还需处理上游的 DELETE 事件日志类数据追求高吞吐的纯追加写入历史数据回填需要大批量的初始加载分区级数据重刷需要原子性的覆盖写入数据合规删除GDPR/CCPA需要精准定位并物理移除记录Hudi 通过提供多种 Write Operation 类型让用户根据场景精确控制写入行为从而在写入性能、数据一致性、存储效率之间取得最优平衡。二、Hudi Write Operations 详解Hudi 提供以下核心 Write Operation 类型与定位Write Operation是否去重是否更新索引查找主要用途UPSERT✅✅✅CDC 场景增量更新INSERT❌可选去重❌❌确认无重复的追加写入BULK_INSERT❌❌❌大批量初始加载DELETE--✅硬删除指定记录传入 key partition pathINSERT_OVERWRITE❌覆盖❌分区级数据重刷INSERT_OVERWRITE_TABLE❌覆盖❌全表数据重刷1. UPSERTUPSERT 是 Hudi 最核心的写操作也是 COW 和 MOR 表的默认写操作。其核心逻辑是先通过索引定位记录是否已存在存在则更新不存在则插入。COW 表的 UPSERT更新操作会重写整个文件Copy-On-Write即读取原始 Parquet 文件合并更新记录后写出新版本文件。写入放大较高但读取时无需合并查询性能最优。MOR 表的 UPSERT更新操作写入 Delta Log 文件不立即重写 Base 文件。写入效率高但读取时需要合并 Base Log。通过后续 Compaction 操作将 Log 合入 Base 文件。关键配置# 写操作类型 hoodie.datasource.write.operationupsert # Payload 合并策略 hoodie.datasource.write.payload.classorg.apache.hudi.common.model.OverwriteWithLatestAvroPayload # precombine 字段必须指定用于同 key 去重 hoodie.datasource.write.precombine.fieldts # 并行度控制 hoodie.upsert.shuffle.parallelism2002. INSERTINSERT 操作跳过索引查找步骤直接将数据写入新文件或追加到现有小文件中小文件合并策略。关键特点不执行索引查找因此写入性能高于 UPSERT如果数据中存在与已有数据重复的 key不会去重除非显式开启 hoodie.combine.before.insert该配置仅做批次内去重适用于能从业务层保证无重复的场景与 UPSERT 性能对比性能吞吐量: BULK_INSERT INSERT UPSERT 数据正确性保障: UPSERT INSERT BULK_INSERT3. BULK_INSERTBULK_INSERT 专为大批量初始数据加载设计绕过了索引查找和小文件分配逻辑直接利用 Spark/Flink 的排序能力对数据进行组织后写入。排序模式hoodie.bulkinsert.sort.mode模式说明适用场景GLOBAL_SORT全局排序数据分布最优数据量适中追求最优文件布局PARTITION_SORT分区内排序大数据量平衡性能与布局NONE默认不排序最快极大数据量且后续会 clusterPARTITION_PATH_REPARTITION按分区路径重分区确保分区对齐PARTITION_PATH_REPARTITION_AND_SORT重分区 排序兼顾分区对齐与排序关键配置hoodie.datasource.write.operationbulk_insert hoodie.bulkinsert.sort.modePARTITION_SORT hoodie.bulkinsert.shuffle.parallelism3004. DELETEHudi 的删除并非单一机制而是提供了一组完整的删除能力体系涵盖软删除和硬删除两大类。1.软删除软删除保留记录在存储中的存在但将所有非主键字段除 record key、partition path 和 precombine field 外置为 null。# 软删除示例保留 key 字段其余置空 soft_delete_df spark.createDataFrame( [(record_key, partition_path, precombine_val, None, None, ...)], schema ) soft_delete_df.write.format(hudi) \ .option(hoodie.datasource.write.operation, upsert) \ .option(hoodie.table.name, my_table) \ .mode(append) \ .save(path)适用场景需要在增量查询Incremental Query中感知到删除事件的下游消费者。2.硬删除硬删除会从存储中彻底移除记录经 Compaction 和 Clean 后物理删除。DELETE Operation最直接的方式传入的 DataFrame 只需包含 record key 和 partition path# 硬删除 - DELETE operation delete_df spark.createDataFrame( [(record_key, partition_path)], [id, partition] ) delete_df.write.format(hudi) \ .option(hoodie.datasource.write.operation, delete) \ .option(hoodie.datasource.write.recordkey.field, id) \ .option(hoodie.datasource.write.partitionpath.field, partition) \ .option(hoodie.table.name, my_table) \ .mode(append) \ .save(path)EmptyHoodieRecordPayload通过指定空 payload使得 UPSERT 写入时将匹配记录标记为删除hoodie.datasource.write.operationupsert hoodie.datasource.write.payload.classorg.apache.hudi.common.model.EmptyHoodieRecordPayload_hoodie_is_deleted字段在 CDC 管道中最为常用可以将 insert/update/delete 事件统一在一个 UPSERT 流中处理from pyspark.sql.functions import lit, when, col # CDC 事件流统一处理 cdc_df cdc_df.withColumn( _hoodie_is_deleted, when(col(op) d, lit(True)).otherwise(lit(False)) ) cdc_df.write.format(hudi) \ .option(hoodie.datasource.write.operation, upsert) \ .option(hoodie.table.name, my_table) \ .mode(append) \ .save(path)3.不同表类型的删除行为与选型建议不同表类型下的删除行为表类型删除行为COW重写整个 Parquet 文件输出中不包含被删除的记录MOR向 Delta Log 中写入 delete block后续 Compaction 时物理移除记录删除方式选型建议场景推荐方式原因CDC 管道中处理 DELETE 事件_hoodie_is_deleted 字段与 insert/update 统一管道无需拆分流批量精确删除指定记录DELETE operation语义清晰只需提供 keyGDPR 合规删除DELETE operation硬删除配合 Cleaner 物理移除需要下游感知删除事件Soft Delete增量查询可见删除记录整个分区废弃INSERT_OVERWRITE 空 DataFrame分区级原子删除5. INSERT_OVERWRITENSERT_OVERWRITE 以分区级别进行原子替换它为目标分区创建全新的 FileGroup然后在 Timeline 上原子性地将旧文件标记为替换。核心优势原子性要么全部成功要么全部回滚不影响未涉及的分区无需索引查找性能好自动清理旧数据通过 Cleaner可用于分区级删除写入空 DataFrame关键配置# Spark 示例分区级数据重刷 df.write.format(hudi) \ .option(hoodie.datasource.write.operation, insert_overwrite) \ .option(hoodie.datasource.write.partitionpath.field, dt) \ .option(hoodie.datasource.write.recordkey.field, id) \ .option(hoodie.table.name, my_table) \ .mode(append) \ .save(s3://bucket/my_table)6. INSERT_OVERWRITE_TABLE与 INSERT_OVERWRITE 类似但作用范围是全表。整表数据被新写入的数据完全替换。适用于全量刷新场景如每日全量快照表、维度表全量更新。三、版本差异与选型建议相较于 0.x 版本Hudi 1.x 在写入路径上有以下关键演进维度0.x 版本1.x 版本并发控制基于 OCC乐观并发非阻塞并发控制NBCC索引体系以 Bloom Index 为主Record-level Index 增强支持多种索引类型写入引擎Spark 为主多引擎统一Spark / Flink / Kafka Connect表服务与写入耦合异步表服务Async Table Services存储格式HFile / Parquet统一存储层抽象Hudi写操作场景选型决策指南四、最佳实践UPSERT 性能优化# 1. 索引选型 - 根据数据规模选择 # 小规模 10亿记录Bloom Index hoodie.index.typeBLOOM hoodie.bloom.index.filter.typeDYNAMIC_V0 # 大规模 高频更新Bucket IndexHudi 1.x 推荐 hoodie.index.typeBUCKET hoodie.index.bucket.engineCONSISTENT_HASHING hoodie.bucket.index.num.buckets256 # 2. 并行度调优 hoodie.upsert.shuffle.parallelismnum_partitions * 2~3 # 3. precombine field 必须指定 hoodie.datasource.write.precombine.fieldupdated_atBULK_INSERT 初始加载# 建议配置 hoodie.datasource.write.operationbulk_insert hoodie.bulkinsert.sort.modePARTITION_SORT # 控制文件大小避免过多小文件 hoodie.parquet.max.file.size134217728 # 128MB hoodie.parquet.small.file.limit0 # 关闭小文件合并 # 并行度 数据总量 / 目标文件大小 hoodie.bulkinsert.shuffle.parallelismtotal_data_size / 128MBMOR 表 UPSERT 的 Compaction 策略# Compaction 触发策略 hoodie.compact.inlinefalse # 生产环境关闭 inline compaction hoodie.compact.schedule.inlinetrue # 在写入时调度 compaction plan hoodie.compact.inline.max.delta.commits5 # 每5次提交触发一次 # 异步 Compaction推荐 # 通过独立作业执行 compaction避免影响写入延迟