Kafka 2: Client Working Mechanisms

Published: 2026/5/19 19:23:46

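The essence of Kafka's design is how it sustains high concurrency and high throughput for messages in complex environments where the network is unstable and services may crash at any time. Understanding those complex problems, however, first requires a grasp of the basic client model described here.

1. Consumer group consumption mechanism

Every Consumer needs to specify a GROUP_ID_CONFIG property, which identifies the consumer group that the Consumer belongs to.

    public static final String GROUP_ID_CONFIG = "group.id";
    public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

In other words, if a Consumer wants to use the group management functionality when it subscribes, or the offset management strategy provided by Kafka, it must configure GROUP_ID_CONFIG.

Kafka's group consumption mechanism works as follows. When a producer sends messages to a topic, it spreads them as evenly as possible across the topic's partitions. Each message is delivered to every consumer group that subscribes to the topic, but only once per group. Consumer instances within the same consumer group therefore consume a single copy of the message between them, while different consumer groups each consume the same message again.

The consumption progress of a group is described by the following fields:

- PARTITION: the partition.
- CURRENT-OFFSET: the offset up to which the consumer group has consumed and committed.
- LOG-END-OFFSET: the offset of the latest message in the partition.
- LAG: the number of messages not yet consumed.

The offset must be actively committed to the Kafka broker by the consumer after it has processed the messages. Once the commit completes, the broker updates the consumption progress and records that the message has been consumed by the consumer group. If the consumer does not commit the offset, the broker considers the message unprocessed and it will be delivered to the consumer group again, where possible to a different consumer instance in the same group.

2. Producer interceptor mechanism

The producer interceptor mechanism lets the client intercept a message, or modify its content, before the producer sends it to the Kafka cluster. See the MyInterceptor class from the previous article (Kafka 1) for an implementation. To enable it, specify the interceptor when setting the Kafka properties in MyProducer:

    // Separate multiple interceptor classes with commas
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.liyy.basic.MyInterceptor");

The parameter is defined as follows:

    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";

Its documentation string, INTERCEPTOR_CLASSES_DOC, reads: A list of classes to use as interceptors. Implementing the <code>org.apache.kafka.clients.producer.ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster.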
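To illustrate how a chain of interceptors can modify a message before it is sent, here is a simplified model in plain Java. It is only a sketch and does not use Kafka's ProducerInterceptor interface: the class InterceptorChainSketch, the method applyAll, and the two sample interceptors are hypothetical names chosen for this illustration, and a record is reduced to a plain string.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of an interceptor chain (hypothetical; not Kafka's API).
class InterceptorChainSketch {

    // Applies the interceptors in list order; each one receives the
    // value produced by the previous one and may return a modified value.
    public static String applyAll(List<UnaryOperator<String>> interceptors, String value) {
        String current = value;
        for (UnaryOperator<String> interceptor : interceptors) {
            current = interceptor.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        UnaryOperator<String> trim = String::trim;              // cleans up the payload
        UnaryOperator<String> tag = v -> "[audited] " + v;      // marks the payload
        String sent = applyAll(List.of(trim, tag), "  order-created  ");
        System.out.println(sent); // prints "[audited] order-created"
    }
}
```

The order of the list matters in this model: swapping the two interceptors would tag the message first and then trim the tagged string.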
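By default, there are no interceptors.

3. Message serialization mechanism

The producer specifies two properties:

    public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
    public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
    // Parameter documentation
    public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";
    public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>org.apache.kafka.common.serialization.Serializer</code> interface.";

These serialize the key and value of each message the producer sends into binary data. In Kafka's message definition, the key and value play the following roles:

- key: optional, used for partitioning. Kafka uses the key to decide which partition a message goes to. If no key is provided, Kafka chooses the partition itself: round-robin in older clients, while the documentation quoted in section 4.1 describes a sticky strategy for current versions. If a key is provided, Kafka converts it to a byte array with the configured Serializer and then hashes it to select the partition, which guarantees that messages with the same key are assigned to the same partition.
- value: the business data being transferred. Kafka converts the value to a byte array with the configured Serializer, so that it can be transmitted efficiently over the network and written to files by the operating system.

Because the producer serializes messages, the consumer has to deserialize them when it pulls them. The Consumer has two corresponding deserialization properties:

    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
    // Parameter documentation
    public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";
    public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Deserializer</code> interface.";

Kafka already ships implementation classes for common basic data types. For a custom message format, such as a custom POJO, you need to specify your own implementation class. Serialization is a very important optimization in high-concurrency scenarios: an efficient serialization mechanism can greatly improve both the network transfer capacity and the disk write capacity of a distributed system.

4. Message partition routing mechanism

4.1 How Kafka assigns a partition based on the key

    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";

Its documentation string, PARTITIONER_CLASS_DOC, begins: A class to use to determine which partition to be send to when produce the records. Available options are:

- If not set, the default partitioning logic is used.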
  This strategy will try sticking to a partition until at least BATCH_SIZE_CONFIG bytes is produced to the partition. It works with the strategy:
    - If no partition is specified but a key is present, choose a partition based on a hash of the key.
    - If no partition or key is present, choose the sticky partition that changes when at least BATCH_SIZE_CONFIG bytes are produced to the partition.
- <code>org.apache.kafka.clients.producer.RoundRobinPartitioner</code>: This partitioning strategy is that each record in a series of consecutive records will be sent to a different partition (no matter if the key is provided or not), until we run out of partitions and start over again. Note: There's a known issue that will cause uneven distribution when new batch is created. Please check KAFKA-9965 for more detail.

Implementing the <code>org.apache.kafka.clients.producer.Partitioner</code> interface allows you to plug in a custom partitioner.

Kafka decides how a message is assigned to a partition based on its key through a concrete implementation of the Partitioner interface, and you can even implement a simple assignment strategy yourself. In version 3.2.0, Kafka provided three built-in Partitioner implementations: RoundRobinPartitioner, DefaultPartitioner and UniformStickyPartitioner. The latter two are now marked as deprecated and have been replaced by the default partitioning logic.

You can also write your own Partitioner implementation to customize the partitioning logic. The core of it is implementing the partition method, which selects a partition based on the available partition information, for example by taking the hash of the key modulo the number of partitions. All partition information for a topic can be obtained from the cluster:

    // Get the partition information
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

On the Consumer side, you can then specify a PARTITION_ASSIGNMENT_STRATEGY, the partition assignment strategy that decides how multiple Consumer instances are matched to multiple partitions.

    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";

Its documentation string, PARTITION_ASSIGNMENT_STRATEGY_DOC, begins: A list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used.
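As a minimal sketch of how this property might be set, the following plain Java builds a consumer configuration with an explicit assignment strategy. It relies only on java.util.Properties and the documented configuration keys as string literals, so it runs without the Kafka client on the classpath. The class name ConsumerAssignmentConfigSketch, the method consumerProps, the broker address localhost:9092 and the group name are assumptions made for illustration.

```java
import java.util.Properties;

// Builds consumer settings as plain properties (illustrative sketch only).
class ConsumerAssignmentConfigSketch {

    public static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        // Assumed broker address for a local test setup.
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group this consumer belongs to (GROUP_ID_CONFIG).
        props.put("group.id", groupId);
        // Deserializers must mirror the producer's serializers.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assignment strategy, given as a fully qualified class name.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("demo-group");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

In a real application these properties would be passed to the KafkaConsumer constructor.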
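The available options for partition.assignment.strategy, as listed in its documentation, are:

- <code>org.apache.kafka.clients.consumer.RangeAssignor</code>: Assigns partitions on a per-topic basis.
- <code>org.apache.kafka.clients.consumer.RoundRobinAssignor</code>: Assigns partitions to consumers in a round-robin fashion.
- <code>org.apache.kafka.clients.consumer.StickyAssignor</code>: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
- <code>org.apache.kafka.clients.consumer.CooperativeStickyAssignor</code>: Follows the same StickyAssignor logic, but allows for cooperative rebalancing.

The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.

Implementing the <code>org.apache.kafka.clients.consumer.ConsumerPartitionAssignor</code> interface allows you to plug in a custom assignment strategy.

Kafka's built-in assignment implementations are usually the best choice, although you can also define your own strategy. The built-in options fall into three basic consumer partition assignment strategies:

1. range. For example, suppose a topic has 10 partitions and a consumer group has 3 Consumers. The range strategy assigns partitions 0-3 to Consumer1, 4-6 to Consumer2 and 7-9 to Consumer3.
2. round-robin.
3. sticky. This strategy follows two principles. First, the initial assignment is kept as even as possible, for example by using the range or round-robin strategy (the choice between them is arbitrary). Second, later assignments stay as close as possible to the previous assignment. For example, when Consumer3 goes down, the sticky strategy keeps the partitions originally assigned to C1 and C2 unchanged and distributes C3's partitions as evenly as possible between C1 and C2. This keeps the data handled by each Consumer stable.

The official default partitioner on the producer side, and the Range and Sticky assignment strategies on the consumer side, are efficient algorithms for most scenarios.

5. Producer message buffering mechanism

To keep highly concurrent requests from putting too much pressure on the server, Kafka's Producer does not send messages to the server one at a time. Instead, it adds an in-memory buffer: messages are first written to the buffer and, once a threshold is reached, sent to the server in a batch. This kind of buffering is a common technique for handling high-concurrency scenarios. The producer message buffering mechanism involves two key components in KafkaProducer: the accumulator and the sender. Two related topics are the send acknowledgement mechanism and producer message idempotence.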
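The buffering behavior described in this section can be modeled with a small sketch in plain Java. It is a simplified illustration and not Kafka's actual accumulator: the class BatchingBufferSketch and its methods are hypothetical names, there is a single buffer rather than one per partition, and "sending" a batch only records how many messages it contained. The real producer also sends batches from a separate sender thread and can flush on a time limit (linger.ms) as well as on size (batch.size), neither of which is modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-based batching (hypothetical; not Kafka's implementation).
class BatchingBufferSketch {
    private final int batchSizeBytes;                     // flush threshold in bytes
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes = 0;
    // Number of records in each batch that has been "sent".
    private final List<Integer> flushedBatchSizes = new ArrayList<>();

    public BatchingBufferSketch(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    // Buffers one record and flushes once the threshold is reached.
    public void append(byte[] record) {
        pending.add(record);
        pendingBytes += record.length;
        if (pendingBytes >= batchSizeBytes) {
            flush();
        }
    }

    // "Sends" whatever is buffered as one batch and resets the buffer.
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushedBatchSizes.add(pending.size());
        pending.clear();
        pendingBytes = 0;
    }

    public List<Integer> flushedBatchSizes() {
        return flushedBatchSizes;
    }

    public static void main(String[] args) {
        BatchingBufferSketch buffer = new BatchingBufferSketch(10);
        for (int i = 0; i < 7; i++) {
            buffer.append(new byte[4]); // seven 4-byte records
        }
        buffer.flush(); // send the remainder
        System.out.println(buffer.flushedBatchSizes()); // prints [3, 3, 1]
    }
}
```

With a 10-byte threshold and 4-byte records, every third record triggers a flush, so seven records are sent in three batches instead of seven separate requests.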
