Flink源码阅读:Kafka Connector

发布时间:2026/6/11 4:09:45

Flink源码阅读:Kafka Connector 自定义 Source 和 Sink在介绍 Kafka Connector 之前我们先来看一下在 Flink 中是如何支持自定义 Source 和 Sink 的。我们来看一张 Flink 官方文档提供的图。这张图展示了 Connector 的基本体系结构三层架构也非常清晰。Metadata首先是最上层的 MetaDataCREATE TABLE 会更新 Catalog然后被转换为 TableAPI 的 CatalogTableCatalogTable 实例用于表示动态表Source 或 Sink 表的元信息。Planning在解析和优化程序时会将 CatalogTable 转换为 DynamicTableSource 和 DynamicTableSink分别用于查询和插入数据这两个实例的创建都需要对应的工厂类工厂类的完整路径需要放到这个配置文件中。META-INF/services/org.apache.flink.table.factories.Factory如果有需要的话我们还可以在解析过程中配置编码和解码方法。在 Source 端通过三个接口支持不同的查询能力。ScanTableSource用于消费 changelog 流扫描的数据支持 insert、updata、delete 三种类型。ScanTableSource 还支持很多其他的功能 都是通过接口提供的。具体可以看参考这个连接https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sourcessinks/#source-abilitiesLookupTableSourceLookupTableSource 不会全量读取表的数据它在需要时会发送请求懒加载数据。目前只支持 insert-only 变更模式。VectorSearchTableSource使用一个输入向量来搜索数据并返回最相似的 Top-K 行数据。在 Sink 端通过 DynamicTableSink 来实现具体的写入逻辑这里也提供了一些用于扩展能力的接口。具体参考https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/table/sourcessinks/#sink-abilitiesRuntime逻辑解析完成后会到 Runtime 层。这里就是定义几个 Provider在 Provider 中实现和连接器具体的交互逻辑。小结当我们需要创建一个自定义的 Source 和 Sink 时就可以通过以下步骤实现。定义 Flink SQL 的 DDL需要定义相应的 Options。实现 DynamicTableSourceFactory 和 DynamicTableSinkFactory并把实现类的具体路径写到配置文件中。实现 DynamicTableSource 和 DynamicTableSink这里需要处理 SQL 层的元数据。提供 Provider将逻辑层与底层 DataStream 关联起来。编写底层算子实现 Source 和 Sink 接口。Kafka Connector 的实现带着这些知识我们一起来看一下 Kafka Connector 相关的源码。Kafka Connector 代码目前已经是一个独立的项目了。项目地址是https://github.com/apache/flink-connector-kafkaFactory我们首先找到定义的工厂类org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory以 KafkaDynamicTableFactory 为例它同时实现了 DynamicTableSourceFactory 和 DynamicTableSinkFactory 两个接口。KafkaDynamicTableFactory 包含以下几个方法。factoryIdentifier返回一个唯一标识符对应 Flink SQL 中 connectorxxx 这个配置。requiredOptions必填配置集合。optionalOptions选填配置集合。forwardOptions直接传递到 Runtime 层的配置集合。createDynamicTableSource创建 DynamicTableSource。createDynamicTableSink创建 DynamicTableSink。Source 端工厂类的 createDynamicTableSource 方法创建了 DynamicTableSource我们来看一下创建的逻辑。public DynamicTableSource createDynamicTableSource(Context context) { final TableFactoryHelper helper FactoryUtil.createTableFactoryHelper(this, context); final OptionalDecodingFormatDeserializationSchemaRowData keyDecodingFormat getKeyDecodingFormat(helper); final DecodingFormatDeserializationSchemaRowData valueDecodingFormat getValueDecodingFormat(helper); helper.validateExcept(PROPERTIES_PREFIX); final ReadableConfig tableOptions helper.getOptions(); validateTableSourceOptions(tableOptions); validatePKConstraints( context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), context.getCatalogTable().getOptions(), valueDecodingFormat); final StartupOptions startupOptions getStartupOptions(tableOptions); final BoundedOptions boundedOptions getBoundedOptions(tableOptions); final Properties properties getKafkaProperties(context.getCatalogTable().getOptions()); // add topic-partition discovery final Duration partitionDiscoveryInterval tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY); properties.setProperty( KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(), Long.toString(partitionDiscoveryInterval.toMillis())); final DataType physicalDataType context.getPhysicalRowDataType(); final int[] keyProjection createKeyFormatProjection(tableOptions, physicalDataType); final int[] valueProjection createValueFormatProjection(tableOptions, physicalDataType); final String keyPrefix tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); final Integer parallelism tableOptions.getOptional(SCAN_PARALLELISM).orElse(null); return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), valueDecodingFormat, keyProjection, valueProjection, keyPrefix, getTopics(tableOptions), getTopicPattern(tableOptions), properties, startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.startupTimestampMillis, boundedOptions.boundedMode, boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString(), parallelism); }在这个方法中首先要获取到 key 和 value 的解码格式。接着是各种参数校验和获取必要的属性。最后创建 KafkaDynamicSource 实例。获取解码格式需要用到 DeserializationFormatFactory 工厂DeserializationFormatFactory 有多个实现类对应了多种格式的反序列化方法。我们来看比较常见的 Json 格式的工厂 JsonFormatFactory。public DecodingFormatDeserializationSchemaRowData createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { FactoryUtil.validateFactoryOptions(this, formatOptions); JsonFormatOptionsUtil.validateDecodingFormatOptions(formatOptions); final boolean failOnMissingField formatOptions.get(FAIL_ON_MISSING_FIELD); final boolean ignoreParseErrors formatOptions.get(IGNORE_PARSE_ERRORS); final boolean jsonParserEnabled formatOptions.get(DECODE_JSON_PARSER_ENABLED); TimestampFormat timestampOption JsonFormatOptionsUtil.getTimestampFormat(formatOptions); return new ProjectableDecodingFormatDeserializationSchemaRowData() { Override public DeserializationSchemaRowData createRuntimeDecoder( DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) { final DataType producedDataType Projection.of(projections).project(physicalDataType); final RowType rowType (RowType) producedDataType.getLogicalType(); final TypeInformationRowData rowDataTypeInfo context.createTypeInformation(producedDataType); if (jsonParserEnabled) { return new JsonParserRowDataDeserializationSchema( rowType, rowDataTypeInfo, failOnMissingField, ignoreParseErrors, timestampOption, toProjectedNames( (RowType) physicalDataType.getLogicalType(), projections)); } else { return new JsonRowDataDeserializationSchema( rowType, rowDataTypeInfo, failOnMissingField, ignoreParseErrors, timestampOption); } } Override public ChangelogMode getChangelogMode() { return ChangelogMode.insertOnly(); } Override public boolean supportsNestedProjection() { return jsonParserEnabled; } }; }在创建解码格式时最重要的是创建运行时的解码器也就是 DeserializationSchema在 JsonFormatFactory 中有 JsonParserRowDataDeserializationSchema 和 JsonRowDataDeserializationSchema 两种实现分别是用于将 JsonParser 和 JsonNode 转换成为 RowData具体的逻辑都在 createNotNullConverter 方法中。了解完解码格式后我们把视角拉回到 KafkaDynamicSource它实现了三个接口 ScanTableSource、SupportsReadingMetadata、SupportsWatermarkPushDown。分别用于消费数据读取元数据和生成水印。public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { final DeserializationSchemaRowData keyDeserialization createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix); final DeserializationSchemaRowData valueDeserialization createDeserialization(context, valueDecodingFormat, valueProjection, null); final TypeInformationRowData producedTypeInfo context.createTypeInformation(producedDataType); final KafkaSourceRowData kafkaSource createKafkaSource(keyDeserialization, valueDeserialization, producedTypeInfo); return new DataStreamScanProvider() { Override public DataStreamRowData produceDataStream( ProviderContext providerContext, StreamExecutionEnvironment execEnv) { if (watermarkStrategy null) { watermarkStrategy WatermarkStrategy.noWatermarks(); } DataStreamSourceRowData sourceStream execEnv.fromSource( kafkaSource, watermarkStrategy, KafkaSource- tableIdentifier); providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid); return sourceStream; } Override public boolean isBounded() { return kafkaSource.getBoundedness() Boundedness.BOUNDED; } Override public OptionalInteger getParallelism() { return Optional.ofNullable(parallelism); } }; }在 ScanRuntimeProvider 的逻辑中先获取到反序列化器也就是刚刚我们提到的 DeserializationSchema。然后开始创建 KafkaSource 实例它是 Source 的实现类也就是执行引擎层了这个过程会依次创建图中这些类。KafkaSource 中主要是创建 KafkaSourceReader 和 KafkaSourceEnumeratorKafkaSourceEnumerator 是负责和分片相关的逻辑包括分片分配和分片发现等。KafkaSourceReader 中主要是和 State 相关的逻辑包括触发快照和完成 Checkpoint 通知的方法。当做 Snapshot 时会记录活跃 split 的 offset同时将 split 作为状态提交。当 Checkpoint 完成时会调用KafkaSourceFetcherManager.commitOffsets提交 offset。public ListKafkaPartitionSplit snapshotState(long checkpointId) { ListKafkaPartitionSplit splits super.snapshotState(checkpointId); if (!commitOffsetsOnCheckpoint) { return splits; } if (splits.isEmpty() offsetsOfFinishedSplits.isEmpty()) { offsetsToCommit.put(checkpointId, Collections.emptyMap()); } else { MapTopicPartition, OffsetAndMetadata offsetsMap offsetsToCommit.computeIfAbsent(checkpointId, id - new HashMap()); // Put the offsets of the active splits. for (KafkaPartitionSplit split : splits) { // If the checkpoint is triggered before the partition starting offsets // is retrieved, do not commit the offsets for those partitions. if (split.getStartingOffset() 0) { offsetsMap.put( split.getTopicPartition(), new OffsetAndMetadata(split.getStartingOffset())); } } // Put offsets of all the finished splits. offsetsMap.putAll(offsetsOfFinishedSplits); } return splits; } public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug(Committing offsets for checkpoint {}, checkpointId); ... ((KafkaSourceFetcherManager) splitFetcherManager) .commitOffsets( committedPartitions, (ignored, e) - {...}); }KafkaSourceFetcherManager 负责管理 fetcher 线程提交 Offset。KafkaPartitionSplitReader 的 fetch 方法用来消费 Kafka 的数据。public RecordsWithSplitIdsConsumerRecordbyte[], byte[] fetch() throws IOException { ConsumerRecordsbyte[], byte[] consumerRecords; try { consumerRecords consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); } catch (WakeupException | IllegalStateException e) { // IllegalStateException will be thrown if the consumer is not assigned any partitions. // This happens if all assigned partitions are invalid or empty (starting offset // stopping offset). We just mark empty partitions as finished and return an empty // record container, and this consumer will be closed by SplitFetcherManager. KafkaPartitionSplitRecords recordsBySplits new KafkaPartitionSplitRecords( ConsumerRecords.empty(), kafkaSourceReaderMetrics); markEmptySplitsAsFinished(recordsBySplits); return recordsBySplits; } KafkaPartitionSplitRecords recordsBySplits new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); ListTopicPartition finishedPartitions new ArrayList(); for (TopicPartition tp : consumer.assignment()) { long stoppingOffset getStoppingOffset(tp); long consumerPosition getConsumerPosition(tp, retrieving consumer position); // Stop fetching when the consumers position reaches the stoppingOffset. // Control messages may follow the last record; therefore, using the last records // offset as a stopping condition could result in indefinite blocking. if (consumerPosition stoppingOffset) { LOG.debug( Position of {}: {}, has reached stopping offset: {}, tp, consumerPosition, stoppingOffset); recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, consumerPosition, finishedPartitions, recordsBySplits); } } // Only track non-empty partitions record lag if it never appears before consumerRecords .partitions() .forEach( trackTp - { kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp); }); markEmptySplitsAsFinished(recordsBySplits); // Unassign the partitions that has finished. if (!finishedPartitions.isEmpty()) { finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric); unassignPartitions(finishedPartitions); } // Update numBytesIn kafkaSourceReaderMetrics.updateNumBytesInCounter(); return recordsBySplits; }至此Source 端相关的源码我们就梳理完了。接下来我们再看 Sink 端的代码。Sink 端我们从工厂类中的 createDynamicTableSink 方法开始。public DynamicTableSink createDynamicTableSink(Context context) { final TableFactoryHelper helper FactoryUtil.createTableFactoryHelper( this, autoCompleteSchemaRegistrySubject(context)); final OptionalEncodingFormatSerializationSchemaRowData keyEncodingFormat getKeyEncodingFormat(helper); final EncodingFormatSerializationSchemaRowData valueEncodingFormat getValueEncodingFormat(helper); helper.validateExcept(PROPERTIES_PREFIX); final ReadableConfig tableOptions helper.getOptions(); final DeliveryGuarantee deliveryGuarantee validateDeprecatedSemantic(tableOptions); validateTableSinkOptions(tableOptions); KafkaConnectorOptionsUtil.validateDeliveryGuarantee(tableOptions); validatePKConstraints( context.getObjectIdentifier(), context.getPrimaryKeyIndexes(), context.getCatalogTable().getOptions(), valueEncodingFormat); final DataType physicalDataType context.getPhysicalRowDataType(); final int[] keyProjection createKeyFormatProjection(tableOptions, physicalDataType); final int[] valueProjection createValueFormatProjection(tableOptions, physicalDataType); final String keyPrefix tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); final Integer parallelism tableOptions.getOptional(SINK_PARALLELISM).orElse(null); return createKafkaTableSink( physicalDataType, keyEncodingFormat.orElse(null), valueEncodingFormat, keyProjection, valueProjection, keyPrefix, getTopics(tableOptions), getTopicPattern(tableOptions), getKafkaProperties(context.getCatalogTable().getOptions()), getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null), deliveryGuarantee, parallelism, tableOptions.get(TRANSACTIONAL_ID_PREFIX), tableOptions.get(TRANSACTION_NAMING_STRATEGY)); }和 Source 的流程很相似这里首先是获取 key 和 value 的编码格式然后做了很多校验最后是创建 KafkaDynamicSink 实例。获取编码格式用到的工厂类是 SerializationFormatFactory我们前面介绍的 JsonFormatFactory 也实现了 SerializationFormatFactory因此它既提供了解码格式又提供了编码格式。编码格式用到的编码器是 JsonRowDataSerializationSchema通过 RowDataToJsonConverters 将 RowData 转换成 JsonNode。在 KafkaDynamicSink 的 getSinkRuntimeProvider 方法中主要就是创建 KafkaSink 实例。KafkaSink 类实现了 TwoPhaseCommittingStatefulSink 接口即支持两阶段提交。它创建了 KafkaWrter 和 KafkaCommiter。创建 KafkaWriter 时如果配置的是 ExactlyOnce 模式则会创建出 ExactlyOnceKafkaWriter否则创建 KafkaWriter。Writer 真正实现两阶段提交的是 ExactlyOnceKafkaWriter。它在启动时会调用producer.beginTransaction开启一个事务。数据写入时会调用KafkaWriter.write方法此操作会被标记为事务内的操作。当 Sink 收到 Barrier 时会先调用 flush 方法将缓冲区的数据都发送到 Kafka Broker然后调用 prepareCommit 方法预提交。预提交方法中记录 epoch 和 transactionalId 返回给框架层。public CollectionKafkaCommittable prepareCommit() { // only return a KafkaCommittable if the current transaction has been written some data if (currentProducer.hasRecordsInTransaction()) { KafkaCommittable committable KafkaCommittable.of(currentProducer); LOG.debug(Prepare {}., committable); currentProducer.precommitTransaction(); return Collections.singletonList(committable); } // otherwise, we recycle the producer (the pool will reset the transaction state) producerPool.recycle(currentProducer); return Collections.emptyList(); }状态保存时会将预提交的 transactionalId 存到状态中。public ListKafkaWriterState snapshotState(long checkpointId) throws IOException { // recycle committed producers TransactionFinished finishedTransaction; while ((finishedTransaction backchannel.poll()) ! null) { producerPool.recycleByTransactionId( finishedTransaction.getTransactionId(), finishedTransaction.isSuccess()); } // persist the ongoing transactions into the state; these will not be aborted on restart CollectionCheckpointTransaction ongoingTransactions producerPool.getOngoingTransactions(); currentProducer startTransaction(checkpointId 1); return createSnapshots(ongoingTransactions); } private ListKafkaWriterState createSnapshots( CollectionCheckpointTransaction ongoingTransactions) { ListKafkaWriterState states new ArrayList(); int[] subtaskIds this.ownedSubtaskIds; for (int index 0; index subtaskIds.length; index) { int ownedSubtask subtaskIds[index]; states.add( new KafkaWriterState( transactionalIdPrefix, ownedSubtask, totalNumberOfOwnedSubtasks, transactionNamingStrategy.getOwnership(), // new transactions are only created with the first owned subtask id index 0 ? ongoingTransactions : List.of())); } LOG.debug(Snapshotting state {}, states); return states; }当 Checkpoint 完成时会调用KafkaCommitter.commit方法。在 commit 方法中会调用producer.commitTransaction正式提交事务。FlinkKafkaInternalProducer 是 Flink 内部封装的与 Kafka 生产者的交互类所有与 Kafka 生产者的交互都通过它执行。关于 Kafka Connector 的 Sink 端的源码我们就梳理到这里。总结最后还是总结一下。本文我们先了解了 Flink 中自定义 Source 和 Sink 的流程。按照这个流程我们梳理了 Kafka Connector 的源码。在 Source 端Flink Kafka 封装了对消费者 Offset 的提交逻辑。在 Sink 端结合了 Kafka 提供的事务支持实现了两阶段提交的逻辑。

相关新闻