
原文地址https://pgducklake.select/blog/pg-duckpipe-whats-new/pg_duckpipe2026年3月新特性qianzhen2026年3月28日概述pg_duckpipe 是一个 PostgreSQL 扩展它为您的数据库带来了实时变更数据捕获功能。它通过 PostgreSQL 的 WAL将您的常规堆表持续同步到 DuckLake 列式表中使您能够在单个数据库中运行事务型和分析型工作负载。无需 Kafka无需 Debezium无需外部编排器。只需要 PostgreSQL。我们一直在快速迭代。以下是近期发布的新功能透明查询路由分析型 SELECT 查询将自动重定向到列式 DuckLake 表追加同步模式具有恰好一次语义的不可变变更日志支持 SCD Type 2 和无主键表扇入流式传输将多个源数据库合并到一个分析型表中分区表支持无需额外配置即可同步分区表分层配置可在全局、组或表级别调整刷新行为模式 DDL 传播ADD、DROP 和 RENAME COLUMN 操作自动同步到 DuckLake 目标表稳定性和可观测性可溢出的刷新缓冲区、并发刷新控制以及监控指标透明查询路由pg_duckpipe 现在包含一个计划器钩子可以透明地将已同步源表上的 SELECT 查询重写为针对其 DuckLake 列式副本的查询。无需更改任何查询。[透明查询路由示意图]SETduckpipe.query_routingauto;-- 此查询将自动命中列式 DuckLake 副本SELECTcustomer_id,sum(total),count(*)FROMordersGROUPBYcustomer_id;提供三种路由模式模式行为off不进行路由默认on将所有 SELECT 路由到 DuckLakeauto路由分析型查询跳过主键查找auto模式是 HTAP 场景的最佳选择点查找留在堆表上以获得低延迟 OLTP 性能而分析型扫描则转向列式存储。可通过表级配置对每个表进行控制。追加同步模式默认的upsert模式维护源表的实时副本。新的追加模式采用了不同的方法每个变更都会成为不可变变更日志中的一行新记录本质上是在 CDC 管道中内置了 SCD Type 2 功能。SELECTduckpipe.add_table(public.events,sync_modeappend);每行都会获得_duckpipe_op操作类型I/U/D和_duckpipe_lsn元数据列。例如INSERTINTOcustomers(id,name,email)VALUES(1,Alice,aliceold.com);UPDATEcustomersSETemailalicenew.comWHEREid1;DELETEFROMcustomersWHEREid1;追加模式的 DuckLake 目标表将捕获所有版本idnameemail_duckpipe_op_duckpipe_lsn1Alicealiceold.comI104858001Alicealicenew.comU104869201Alicealicenew.comD10487104通过双层去重机制确保了恰好一次语义的正确性。即使在崩溃或重启后也不会出现重复或遗漏。追加模式还使得同步无主键的表成为可能因为变更日志不需要识别行进行更新操作SELECTduckpipe.add_table(public.raw_events,sync_modeappend);这使得追加模式非常适合审计跟踪、事件溯源和日志系统分析。扇入流式传输扇入功能允许您将多个源数据库流式传输到一个 DuckLake 目标表中。每行都会获得一个_duckpipe_source列其中填充了同步组名称因此您可以随时追溯一行数据来自哪个源。[扇入流式传输示意图]-- 将两个生产数据库的订单同步到一个分析表中SELECTduckpipe.create_group(us_prod,conninfohostus-prod.example.com ...);SELECTduckpipe.create_group(eu_prod,conninfohosteu-prod.example.com ...);SELECTduckpipe.add_table(public.orders,sync_groupus_prod);SELECTduckpipe.add_table(public.orders,sync_groupeu_prod,fan_intrue);-- 跨两个源查询SELECT_duckpipe_source,count(*),sum(total)FROMorders_ducklakeGROUPBY_duckpipe_source;_duckpipe_source列使得按源过滤时可以进行 Parquet 文件级别的裁剪并且所有变更操作DELETE、TRUNCATE、重新同步都是源范围限定的。各个源之间互不干扰因此随着源数量的增加性能保持恒定。分区表支持pg_duckpipe 现在可以自动检测分区的源表。只需添加父表所有子分区的数据就会作为统一视图出现在目标 DuckLake 表中。分层配置现在的配置是一个四级层次结构硬编码默认值、全局配置、按组覆盖、按表覆盖。最具体的设置生效。-- 设置全局默认值SELECTduckpipe.set_config(flush_interval_ms,10000);-- 为特定组覆盖设置SELECTduckpipe.set_group_config(high_throughput,flush_interval_ms,2000);-- 为特定表覆盖设置SELECTduckpipe.set_table_config(public.orders,flush_batch_threshold,50000);这使得您可以对热表进行不同于冷表的调优而无需更改全局设置。模式 DDL 传播源表上的模式更改现在会自动传播到 DuckLake 目标表。支持的操作包括ADD COLUMN新列出现在目标表中DROP COLUMN列从目标表中移除RENAME COLUMN目标表中的列名随之更新DDL 检测通过对比 WAL 流中的 RELATION 消息来实现因此无需事件触发器或外部钩子。刷新线程在应用模式更改之前会排空旧模式的数据确保每个批次都使用正确的列布局进行处理。ALTER COLUMN TYPE目前被阻止以防止现有 Parquet 文件中出现静默数据损坏。放宽类型的更改例如 INT 到 BIGINTVARCHAR(50) 到 VARCHAR(200)将在不久的将来得到支持。稳定性和可观测性几项改进使 pg_duckpipe 在生产负载下更加稳定和可观测可溢出的刷新缓冲区刷新线程现在使用 DuckDB 缓冲区表在内存压力高时溢出到磁盘防止大批次数据导致 OOM 崩溃。并发刷新控制FlushGate信号量限制每个同步组的并发刷新数默认4。无法获得槽位的线程将继续缓冲并在下一个周期重试。共享内存指标新增的duckpipe.metrics()函数返回管道健康状况的 JSON 快照SELECTduckpipe.metrics();-- 输出示例{tables: {table_1: {queued_changes:128,total_queued_changes:584320,flush_count:1247,flush_duration_ms:34500,avg_row_bytes:142,is_backpressured:false} },groups: {group_1: {total_queued_changes:584320,is_backpressured:false} } }一眼就能看到队列深度、刷新吞吐量、每行存储成本以及是否处于背压状态。其他改进访问控制add_table()现在会自动将 DuckLake 目标表上的 SELECT 权限授予源表所有者因此现有的访问模式无需手动执行 GRANT 语句即可继承。修复 JSONB/JSON 支持jsonb和json列现在映射到 DuckDB 的原生 JSON 类型。下一步计划我们正在继续推进JSONB 到 VARIANT 映射将 PostgreSQL 的jsonb列映射到 DuckDB 的原生 VARIANT 类型以便在列式端进行更丰富的半结构化分析。压缩和保留策略自动合并 Parquet 文件和基于时间的数据过期。更广泛的 PostgreSQL 版本支持PG 16 及更早版本。请前往 GitHub 试用github.com/relytcloud/pg_duckpipe