消息队列篇

发布时间:2026/7/1 10:58:56

消息队列篇 一. kafka篇优点是具有高性能、高吞吐量分布式可扩展消息队列。主要用于日志收集流式处理解耦。缺点1. 运维成本比较高 2. 默认是pull类型的当消费者消费速率低于生产速率会出现消息积压。 3. 单个分区有序跨分区不能保证有序1. kafak默认单个分区顺序消费如何保证全局有序1. 将整个Topic划分成一个分区2. 应用层排序策略1. 每条消息加上业务时间戳或者业务序列号2. 消费者拉取消息后先按时间戳进行排序再消费3. 采用幂等性保证消息消费的顺序性3. 按业务维度设置Key分区局部有序局部业务有序通过设置key让相同维度消息划分到该区域。2. 主要的概念producer: 消息的生产者向kafka发送消息。consumer: 消息的消费者从kafka读取消息。topic消息分类和管理的基本单位生产者和消费者的桥梁。partition是topic的物理分片单位保证消息的并发处理能力和顺序消费。broker: kafka的服务器节点存储消息。offset: 消息的唯一标识符consumer group多个消费者的集合同一个消费者组中的多个消费者可以协同消费一个topic信息。同一个分区只能被同一个消费者组中的一个消费者进行消费。保证不重复消费不同消费者组中的不同消费者可以消费同一个分区的消息。消费者组的作用1. 实现消息的并行消费 2. 负载均衡 3. 容错能力 4. 消息只被消费一次kafka如何保证消息不被重复的进行消费生产者端考虑1.启用幂等性发送通过设置enable.idempotencetrue生产者会对发送的消息设置唯一性的标识Broker端在写入消息时候会对标识进行判断从而消息保证不会重复的写入。2.事务的支持对于跨分区的原子性写入生产者端开启事务生产者端发送完一批消息之后提交事务。保证这批消息要么全部写入成功要么全部失败避免部分重复。3.通过设置唯一性的标识。4. 开启ack机制保证所有副本都同步完成以后才进行消费。消费者端考虑1. 通过手动提交offset而非自动的提交确保消息处理完成以后再进行提交。2. 幂等性处理消费者处理消息时候先检查该消费是否已经进行消费如果没有消费才进行消费。3. 如何提高kafka消费速率提升消费速率的重点是增加分区 提升单个组内消费者数量。下面说法不正确多个消费者组消费同一个 Topic属于广播模式每个组都会消费所有消息一次并不会提高消费速率反而可能增加系统负载4. kafka如何保证消息不丢失生产者端考虑1. 生产者端开启ACK机制ACKall。2. 生产者端开启重试策略消费者端考虑1. 设置手动提交offset保证消费成功以后再进行提交。enable.auto.commitfalse2. 设置异常处理机制保证消息消费失败之后不会跳过而是会重试或者日志记录。broker端考虑1. 副本机制Topic默认有多个副本一个leader其余的都是Follower。同步复制保障高可用。2. ISR机制: kafka仅在所有的副本都写入成功以后才确认写入成功。3. 数据持久化机制5. kafka为啥工作的这么快1. 磁盘顺序读写2. 零拷贝数据从磁盘到网卡直接传输不经过用户空间避免了用户态和内核态之间的多次拷贝和上下文切换。kafka的ACK机制1. ACK0生产者端向broker端发送消息不需要broker端确认就认为消息发送成功。2. ACK1生产者端向broker端发送消息需要等待leader节点写入成功以后才认为消息发送成功。3. ACKall生产者端向broker端发送消费需要等待leader副本和follower副本都成功写入以后才认为消息发送成功。broker端发送消息写入成功的消息给生产者端生产者端认为消息写入成功。ISR(In-Sync Replicas)同步副本集合的缩写。ISR指的是与leader节点保持完全同步的副本集合。只有ISR内的follower节点才参与数据的同步和容灾切换。6. 如何避免kafka的消息积压问题首先kafka的消息积压问题是因为消费者的速率低于生产者的速率导致的。在进行问题排查时候可以从三个层面考虑。是否生产者写入的速率过快是否是broker端的分区数过少是否是消费者端的消费者数量过少。解决方法增加Topic的分区数1. 增加Topic的分区数让多个消费者进行并发的消费。提高消费者的能力1. 增加消费者组内的消费者的数量2. 采用线程池异步消费3. 批量进行拉取批量消费而非单条拉取单条消费。kafka哪些参数调优1. 生产者端 ACK all避免消息丢失。开启重试机制幂等性机制保证消息不会丢失。2. broker端一个Topic分多个分区一个分区设置多个副本。避免消息积压避免消息丢失。3. 消费者端手动提交偏移量设置幂等性效验。防止消息丢失以及消息重复消费。批量进行拉取批量进行消费解决消息积压问题。7. kafka的使用场景1. 日志收集系统ELK2. 实时数据分析spark,flink3. 用户行为追踪埋点4.消息解耦与异步通信微服务架构8. kafkaZK进行部署方式1使用 ZooKeeper传统方式方式2KRaft 模式新方式不需要 ZooKeeperkafkakafka_2.12-3.9.1 ZooKeeper3.8.4| 元数据类型 | 说明 | 存储位置传统 KafkaZooKeeper || --------------------- | ------------------------------------- | ------------------------ || **Broker 列表** | 所有 Kafka 节点的信息id、host、port | ZooKeeper || **Topic 信息** | 包括 Topic 名称、分区数量、副本数量 | ZooKeeper || **Partition 信息** | 每个分区对应的 Leader、Follower、ISR 列表 | ZooKeeper || **Controller Broker** | 当前集群的 Controller 节点负责选举 Leader、分区变更 | ZooKeeper || **Consumer Group 信息** | 每个消费组的 offset、成员信息谁在消费哪个分区 | ZooKeeper || **ACL 配置** | 访问控制和 Topic 配置 | ZooKeeper |所以在 Kafka 2.x ZooKeeper 架构下几乎所有集群级别的控制信息都在 ZooKeeper 里而实际消息数据生产的消息是存在 每个 Broker 的磁盘日志文件里而不是 ZooKeeper9. Kafka 和 ZooKeeper 的关系Kafka broker 启动时会从 ZooKeeper 获取元数据Broker 列表、Topic 信息等Kafka broker 会向 ZooKeeper 注册自己Kafka controller 通过 ZooKeeper 选举产生消费组 offset 在旧版 Kafka2.x是保存在 ZooKeeper 的现在大多数版本 offset 可以保存在 Kafka 内部 Topic__consumer_offsets10. 消费者端的消费原理长轮询机制消费者会不断向 Kafka broker 发送 poll 请求如果没有新消息会阻塞等待默认有超时时间一旦有消息到达立即被拉取并触发 onMessage() 方法自动消费不需要手动调用任何方法Spring Kafka 框架会自动管理消费者的生命周期消息到达后自动反序列化并调用你的处理方法并发消费可以通过配置增加并发消费者数量例如设置 concurrency 3 会启动 3 个消费者线程同时消费KafkaListener( topics SECKILL_ORDER_TOPIC, groupId seckill-order-service, concurrency 3 ) public void onMessage(ConsumerRecordString, String record) { String key record.key(); String value record.value(); log.info(收到秒杀订单消息, topic{}, partition{}, offset{}, key{}, value{}, record.topic(), record.partition(), record.offset(), key, value); try { // 1. 反序列化消息体 SeckillOrderMessage message objectMapper.readValue(value, SeckillOrderMessage.class); // 2. 调用领域服务创建订单内部已包含幂等控制 SeckillOrder order orderService.createSeckillOrder(message); log.info(处理秒杀订单消息成功, orderId{}, userId{}, productId{}, order.getId(), order.getUserId(), order.getProductId()); } catch (Exception ex) { // 这里简单打印日志生产环境中应结合重试机制与死信队列处理异常消息。 log.error(处理秒杀订单消息失败, key{}, value{}, key, value, ex); // 可以根据需要抛出异常交给 KafkaListener 容错机制处理这里选择吞掉避免阻塞后续消息。 } }工作流程:UserServiceApplication.start()↓Spring 容器初始化↓扫描到 KafkaListener↓创建 Kafka 监听容器↓启动后台线程开始 poll 消息↓【等待消息...】↓生产者发送消息到 seckill-order topic↓Kafka broker 收到消息↓消费者 poll 到消息↓触发 onMessage() 方法 ← 这里处理你的业务逻辑11. Topic和消费者组再理解同一个消费者组: 一条消息-只会被一个消费者消费目的实现负载均衡不同消费者组: 一条消息-每个消费者组都能消费Kafka 的 Consumer Group 主要解决两个问题1️.消费扩展并行处理2️.业务解耦不同系统消费同一数据一个 Consumer Group 一个业务逻辑 / 一个系统为什么不用一个 Consumer Group 处理所有业务1️. 数据只能消费一次订单系统消费了库存系统就消费不到2. 业务耦合严重订单 库存 风控 日志生产环境的数据设计: Topic 数据流 Consumer Group 业务系统 Consumer 并发处理能力典型的业务场景:同一个 Topicorder-topic不同系统消费订单系统 - 处理订单库存系统 - 扣库存日志系统 - 记录日志风控系统 - 风控检测他们使用不同 Consumer Grouporder-group stock-group log-group risk-group每个系统都会消费同一条消息。在Kafka中消息是存储在Topic的Partition中的每个消费者组都会维护自己的offset。同一个消费者组内一条消息只会被其中一个消费者消费用于实现负载均衡。但不同消费者组之间互不影响因此同一条消息可以被多个消费者组分别消费。12. Topic的Partition数量的设定?PartitionConsumer 但是不能无限增加Kafka官方建议单个Broker最好 4000 Partition更常见生产环境1000 ~ 2000 Partition / Broker每个Partition会有:索引文件,日志文件,网络连接,缓存Kafka分区并不是越多越好。增加分区可以提升并发消费能力因为Kafka的并发度主要由Partition数量决定。但是不能无限的增加因为分区过多会增加Broker内存压力原数据管理的开销文件句柄数量并且会导致Consumer Group Rebalance时间变长。因此在生产环境中需要根据吞吐量和消费者数量合理规划分区数量。Partition和Consumer关系的设定?原因?官方推荐Partition一般是Consumer的2-3倍。原因:1. 方便扩容Partition 12 Consumer 6未来方便把消费者扩容到102. 容错能力更强Partition Consumer 有一个 Consumer 挂掉Rebalance分区分布会更均衡。3️.提高负载均衡Partition 10 Consumer 3C1 → P0 P1 P2 P3C2 → P4 P5 P6C3 → P7 P8 P9Kafka会自动做均衡分配。面试可以这样回答Partition 和 Consumer 并不一定需要 1:1。虽然 1:1 时并发利用率最高但在生产环境通常会让Partition 数量大于 Consumer 数量。这样可以支持未来扩容、提高负载均衡能力并避免消费者宕机带来的压力集中问题。因此一般会设计Partition数量为Consumer 的 2~3 倍。13. kafka如何解决消息积压的问题生产环境需要先判断是否是消息积压问题可以通过脚本或者监控工具:Prometheus,Grafana。监控工具如果出现异常就会告警Lag报警例如Lag 10000。主要说一下通过脚本kafka-consumer-groups.sh \--bootstrap-server localhost:9092 \--describe \--group order-group| TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG || ----------- | --------- | -------------- | -------------- | --- || order-topic | 0 | 1200 | 1300 | 100 |当前消费位置 1200最新消息位置 1300Lag 100解决方法:1. 增加消费者数量 (前提要小于partion数量) 2. 增加分区数量(提高吞吐量)3. 修改消费者的逻辑(1. 批量拉取消息进行消费 2. 异步处理消息拉取线程只负责拉取消息消息处理线程处理任务)真实场景:Producer5000 msg/s Consumer 3000 msg/s每秒就会产生2000 Lag如果持续 10 分钟Lag 2000 × 600 1,200,000就会出现 严重消息积压。14. 如何保证kafka顺序消费?单机环境topic只划分一个分区最简单。缺点吞吐量低。分布式环境 :1. 局部有序性: 根据业务key划分相同业务划分到同一个分区。订单 orderId 用户 userId 账户 accountId2. 全局有序性:多个Partition消费-收集消息-根据时间戳或序列号排序消费者先缓存 再排序缺点复杂度高 延迟高 内存消耗大二. rabbitmq篇优点: 高可用性低延迟支持多种路由协议MQTTAMQP轻量级的消息队列。缺点1. 吞吐量不高 2. 消息积压的时候性能明显的下降 3. 自身不保证顺序性单机吞吐量几万QPS使用场景1. 微服务异步解耦 2. 实时订单的处理 3. 短信消息通知三. RocketMQ篇优点高吞吐量低延迟高可用性。单机吞吐量可以达到10万QPS。原生支持全局顺序和分区顺序。缺点1. 社区活跃度不如 Kafka 和 RabbitMQ2. 生态不如 Kafka 成熟使用场景大型的电商下单金融交易系统

相关新闻