【Kafka源码解读和使用指南】第19篇:Sender线程源码解析——Kafka生产者的“快递员“

发布时间:2026/6/8 9:51:52

【Kafka源码解读和使用指南】第19篇:Sender线程源码解析——Kafka生产者的“快递员“ 上一篇【第18篇】BufferPool源码解析——Kafka生产者的内存管家下一篇【第20篇】KSelector源码解析——Kafka网络通信的基石摘要前面几篇文章讲的都是攒消息——主线程send()之后消息进入了RecordAccumulator在MemoryRecords中安静等待。现在轮到Sender线程登场了它是KafkaProducer中真正执行网络I/O的快递员从RecordAccumulator中取消息、组装成ProduceRequest、通过NetworkClient发送出去、等待响应、触发回调。本文将深入源码剖析Sender线程的运行循环、RecordBatch到ProduceRequest的转换过程、InFlightRequests的飞行管理机制以及响应处理与用户回调的触发逻辑。读完这篇一条消息从send()到真正发送的全链路就彻底打通了。一、Sender线程的定位与运行循环1.1 一句话定位Sender KafkaProducer中负责将RecordAccumulator中的Batch转换为网络请求并发送的独立线程。// Sender实现了Runnable运行在独立线程中publicclassSenderimplementsRunnable{privatefinalKafkaClientclient;// NetworkClient网络通信privatefinalRecordAccumulatoraccumulator;// 消息缓冲区privatefinalMetadatametadata;// 集群元数据privatefinalintmaxRequestSize;// 单次请求最大大小Overridepublicvoidrun(){// Sender线程的主循环while(running){try{run(time.milliseconds());// 核心方法}catch(Exceptione){log.error(Uncaught error in Sender thread,e);}}}}1.2 run()方法的完整流程图【Sender.run() 完整流程】 ┌────────────────────────────────────────────────────────────────┐ │ Sender.run(long now) │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤①: Cluster cluster metadata.fetch() │ │ 获取最新的Kafka集群元数据快照 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤②: ReadyCheckResult result accumulator.ready(cluster, now)│ │ 根据RecordAccumulator状态筛选可以发送消息的Node节点 │ │ 条件: Batch满了 / 超时了 / BufferPool耗尽 / flush中 / 关闭中 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤③: 如果 unknownLeadersExist → metadata.requestUpdate() │ │ 有分区找不到Leader副本 → 标记需要更新集群元数据 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤④: 遍历readyNodes调用 client.ready(node, now) 检查连接 │ │ NetworkClient检查: 连接状态/InFlightRequests是否可继续发送 │ │ 不满足条件的节点从readyNodes中移除 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤⑤: MapNodeId, ListRecordBatch │ │ accumulator.drain(cluster, readyNodes, maxSize, now) │ │ 从RecordAccumulator中排水取出待发送的RecordBatch │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤⑥: accumulator.abortExpiredBatches(now) │ │ 处理超时批次可能触达delivery.timeout.ms │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤⑦: ListClientRequest createProduceRequests(collated) │ │ 将每个Node对应的RecordBatch列表转换为一个ProduceRequest │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤⑧: 遍历ClientRequest调用 client.send(request, now) │ │ 将请求放入KafkaChannel.send字段 InFlightRequests队列 │ └────────────────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────────────────────────────────────────────────────┐ │ 步骤⑨: ListClientResponse client.poll(timeout, now) │ │ 执行实际网络I/O: 发送请求 接收响应 处理超时/断连 │ │ 遍历responses触发RequestCompletionHandler回调 │ └────────────────────────────────────────────────────────────────┘二、ProduceRequest的组装——从RecordBatch到网络请求2.1 核心转换逻辑// 步骤⑦将每个Node的RecordBatch列表组装成ClientRequestprivateListClientRequestcreateProduceRequests(MapInteger,ListRecordBatchcollated,longnow){ListClientRequestrequestsnewArrayList(collated.size());for(Map.EntryInteger,ListRecordBatchentry:collated.entrySet()){// 每个Node只有一个ProduceRequestrequests.add(produceRequest(now,entry.getKey(),acks,requestTimeout,entry.getValue()));}returnrequests;}privateClientRequestproduceRequest(longnow,intdestination,shortacks,inttimeout,ListRecordBatchbatches){// 把RecordBatch列表按TopicPartition重新分组MapTopicPartition,ByteBufferproduceRecordsByPartitionnewHashMap();MapTopicPartition,RecordBatchrecordsByPartitionnewHashMap();for(RecordBatchbatch:batches){TopicPartitiontpbatch.topicPartition;produceRecordsByPartition.put(tp,batch.records.buffer());// 消息数据recordsByPartition.put(tp,batch);// 元数据用于回调}// 创建ProduceRequest对象符合Kafka协议格式ProduceRequestrequestnewProduceRequest(acks,timeout,produceRecordsByPartition);// 序列化为RequestSend真正的网络传输格式RequestSendsendnewRequestSend(Integer.toString(destination),this.client.nextRequestHeader(ApiKeys.PRODUCE),request.toStruct());// 创建响应回调RequestCompletionHandlercallbacknewRequestCompletionHandler(){publicvoidonComplete(ClientResponseresponse){// 响应回来后调用handleProduceResponse处理handleProduceResponse(response,recordsByPartition,time.milliseconds());}};// 封装为ClientRequestreturnnewClientRequest(now,acks!0,send,callback);}2.2 一个Node一个请求的原则【为什么每个Node只发送一个ProduceRequest】 Node#1 上的分区和待发送Batch: ┌────────────────────────────────────────┐ │ P0: [Batch-A] │ │ P1: [Batch-B, Batch-C] │ │ P3: [Batch-D] │ │ │ │ ──► 合并为一个ProduceRequest ──► │ │ { │ │ orders-0: Batch-A的数据, │ │ orders-1: Batch-BBatch-C的数据,│ │ orders-3: Batch-D的数据 │ │ } │ │ │ │ → 然后一个ClientRequest发走 │ │ → Broker收到后自己按分区拆解处理 │ └────────────────────────────────────────┘ 好处减少网络往返。如果4个Batch分4个请求就是4次RTT 合并成1个请求只需1次RTT三、InFlightRequests——飞行中的请求管理3.1 数据结构finalclassInFlightRequests{// 每个Node维护一个已发送但未收到响应的请求队列privatefinalMapString,DequeClientRequestrequestsnewHashMap();privatefinalintmaxInFlightRequestsPerConnection;// 每个连接最多积压的请求数// 限制条件只有当队首请求已经发送完成且队列长度未超限时才能发送新请求publicbooleancanSendMore(Stringnode){DequeClientRequestqueuerequests.get(node);returnqueuenull||queue.isEmpty()||(queue.peekFirst().request().completed()// 队首已发送完成queue.size()this.maxInFlightRequestsPerConnection);}}3.2 为什么要有这个限制【InFlightRequests 的流量控制】 没有限制的情况灾难: Node#1 的KafkaChannel ┌──────────────────────────────────────────┐ │ send字段: Req#5 正在发送... │ │ │ │ InFlight队列: │ │ [Req#1] [Req#2] [Req#3] [Req#4] [Req#5] │ ← 已经5个未收到响应 │ │ │ 如果还能继续发 Req#6: │ │ → send字段要被覆盖上一个还没发完 │ │ → 连接积压过多请求 → 超时风暴 │ └──────────────────────────────────────────┘ 有限制的情况安全: 默认 maxInFlightRequestsPerConnection 5 队首如果还没发完 → canSendMore() false → 不能发新请求 队首发完了 队列长度 5 → 可以发新请求3.3 InFlightRequests的生命周期【InFlightRequests 的请求生命周期】 发送阶段: ┌──────────────┐ ┌──────────────────┐ ┌──────────────────┐ │ Sender构造 │ ──► │ inFlightRequests │ ──► │ KSelector.send() │ │ ClientRequest │ │ .add(request) │ │ 设置到Channel │ └──────────────┘ └──────────────────┘ └──────────────────┘ 响应阶段: ┌──────────────────┐ ┌──────────────────┐ │ handleCompleted │ │ inFlightRequests │ │ Receives收到响应 │ ──► │ .completeNext() │ ──► 从队列中移除 └──────────────────┘ └──────────────────┘ 超时/断连: ┌──────────────────┐ ┌──────────────────┐ │ handleTimeouts/ │ │ inFlightRequests │ │ Disconnections │ ──► │ .clearAll(node) │ ──► 清空该节点全部请求 └──────────────────┘ └──────────────────┘四、响应处理与回调触发4.1 handleProduceResponse——正常响应处理privatevoidhandleProduceResponse(ClientResponseresponse,MapTopicPartition,RecordBatchbatches,longnow){intcorrelationIdresponse.request().request().header().correlationId();if(response.wasDisconnected()){// 情况1: 连接断开网络故障log.debug(Disconnected from node {}. Will retry.,response.request().request().destination());for(RecordBatchbatch:batches.values())completeBatch(batch,Errors.NETWORK_EXCEPTION,-1L,Record.NO_TIMESTAMP,correlationId,now);}else{// 情况2: 正常收到响应if(response.hasResponse()){// 解析ProduceResponseProduceResponseproduceResponsenewProduceResponse(response.responseBody());for(Map.EntryTopicPartition,ProduceResponse.PartitionResponseentry:produceResponse.responses().entrySet()){TopicPartitiontpentry.getKey();ProduceResponse.PartitionResponsepartRespentry.getValue();ErrorserrorErrors.forCode(partResp.errorCode);RecordBatchbatchbatches.get(tp);completeBatch(batch,error,partResp.baseOffset,partResp.timestamp,correlationId,now);}}else{// acks0 的情况没有响应for(RecordBatchbatch:batches.values())completeBatch(batch,Errors.NONE,-1L,Record.NO_TIMESTAMP,correlationId,now);}}}4.2 completeBatch——重试判决与回调触发privatevoidcompleteBatch(RecordBatchbatch,Errorserror,longbaseOffset,longtimestamp,longcorrelationId,longnow){if(error!Errors.NONEcanRetry(batch,error)){// 可重试重新放回RecordAccumulator等待重发this.accumulator.reenqueue(batch,now);// 记录重试次数下次尝试时判断是否超限}else{// 不可重试触发用户回调RuntimeExceptionexception;if(errorErrors.TOPIC_AUTHORIZATION_FAILED)exceptionnewTopicAuthorizationException(batch.topicPartition.topic());elseif(errorErrors.NONE)exceptionnull;elseexceptionerror.exception();// 调用RecordBatch.done() → 遍历所有Thunk → 触发每个消息的Callbackbatch.done(baseOffset,timestamp,exception);// 释放ByteBufferthis.accumulator.deallocate(batch);}// 元数据相关的错误 → 触发Metadata更新if(error.exception()instanceofInvalidMetadataException)metadata.requestUpdate();}// 判断是否可重试privatebooleancanRetry(RecordBatchbatch,Errorserror){// 重试条件// 1. 错误类型允许重试如网络异常、LeaderNotAvailable等// 2. 重试次数未超过 retries 配置默认Integer.MAX_VALUE// 3. 没有超时returnerror.exception()instanceofRetriableExceptionbatch.attempts()this.retries(System.currentTimeMillis()-batch.lastAttemptMs())deliveryTimeoutMs;}本篇文章中涉及的异常处理总结异常情况Sender处理消息最终结果网络断开reenqueue重试重试成功→正常超出retries→回调异常LeaderNotAvailablerequestUpdate元数据 重试元数据更新后重新发送NotEnoughReplicas重试等待ISR恢复重试成功→正常TopicAuthorizationFailed不重试直接失败回调onCompletion(null, exception)消息超时(delivery.timeout.ms)abortExpiredBatches回调onCompletion(null, TimeoutException)acks0无响应直接标记成功回调onCompletion(metadata, null)本篇小结Sender线程是KafkaProducer的发动机所有消息最终都经过它发送出去运行循环ready()筛选节点 → drain()取消息 → createProduceRequests()组装请求 → send()提交 → poll()执行I/O → handleProduceResponse()处理响应。这个循环不间断执行保证消息源源不断地发送一个Node一个请求把发往同一个Broker上不同分区的消息合并为一个ProduceRequest减少网络往返InFlightRequests通过限制单连接最大积压请求数默认5实现流量控制和背压防止某个慢节点被压垮重试策略可重试的错误网络异常、Leader切换自动将Batch放回RecordAccumulator重发不可重试的权限、超时直接触发用户回调Sender依赖的核心网络组件是NetworkClient和KSelector。下一篇我们就看看KSelector是怎么用Java NIO实现高性能网络通信的。上一篇【第18篇】BufferPool源码解析——Kafka生产者的内存管家下一篇【第20篇】KSelector源码解析——Kafka网络通信的基石

相关新闻