
一、如何保证生产者消息发送出去了先说一个关键点生产者能保证的是消息成功发送到 RocketMQ Broker。生产者不能直接保证消费者一定已经处理完业务。也就是说RocketMQ 是异步模型生产者 - Broker - 消费者生产者发完消息后通常只能知道消息有没有成功写入 Broker而不是消费者有没有最终处理完在之前讲的概念里Broker 是真正存消息的地方Producer 发消息实际是发给 BrokerConsumer 再从 Broker 拉消息处理。1. 生产者怎么确认“消息发出去了”RocketMQ 发送消息一般有三种方式同步发送 异步发送 单向发送1同步发送最常用可靠性较高生产者发送消息后会等待 RocketMQ 返回结果。SendResultsendResultrocketMQTemplate.syncSend(order_topic:create,订单创建成功订单ID10001);System.out.println(sendResult.getSendStatus());如果返回SEND_OK说明消息已经成功发送到 Broker。你可以理解为我把快递交给快递站了并且快递站给我回执了。2异步发送适合高并发生产者发出去后不阻塞当前线程RocketMQ 后面通过回调告诉你成功还是失败。rocketMQTemplate.asyncSend(order_topic:create,订单创建成功订单ID10001,newSendCallback(){OverridepublicvoidonSuccess(SendResultsendResult){System.out.println(发送成功sendResult);}OverridepublicvoidonException(Throwablethrowable){System.out.println(发送失败throwable.getMessage());}});3单向发送最快但不可靠rocketMQTemplate.sendOneWay(log_topic,用户访问日志);它不等结果也不关心成功失败。适合日志、埋点这种允许丢一点的场景。不适合订单、支付、优惠券这种重要业务。二、如何保证消费者已经完全消费消息这里要非常注意RocketMQ 自己只能保证“消费者成功返回后认为消息消费成功”。但你的业务是否真的处理成功要靠你的代码设计。消费者代码大概是这样ComponentRocketMQMessageListener(topicorder_topic,consumerGroupstock_consumer_group)publicclassStockConsumerimplementsRocketMQListenerString{OverridepublicvoidonMessage(Stringmessage){// 1. 执行业务逻辑比如扣库存System.out.println(扣库存message);// 2. 如果这里没有抛异常RocketMQ 就认为消费成功}}如果onMessage()正常执行结束RocketMQ 就认为这条消息消费成功如果你抛异常OverridepublicvoidonMessage(Stringmessage){thrownewRuntimeException(扣库存失败);}RocketMQ 就认为这条消息消费失败需要重试三、但是“完全消费”到底是什么意思这句话要拆开看。假设订单服务发了一条消息用户下单成功然后有三个消费者组stock_group扣库存 sms_group发短信 point_group加积分那么“完全消费”可能有两种意思。情况 1只关心某一个消费者组消费成功比如你只关心库存系统有没有扣库存成功。那你可以看stock_group 是否消费成功如果库存消费者正常执行完没有抛异常RocketMQ 就认为这个消费者组消费成功。情况 2关心所有业务系统都消费成功比如你想知道库存扣了 短信发了 积分加了这就不是 RocketMQ 自动帮你完成的了。你需要自己设计一张业务状态表。比如CREATETABLEorder_message_consume_status(idBIGINTPRIMARYKEYAUTO_INCREMENT,order_idBIGINT,message_keyVARCHAR(100),stock_statusTINYINT,sms_statusTINYINT,point_statusTINYINT,create_timeDATETIME,update_timeDATETIME);每个消费者处理完以后更新自己的状态库存系统消费成功 - stock_status 1 短信系统消费成功 - sms_status 1 积分系统消费成功 - point_status 1最后你判断stock_status 1 sms_status 1 point_status 1才能说这条订单消息相关业务都处理完成了。四、RocketMQ 本身不是用来做“强同步确认”的你不要把 MQ 理解成这种模式生产者发送消息 等待消费者全部处理完成 然后生产者再继续执行这就失去了 MQ 的意义。MQ 的核心价值是异步 解耦 削峰 最终一致性所以正常设计是生产者只保证消息可靠投递到 Broker 消费者保证自己最终能处理成功 业务通过状态表、重试、幂等来保证最终一致也就是MQ 系统里通常不追求“立刻一致”而是追求“最终一致”。五、如何让整个链路更可靠一般要做这几件事。1. 生产者使用同步发送并检查 SEND_OKSendResultsendResultrocketMQTemplate.syncSend(coupon_topic:expire,couponMessage);if(sendResult.getSendStatus()!SendStatus.SEND_OK){thrownewRuntimeException(优惠券过期消息发送失败);}这样可以保证生产者知道消息有没有成功交给 Broker。2. 重要业务使用事务消息比如创建订单成功 必须发送订单创建消息如果你先插入订单再发消息可能出现订单插入成功 消息发送失败这就不一致了。RocketMQ 的事务消息就是为了解决这种问题。大概流程是1. 先发送半消息 2. 执行本地事务比如创建订单 3. 本地事务成功提交消息 4. 本地事务失败回滚消息你可以理解为订单创建成功消息才真正对消费者可见。 订单创建失败消息不会被消费者看到。3. 消费者处理成功才正常返回消费者里不要随便吞异常。错误写法OverridepublicvoidonMessage(Stringmessage){try{// 扣库存stockService.deduct(message);}catch(Exceptione){// 只打印日志不抛异常e.printStackTrace();}}这个写法很危险。因为虽然扣库存失败了但是方法最后正常结束了。RocketMQ 会认为消费成功正确写法OverridepublicvoidonMessage(Stringmessage){try{stockService.deduct(message);}catch(Exceptione){thrownewRuntimeException(扣库存失败等待 RocketMQ 重试,e);}}这样 RocketMQ 才会重试。4. 消费者必须做幂等因为 RocketMQ 可能重复投递消息。比如同一个订单消息消费了两次订单10001扣库存 订单10001扣库存如果不做幂等就会多扣库存。可以用数据库唯一索引防重。比如CREATETABLEconsume_record(idBIGINTPRIMARYKEYAUTO_INCREMENT,message_keyVARCHAR(100)UNIQUE,consumer_groupVARCHAR(100),create_timeDATETIME);消费前先判断OverridepublicvoidonMessage(OrderMessagemessage){// 1. 判断是否消费过booleanconsumedconsumeRecordService.exists(message.getMessageKey());if(consumed){return;}// 2. 执行业务逻辑stockService.deduct(message.getOrderId());// 3. 记录已消费consumeRecordService.save(message.getMessageKey(),stock_group);}这样即使消息重复来了也不会重复扣库存。六、死信队列是什么死信队列英文是Dead Letter Queue DLQ它的意思是一条消息反复消费失败超过最大重试次数后就不会继续正常投递了而是进入死信队列。比如订单消息 - 库存消费者库存消费者一直失败第1次失败 第2次失败 第3次失败 ... 第16次失败超过重试次数后这条消息就会进入死信队列。七、为什么需要死信队列因为有些消息可能一直处理不了。例如订单ID不存在 数据库字段异常 业务数据格式错误 库存系统代码有 bug 外部接口一直失败如果 RocketMQ 一直无限重试会浪费资源还可能阻塞其他消息。所以 RocketMQ 会说这条消息我已经重试很多次了还是失败。 我先把它放到死信队列里后面人工排查。八、死信队列可以理解成什么你可以把它理解成问题快递暂存区普通消息流程消息 - 消费者 - 消费成功失败重试流程消息 - 消费者 - 失败 消息 - 消费者 - 失败 消息 - 消费者 - 失败最终消息 - 死信队列也就是正常队列处理不了的异常消息最后会被隔离到死信队列。九、死信队列的名字一般长什么样RocketMQ 的死信 Topic 一般类似%DLQ%consumer_group比如你的消费者组是stock_consumer_group那么死信队列可能是%DLQ%stock_consumer_group也就是说死信队列是按 Consumer Group 维度产生的。为什么因为同一条消息对不同消费者组来说消费结果可能不一样。例如order_topic 中有一条订单消息库存系统失败stock_group 消费失败短信系统成功sms_group 消费成功那么这条消息只会对stock_group进入死信队列不影响sms_group。十、举个完整例子订单服务发送消息Topic: order_topic Tag: create Body: {orderId:10001}库存消费者ComponentRocketMQMessageListener(topicorder_topic,consumerGroupstock_group,selectorExpressioncreate)publicclassStockConsumerimplementsRocketMQListenerString{OverridepublicvoidonMessage(Stringmessage){// 模拟一直失败thrownewRuntimeException(扣库存失败);}}RocketMQ 会不断重试。如果一直失败最终消息进入%DLQ%stock_group然后你可以查看死信消息 分析失败原因 修复数据或代码 重新投递消息十一、你真正应该记住的结论生产者发送可靠性同步发送 检查 SEND_OK能保证消息成功到达 Broker。消费者消费可靠性业务处理成功 - 正常返回 业务处理失败 - 抛异常让 RocketMQ 重试防止重复消费消费者必须做幂等因为消息可能被重复投递。死信队列消息多次消费失败后会进入死信队列等待人工排查或重新投递。十二、一句话总结RocketMQ 里生产者只能确认消息是否成功发送到 Broker不能直接确认所有消费者都处理完成消费者是否成功取决于消费方法是否正常返回。为了保证最终可靠需要同步发送、消费失败抛异常、自动重试、幂等处理、状态表追踪最后失败的消息会进入死信队列也就是专门存放“多次消费失败消息”的队列。