Kafka消息可靠性:从生产到消费的全链路不丢不重

发布时间:2026/5/27 22:48:16

Kafka消息可靠性:从生产到消费的全链路不丢不重 大家好我是程序员小策。先做个自测——你们项目里的 Kafka消息可靠性是怎么保证的A. 生产者acksall消费者手动提交 offset——觉得这样就不丢了。B. 加了个enable.idempotencetrue觉得幂等也够了。C. 用数据库事务包裹写业务 发消息两阶段提交然后手动补偿。D. 不知道反正运维说 Kafka 很可靠出问题了找运维。如果选了 A 或 B先别急——这两种配置在生产环境单机跑确实没问题但一到服务崩了重启、“网络抖动重试”、同一条消息被消费了两次这些场景就会暴露硬伤ack 只保证 Broker 收到了不保证消费者处理完了幂等只保证生产者不重复发不保证消费者不重复处理。今天这篇文章就是要从 GitHub 上一个生产级项目 ledgerly-saga-outbox-cqrs 的代码出发一步步拆解如何在生产上做到全链路消息不丢 端到端幂等处理。问题定义消息到底在哪丢了一条消息从诞生到被处理要穿过三段链路生产者 → [网络] → Kafka Broker → [网络] → 消费者 → [处理逻辑]任何一段断了消息就丢了。大多数人只关注了其中一段。具体来拆阶段怎么丢的典型场景生产端发消息前服务崩了写数据库成功但kafkaTemplate.send()还没来得及执行Broker 端Leader 挂了副本没同步完acks1时 Leader 收到就返回成功副本还没复制Leader 宕机消费端自动提交 offset 但没处理完enable.auto.committrue消息拉到内存就提交了 offset还没来得及处理服务重启了而更隐蔽的问题是——即使消息没丢重复消费才是最常被忽视的。生产者因网络超时重试 → Broker 收到两条一模一样的消息 → 消费者处理了两遍 → 用户被扣了两次钱。那么问题来了怎么同时解决不丢和不重核心概念用一个外卖订单类比全链路消息不丢At-Least-Once每一段链路都有确认机制没收到确认就重试直到确认为止。幂等处理Idempotency同一条消息无论被处理多少次最终结果和执行一次完全一样。打个比方——你在美团点了一份黄焖鸡米饭不丢At-Least-Once怎么保证你下单 → 平台必须告诉你下单成功生产端确认平台推给商家 → 商家必须确认收到订单Broker 确认骑手取餐 → 必须扫码确认已取餐消费端手动提交 offset任何一步没收到确认系统就重推。但重推带来了新问题——重复。网络抖了一下平台没收到商家的确认于是又推了一次。商家看到两条一模一样的订单——如果做了两份黄焖鸡用户只付了一份钱商家亏了。幂等就是商家的去重逻辑订单号idempotency key已经处理过直接返回第一次的结果不再重复做菜。翻译回技术语言订单号 Kafka 消息头里的idempotency-key已经处理过的判断 Redis SETNX 数据库唯一约束“返回第一次结果” IdempotencyService 查到已有记录直接返回接下来看代码怎么落地。代码实现拆解一个生产级 Kafka 项目以下代码全部来自 dkrmerve/ledgerly-saga-outbox-cqrs一个生产级 Spring Boot 项目涵盖了Transactional Outbox、DLT 死信队列、Redis 去重、DB 幂等四个维度。阶段一生产端不丢 —— Transactional Outbox 模式问题下面这种写法如果服务崩在send之前数据库已经写了消息没发出去。// 反例写DB和发消息不是原子的TransactionalpublicvoidcreateOrder(Orderorder){orderRepository.save(order);// ① 成功了kafkaTemplate.send(order-topic,order);// ② 还没来得及执行 → 服务崩了}解法Transactional Outbox。不在业务方法里直接发 Kafka而是先把消息和业务数据在同一个事务里写到一张outbox_event表然后由独立的定时任务从这张表里取消息发到 Kafka。看代码。第一步业务操作 写 Outbox 表在同一个事务中OutboxService.javaServicepublicclassOutboxService{privatefinalOutboxRepositoryoutboxRepository;privatefinalObjectMapperomnewObjectMapper();TransactionalpublicvoidenqueueSagaCommand(longorderId,StringidempotencyKey,UUIDcorrelationId,SagaEventevent){enqueue(KafkaTopics.topicSagaCommands(),orderId,idempotencyKey,correlationId,event.eventType,event);}privatevoidenqueue(Stringtopic,longorderId,StringidempotencyKey,UUIDcorrelationId,StringeventType,Objectpayload){try{OutboxEventEntityenewOutboxEventEntity();e.setId(UUID.randomUUID());e.setTopic(topic);e.setAggregateType(ORDER);e.setAggregateId(String.valueOf(orderId));e.setEventType(eventType);e.setPayload(om.writeValueAsString(payload));e.setStatus(NEW);// ← 初始状态待发送e.setCorrelationId(correlationId);e.setIdempotencyKey(idempotencyKey);// ← 幂等键跟着消息走e.setOccurredAt(Instant.now());e.setPublishAttempts(0);// ← 重试计数器outboxRepository.save(e);}catch(Exceptionex){thrownewRuntimeException(ex);}}}关键设计OutboxService.enqueue()和OrderService.createOrder()在同一个Transactional下执行。PostgreSQL 的事务保证了要么订单 Outbox 记录一起写入要么一起回滚。不存在订单写了、消息没写的情况。第二步独立的定时任务从 Outbox 表取消息发到 KafkaOutboxPublisherJob.javaEnableSchedulingComponentpublicclassOutboxPublisherJob{privatefinalOutboxRepositoryoutboxRepository;privatefinalOutboxKafkaPublisherpublisher;privatefinalintbatchSize;// 每次取多少条privatefinalintleaseSeconds;// 租约时间防止多节点重复发送privatefinalStringnodeId;// 当前节点标识Scheduled(fixedDelayString${ledgerly.outbox.publishFixedDelayMs})TransactionalpublicvoidpublishLoop(){ListOutboxEventEntitybatchoutboxRepository.leaseBatch(batchSize);if(batch.isEmpty())return;InstantlockUntilInstant.now().plusSeconds(leaseSeconds);for(OutboxEventEntitye:batch){outboxRepository.markLocked(e.getId(),nodeId,lockUntil);try{publisher.publish(e);// 发到 Kafkae.setStatus(PUBLISHED);// 标记已发送e.setPublishedAt(Instant.now());e.setPublishAttempts(e.getPublishAttempts()1);e.setLastError(null);}catch(Exceptionex){e.setPublishAttempts(e.getPublishAttempts()1);e.setLastError(ex.getMessage());e.setStatus(NEW);// 恢复为NEW下次重试}outboxRepository.save(e);}}}关键设计点租约机制Lease多节点部署时leaseBatch()用SELECT ... FOR UPDATE SKIP LOCKED给记录加锁防止同一消息被多个节点重复发送。重试机制发送失败的消息状态回退为NEW下一轮定时任务会重新拾取。不再丢只要消息写入了 Outbox 表就一定会被发送到 Kafka——即使服务重启也不怕。第三步真正发送到 Kafka带上幂等键和链路追踪信息OutboxKafkaPublisher.javaComponentpublicclassOutboxKafkaPublisher{privatefinalKafkaTemplateString,StringkafkaTemplate;publicvoidpublish(OutboxEventEntitye){// key aggregateId确保同一订单的消息进同一分区保证有序ProducerRecordString,StringrecordnewProducerRecord(e.getTopic(),e.getAggregateId(),e.getPayload());// 在 Kafka Header 中注入元数据——消费端幂等和链路追踪的基础record.headers().add(KafkaHeaders.CORRELATION_ID,e.getCorrelationId().toString().getBytes(StandardCharsets.UTF_8));if(e.getIdempotencyKey()!null){record.headers().add(KafkaHeaders.IDEMPOTENCY_KEY,e.getIdempotencyKey().getBytes(StandardCharsets.UTF_8));}record.headers().add(KafkaHeaders.EVENT_ID,e.getId().toString().getBytes(StandardCharsets.UTF_8));record.headers().add(KafkaHeaders.ORDER_ID,e.getAggregateId().getBytes(StandardCharsets.UTF_8));record.headers().add(KafkaHeaders.EVENT_TYPE,e.getEventType().getBytes(StandardCharsets.UTF_8));kafkaTemplate.send(record).completable().join();// 同步等待结果}}阶段二Broker 端不丢 —— 生产级配置光靠代码不够Kafka Broker 端必须配上正确的参数。看这个项目的application.ymlspring:kafka:bootstrap-servers:localhost:9092producer:acks:all# ① 等待所有ISR副本确认properties:enable.idempotence:true# ② 生产者幂等PID Sequence Numberconsumer:enable-auto-commit:false# ③ 禁止自动提交offsetproperties:isolation.level:read_committed# ④ 只读已提交的事务消息listener:ack-mode:manual# ⑤ 手动确认模式逐条解释为什么这样配参数值为什么丢了会怎样acksall/-1等待所有 ISRIn-Sync Replicas副本都写入后才返回成功。Leader 挂了任一 ISR 副本能接替acks1时 Leader 确认后立即宕机副本还没同步消息永久丢失enable.idempotencetrueBroker 给每个 Producer 分配 PIDProducer 给每条消息分配 Sequence Number。Broker 发现重复的 PIDSeq 就丢弃网络超时重试 → Broker 收到重复消息 → 消费者处理两遍enable.auto.commitfalse必须手动提交 offset。自动提交 消息拉到内存就认为消费成功处理逻辑还没跑服务就崩了重启后从已提交的 offset 继续中间的消息没处理但 offset 已经跳过了isolation.levelread_committed只消费已提交事务的消息未提交的事务消息不可见。配合事务生产者使用读到未提交的事务消息事务回滚后这条消息实际不存在ack-modemanual消费者处理完业务逻辑后手动调用acknowledgment.acknowledge()record模式在 listener 返回后就自动提交异常时消息已被标记为已消费阶段三消费端不重 —— 双重去重Redis DB现在消息一定会到达消费者但可能到达多次生产者重试、网络重试、Rebalance 重试。这个项目的消费端去重分为两层第一层Redis 快速去重RedisDedupService.javaServicepublicclassRedisDedupService{privatefinalStringRedisTemplateredis;privatefinalDurationttl;// 默认86400秒 24小时/** * 使用 Redis SETNX 原子操作判断是否是第一次处理 * key dedup:{consumer}:{eventId} * 返回 true 第一次处理可以继续 * 返回 false 已处理过跳过 */publicbooleanfirstTime(Stringconsumer,StringeventId){Stringkeydedup:consumer:eventId;Booleanokredis.opsForValue().setIfAbsent(key,1,ttl);returnBoolean.TRUE.equals(ok);}}第二层DB 权威去重InboxService.javaRedis 是快速路径——如果数据过期了或者 Redis 挂了仍然需要数据库兜底ServicepublicclassInboxService{privatefinalInboxRepositoryinboxRepository;privatefinalRedisDedupServiceredisDedupService;/** * Exactly-once 双重保障 * ① Redis SETNX快速判断大概率是不是重复 * ② DB Inbox 表唯一约束(eventId, consumer) 联合主键——权威去重 * * claim() 和业务逻辑在同一个 Transactional 中执行 * 任何一步失败都整体回滚保证原子性。 */Transactionalpublicbooleanclaim(Stringconsumer,UUIDeventId){booleanlikelyFirstredisDedupService.firstTime(consumer,eventId.toString());if(!likelyFirst){// Redis 命中了大概率重复但最终以 DB 为准}InboxEventEntityenewInboxEventEntity();e.setEventId(eventId);e.setConsumer(consumer);e.setProcessedAt(Instant.now());try{inboxRepository.save(e);// ← 唯一约束重复插入抛异常returntrue;// → 第一次处理继续执行业务逻辑}catch(Exceptionex){returnfalse;// → 重复消息跳过}}}双重去重的精妙之处Redis SETNXO(1) 时间复杂度挡住 99% 的重复流量DB 唯一约束Redis 数据过期或宕机后的保底方案在同一个事务中执行保证去重和业务处理的原子性两层都失败 消息真的重复了跳过不处理阶段四处理失败怎么办 —— 死信队列DLT消息不丢了也不重复了但如果业务处理一直失败怎么办不能无限重试。这个项目的方案是0 次重试直接进死信队列KafkaConfig.javaConfigurationpublicclassKafkaConfig{BeanConcurrentKafkaListenerContainerFactoryString,StringkafkaListenerContainerFactory(ConsumerFactoryString,StringconsumerFactory,KafkaTemplateObject,ObjectkafkaTemplate){ConcurrentKafkaListenerContainerFactoryString,StringfactorynewConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);// 死信队列处理失败的消息自动转发到 {原topic}.DLTDeadLetterPublishingRecovererrecoverernewDeadLetterPublishingRecoverer(kafkaTemplate,(record,ex)-newTopicPartition(record.topic().DLT,record.partition()));// 0次重试 → 立即进入 DLT由人工或定时任务处理DefaultErrorHandlererrorHandlernewDefaultErrorHandler(recoverer,newFixedBackOff(0L,0));errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);factory.setCommonErrorHandler(errorHandler);returnfactory;}}这样整个消息生命周期形成了完整的闭环业务操作 → Outbox表 → 定时发送 → Kafka → 消费者 ↓ Redis SETNX快速去重 ↓ DB 唯一约束权威去重 ↓ 执行业务逻辑 ↙ ↘ 成功 失败 手动Ack offset → {topic}.DLT死信队列边界情况与陷阱代码跑起来才会翻的车看起来很完美了对吧但以下几个坑在生产上真实发生过。陷阱一Outbox 表无限膨胀。Outbox 表发完消息后不删记录几个月后表里有几千万条数据定时扫描越来越慢。解法定期归档或删除statusPUBLISHED且published_at NOW() - 7天的记录。陷阱二Redis SETNX 的 TTL 设置不当。TTL 太短比如 5 分钟Consumer Rebalance 后重试 → Redis key 已过期 → 查不到 → 重复处理。上面项目里 TTL 是 86400 秒24 小时覆盖了绝大多数重试窗口。陷阱三max.poll.interval.ms太小导致死循环 Rebalance。消费者处理慢 → 超过max.poll.interval.ms→ 被踢出消费者组 → Rebalance → 重新分配分区 → 重新处理同一条消息 → 更慢 → 又超时 → 又 Rebalance……解法增大max.poll.interval.ms默认 5 分钟或减小max.poll.records每次少拉几条。陷阱四acksallmin.insync.replicas1 白配了。acksall等的是所有 ISR 副本但如果min.insync.replicas1且 ISR 里只剩 Leader 一个副本那就退化成了acks1。必须同时设置min.insync.replicas 2。高级考量ID 生成与顺序性消息不丢不重了还有一个隐性需求——同一个订单的操作必须有序消费。用户先下单后取消取消消息不能被先消费。这个项目的做法用aggregateId订单 ID作为 Kafka 消息的 Key。// OutboxKafkaPublisher.java 中的这行代码ProducerRecordString,StringrecordnewProducerRecord(e.getTopic(),e.getAggregateId(),e.getPayload());// ↑ key orderId保证同一订单的消息进同一分区Kafka 保证同一分区内的消息严格有序。把同一个订单的所有操作路由到同一分区 该订单的所有操作有序。那么全局有序呢所有消息进一个分区就行——但那样吞吐量就只有单分区的能力。实际生产上几乎不需要全局有序分区有序足够。对比表格四种可靠性方案对比方案核心思路不丢不重复杂度适用场景纯 Kafka 参数acksall 手动提交✅ Broker端❌ 消费端不防重低允许少量重复的场景如日志Kafka 事务executeInTransactionsend✅✅ 生产端幂等中发消息 写DB需原子性但允许重复消费Transactional Outbox Redis去重DB事务写Outbox → 定时发Kafka → Redis SETNX去重✅ 全链路⚠️ Redis 不可靠中对数据一致性要求高的业务Outbox Redis DB双重去重本项目上述基础上加DB唯一约束兜底✅ 全链路✅ 端到端高金融、交易等对重复零容忍的场景面试追问面试官想听的不是配几个参数追问 1enable.idempotencetrue的原理是什么它和消费者幂等有什么区别→ 回答方向Kafka 生产者幂等是Broker 层面的去重——Broker 给每个 Producer 分配一个 PIDProducer IDProducer 给每个消息分区分配一个单调递增的 Sequence Number。Broker 收到消息时检查PID Seq是否连续发现重复或乱序就丢弃。但这只保证生产者到 Broker 这一段不重复。消费者拿到消息后重复消费生产者幂等管不了——必须做消费端幂等。追问 2为什么不用 Kafka 的事务消息initTransactionscommitTransaction替代 Outbox→ 回答方向Kafka 事务可以保证发消息和消息本身的原子性但它不能保证发消息和写 MySQL的原子性——除非用 EOSExactly Once Semantics全家桶但那要求消费者也必须是事务消费isolation.levelread_committed且要求下游也是 Kafka。你写的是 PostgreSQLKafka 事务管不着。Outbox 模式把写DB写Outbox表放在同一个本地事务中是最简单可靠的方案。追问 3Outbox 定时任务的轮询间隔700ms怎么定的会不会成为瓶颈→ 回答方向轮询间隔是延迟和 DB 压力的权衡。700ms 意味着消息最多延迟 700ms 才能被消费。如果需要更低延迟可以用 Debezium 之类的 CDC 工具监听 Outbox 表 binlog 实时发送。但如果业务允许秒级延迟700ms 完全可以接受。瓶颈在leaseBatch()的SKIP LOCKED——它能保证多节点并行取不同的批次水平扩展即可增加吞吐。追问 4Redis 去重的 TTL 过期了怎么办消息重试窗口比 TTL 还长。→ 回答方向这就是为什么需要DB 唯一约束作为兜底。Redis 是性能优化DB 是数据一致性的保底方案。即使 Redis key 过期了DB 的InboxEventEntity(eventId, consumer)联合主键也能保证不重复。两层去重谁快用谁谁稳信谁。总结消息不丢靠 Outbox acksall消息不重靠 Redis SETNX DB 唯一约束双重去重处理失败靠 DLT 兜底。读完这篇你应该能画出 Kafka 消息从生产到消费的全链路并标注每一段的可靠性保障措施在项目里落地 Transactional Outbox 模式用 DB 事务替代手动发消息设计 Redis DB 双重去重方案而不是开口只说用 Redis 做幂等在面试时说出enable.idempotence的底层原理PID Sequence Number而不只是配个参数就行理解 DLT 死信队列的价值——不是每条失败的消息都值得无限重试有时候快速失败然后人工介入才是对的

相关新闻