RocketMQ深度解析:阿里双十一背后的消息队列王者

发布时间:2026/6/25 12:29:59

RocketMQ深度解析:阿里双十一背后的消息队列王者 开篇一个真实的技术难题双十一零点某电商平台订单系统瞬间涌入800万笔订单。如果采用传统的同步调用订单服务处理150ms库存服务扣减200ms积分服务增加180ms短信通知发送300ms总耗时830ms用户体验极差系统直接崩溃。而使用RocketMQ后整个链路被重新设计订单服务仅需150ms完成核心逻辑其他操作全部异步化处理。系统稳定承载了峰值流量用 户体验丝滑流畅。这就是消息队列的魔力。今天我们深入剖析阿里开源的消息队列王者——RocketMQ。一、消息队列三问为什么需要能干什么有啥代价1.1 为什么消息队列会出现答案很简单分布式系统需要解耦和异步通信。想象一个外卖系统用户下单 → 订单服务订单服务 → 调用支付服务支付服务 → 调用配送服务配送服务 → 调用短信服务这是一个同步调用链任何一环出问题整个链路崩溃。引入消息队列后 用户下单 → 订单服务 → MQ → [支付/配送/短信] 各自消费核心改变从强耦合的同步链条变成松耦合的事件驱动。1.2 消息队列的三大核心价值价值一异步提速用户体验飞升对比实验同步模式 public void createOrder(Order order) { // 1. 保存订单 - 150ms orderService.save(order);// 2. 扣减库存 - 200msinventoryService.deduct(order);​// 3. 发送短信 - 300mssmsService.send(order.getPhone());​// 总耗时650ms}异步模式使用MQ public void createOrder(Order order) { // 1. 保存订单 - 150ms orderService.save(order);// 2. 发送消息到MQ - 10msrocketMQTemplate.convertAndSend(order-topic, order);​// 用户立即收到响应总耗时160ms}用户感知到的响应时间从650ms降低到160ms提速75%价值二系统解耦扩展性暴增场景演化初始需求 用户下单后发送短信 orderService.create(order); smsService.send(order);需求迭代一 增加邮件通知 orderService.create(order); smsService.send(order); emailService.send(order); // 修改代码重新部署需求迭代二 增加积分系统 orderService.create(order); smsService.send(order); emailService.send(order); pointService.add(order); // 再次修改再次部署使用MQ后 // 订单服务代码永远不变 orderService.create(order); mqProducer.send(order-created, order);// 新增功能只需新增消费者零侵入这就是发布-订阅模式的威力生产者无需知道消费者是谁消费者随意增删。价值三削峰填谷系统稳如磐石极端场景 秒杀活动正常流量100 QPS 秒杀瞬间10,000 QPS (100倍)没有MQ下游服务直接被击穿数据库连接池耗尽系统雪崩引入MQ后 // 生产者尽管10,000 QPS涌入MQ全部接收 for (Request req : requests) { mq.send(seckill-topic, req); }// 消费者按自己的节奏处理比如每秒200个 RocketMQMessageListener(topic seckill-topic) public void onMessage(Request req) { // 稳定处理不会被冲垮 processRequest(req); }MQ就像一个缓冲水池把洪峰流量削平。1.3 使用消息队列的代价天下没有免费的午餐引入MQ会带来代价一系统复杂度上升需要维护额外的MQ集群需要考虑高可用方案代价二消息可能丢失网络闪断导致发送失败Broker宕机导致数据丢失代价三消息可能重复消费成功但ACK失败导致重复消费代价四消息可能乱序多队列并行消费失败重试打乱顺序代价五分布式事务难题本地事务与消息发送如何保证一致性这些问题RocketMQ都有解决方案二、RocketMQ是什么为什么选择它2.1 RocketMQ的身份定位RocketMQ是阿里巴巴在2012年开源的分布式消息中间件2016年捐献给Apache基金会成为顶级项目。核心特性高性能单机支持万级QPS高可靠消息零丢失方案分布式水平扩展万亿级消息低延迟毫秒级消息投递在阿里内部的实战数据服务上千个应用双十一当天万亿级消息流转消息不丢失率99.9999%2.2 RocketMQ vs Kafka vs RabbitMQ┌────────────┬─────────────────────────┬──────────────────┬───────────────┐ │ 维度 │ RocketMQ │ Kafka │ RabbitMQ │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 吞吐量 │ 十万级 │ 百万级 │ 万级 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 延迟 │ 毫秒级 │ 毫秒级 │ 微秒级 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 消息可靠性 │ 极高 │ 高 │ 高 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 顺序消息 │ 支持 │ 支持分区内有序 │ 不支持 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 定时消息 │ 支持5.x任意精度 │ 不支持 │ 支持插件 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 事务消息 │ 支持 │ 支持 │ 不支持 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 多语言 │ Java为主5.x支持多语言 │ 多语言SDK成熟 │ 多语言SDK成熟 │ ├────────────┼─────────────────────────┼──────────────────┼───────────────┤ │ 适用场景 │ 金融、电商、业务系统 │ 大数据、日志收集 │ 传统企业应用 │ └────────────┴─────────────────────────┴──────────────────┴───────────────┘选择建议金融、电商、核心业务 → RocketMQ可靠性第一大数据处理、日志收集 → Kafka吞吐量第一传统企业、轻量级应用 → RabbitMQ简单易用三、RocketMQ核心架构四大组件深度解析3.1 整体架构图┌─────────────┐ ┌─────────────┐ │ Producer1 │ │ Producer2 │ └──────┬──────┘ └──────┬──────┘ │ │ └────────┬───────────┘ │ 发送消息 ↓ ┌───────────────┐ │ NameServer │ (路由注册中心) │ 集群(无状态) │ └───────┬───────┘ │ 获取路由 ↓ ┌───────────────────────┐ │ Broker Cluster │ │ ┌─────────────────┐ │ │ │ Master Broker │ │ (消息存储) │ │ (读写) │ │ │ └────────┬────────┘ │ │ │ 同步 │ │ ┌────────▼────────┐ │ │ │ Slave Broker │ │ │ │ (只读) │ │ │ └─────────────────┘ │ └───────────┬───────────┘ │ 拉取消息 ↓ ┌────────┴────────┐ │ │ ┌──────▼──────┐ ┌───────▼──────┐ │ Consumer1 │ │ Consumer2 │ └─────────────┘ └──────────────┘3.2 NameServer轻量级的注册中心角色定位 类似服务注册中心但更轻量。核心功能Broker注册- Broker启动时向所有NameServer注册- 每30秒发送一次心跳路由信息管理- 存储Topic与Broker的映射关系- 生产者和消费者从这里获取路由关键设计去中心化// NameServer之间完全独立不互相通信 NameServer1 NameServer2 NameServer3 ↑ ↑ ↑ └───────────┴───────────┘ Broker优势无单点故障部署简单性能极高对比ZooKeeper┌──────────┬────────────┬────────────────────────┐ │ 特性 │ NameServer │ ZooKeeper │ ├──────────┼────────────┼────────────────────────┤ │ 一致性 │ 最终一致 │ 强一致Paxos/Raft │ ├──────────┼────────────┼────────────────────────┤ │ 性能 │ 极高 │ 相对较低 │ ├──────────┼────────────┼────────────────────────┤ │ 复杂度 │ 简单 │ 复杂需选举 │ ├──────────┼────────────┼────────────────────────┤ │ 单点故障 │ 无 │ 有Leader宕机影响写 │ └──────────┴────────────┴────────────────────────┘3.3 Broker消息的仓库与快递员Broker的双重角色角色一消息存储引擎消息写入流程 ┌─────────────┐ │ CommitLog │ ← 所有消息顺序写入类似MySQL的binlog └──────┬──────┘ │ 异步构建索引 ↓ ┌──────────────┐ │ ConsumeQueue │ ← 每个Topic-Queue的消息索引 └──────┬───────┘ │ ↓ ┌──────────────┐ │ IndexFile │ ← 消息Key索引可选 └──────────────┘存储设计的精妙之处所有消息顺序写CommitLog- 利用磁盘顺序写的高性能接近内存- 避免随机IOConsumeQueue只存索引- 每条索引20字节CommitLog物理偏移消息大小TagCode- 消费者先读索引再从CommitLog读消息体PageCache 零拷贝- 操作系统的PageCache缓存热数据- sendfile零拷贝技术减少数据拷贝次数角色二高可用保障主从架构 ┌──────────────┐ │ Master Broker│ ← 读写 └──────┬───────┘ │ 同步/异步复制 ↓ ┌──────────────┐ │ Slave Broker │ ← 只读Master宕机后可消费 └──────────────┘主从复制策略┌──────────┬─────────────────────────┬────────────┬──────────────────┐ │ 策略 │ 说明 │ 优点 │ 缺点 │ ├──────────┼─────────────────────────┼────────────┼──────────────────┤ │ 同步复制 │ Master等待Slave同步成功 │ 数据不丢失 │ 性能低 │ ├──────────┼─────────────────────────┼────────────┼──────────────────┤ │ 异步复制 │ Master不等待Slave │ 性能高 │ 可能丢失少量数据 │ └──────────┴─────────────────────────┴────────────┴──────────────────┘生产建议 核心业务用同步复制日志类用异步复制。3.4 Producer消息的发送者三种发送方式方式一同步发送Syncpublic void sendSync() throws Exception { Message msg new Message(TopicTest, TagA, Hello RocketMQ.getBytes());// 同步发送等待结果SendResult result producer.send(msg);​System.out.println(消息ID result.getMsgId());System.out.println(发送状态 result.getSendStatus());}适用场景 重要通知、支付结果方式二异步发送Asyncpublic void sendAsync() throws Exception { Message msg new Message(TopicTest, TagA, Hello RocketMQ.getBytes());// 异步发送通过回调处理结果producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult result) {System.out.println(发送成功 result.getMsgId());}​Overridepublic void onException(Throwable e) {System.err.println(发送失败 e.getMessage());// 记录失败日志后续补偿}});}适用场景 高并发场景用户操作日志方式三单向发送Onewaypublic void sendOneway() throws Exception { Message msg new Message(TopicTest, TagA, Hello RocketMQ.getBytes());// 单向发送不等待结果producer.sendOneway(msg);}适用场景 日志收集、不重要的埋点数据3.5 Consumer消息的消费者两种消费模式模式一集群消费ClusteringComponent RocketMQMessageListener( topic order-topic, consumerGroup order-consumer-group, messageModel MessageModel.CLUSTERING // 集群模式 ) public class OrderConsumer implements RocketMQListenerString { Override public void onMessage(String message) { System.out.println(消费消息 message); } }特点消费者组内的消费者共同消费消息每条消息只被组内一个消费者消费适用场景 业务处理、数据同步模式二广播消费BroadcastingRocketMQMessageListener( topic cache-refresh-topic, consumerGroup cache-consumer-group, messageModel MessageModel.BROADCASTING // 广播模式 ) public class CacheConsumer implements RocketMQListenerString { Override public void onMessage(String message) { // 每个消费者都会收到这条消息 refreshCache(message); } }特点每个消费者都会收到所有消息适用场景 缓存刷新、配置更新四、核心问题突破如何保证消息不丢失消息丢失可能发生在三个环节我们逐一击破。4.1 生产者端防丢失问题场景 网络抖动导致消息发送失败解决方案重试机制 回调确认Configuration public class RocketMQConfig {Beanpublic DefaultMQProducer producer() {DefaultMQProducer producer new DefaultMQProducer(producer-group);producer.setNamesrvAddr(localhost:9876);​// 配置重试次数producer.setRetryTimesWhenSendFailed(3);producer.setRetryTimesWhenSendAsyncFailed(3);​// 配置发送超时时间producer.setSendMsgTimeout(10000);​return producer;}}异步发送的最佳实践public void sendWithRetry(OrderEvent event) { Message msg new Message(order-topic, JSON.toJSONString(event).getBytes());producer.send(msg, new SendCallback() {Overridepublic void onSuccess(SendResult result) {// 发送成功记录日志log.info(消息发送成功msgId{}, result.getMsgId());}​Overridepublic void onException(Throwable e) {// 发送失败记录到本地消息表log.error(消息发送失败{}, e.getMessage());saveToLocalMessageTable(event); // 后续定时补偿}});}4.2 Broker端防丢失核心配置三件套1. 刷盘策略同步刷盘性能换可靠性flushDiskTypeSYNC_FLUSH2. 主从复制同步复制brokerRoleSYNC_MASTER3. 最少同步副本数minSlaveNums1配置解释刷盘策略对比┌─────────────┬───────────────────────────────┬──────────────────┬──────┐ │ 策略 │ 说明 │ 可靠性 │ 性能 │ ├─────────────┼───────────────────────────────┼──────────────────┼──────┤ │ ASYNC_FLUSH │ 异步刷盘写入PageCache就返回 │ 低宕机丢数据 │ 高 │ ├─────────────┼───────────────────────────────┼──────────────────┼──────┤ │ SYNC_FLUSH │ 同步刷盘写入磁盘才返回 │ 高 │ 低 │ └─────────────┴───────────────────────────────┴──────────────────┴──────┘主从复制对比┌──────────────┬─────────────────────────────┬────────┬──────┐ │ 策略 │ 说明 │ 可靠性 │ 性能 │ ├──────────────┼─────────────────────────────┼────────┼──────┤ │ ASYNC_MASTER │ 异步复制Master不等Slave │ 低 │ 高 │ ├──────────────┼─────────────────────────────┼────────┼──────┤ │ SYNC_MASTER │ 同步复制Master等Slave确认 │ 高 │ 低 │ └──────────────┴─────────────────────────────┴────────┴──────┘生产推荐配置金融、支付等核心业务flushDiskTypeSYNC_FLUSH brokerRoleSYNC_MASTER日志、埋点等非核心业务flushDiskTypeASYNC_FLUSH brokerRoleASYNC_MASTER4.3 消费者端防丢失问题场景 消息拉取后业务处理失败但offset已提交解决方案手动提交 业务幂等Component RocketMQMessageListener( topic order-topic, consumerGroup order-group ) public class OrderConsumer implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {try {// 业务处理String body new String(message.getBody());processOrder(body);​// 业务处理成功消息会自动ACK​} catch (Exception e) {// 业务处理失败抛出异常// RocketMQ会自动重试最多16次throw new RuntimeException(处理失败, e);}}}关键点业务成功才返回失败抛异常触发重试业务逻辑必须幂等因为会重试五、高级特性顺序消息与事务消息5.1 顺序消息让消息按序而行业务场景 订单状态流转订单创建 → 订单支付 → 订单发货 → 订单完成这四条消息必须按顺序处理否则业务逻辑错乱。实现原理MessageGroup 单队列消费// 生产者指定MessageGroup public void sendOrderMessage(String orderId, String status) { Message msg new Message( order-topic, status-change, (orderId - status).getBytes() );// 核心使用orderId作为MessageGroup// 相同orderId的消息会进入同一个队列SendResult result producer.send(msg, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {String orderId (String) arg;int index Math.abs(orderId.hashCode()) % mqs.size();return mqs.get(index);}}, orderId);}// 消费者单线程顺序消费 RocketMQMessageListener( topic order-topic, consumerGroup order-group, consumeMode ConsumeMode.ORDERLY, // 顺序消费模式 consumeThreadMax 1 // 单线程消费 ) public class OrderConsumer implements RocketMQListenerString { Override public void onMessage(String message) { // 按顺序处理订单状态 updateOrderStatus(message); } }顺序保证的三个层次生产顺序 相同MessageGroup的消息发往同一队列存储顺序 队列内消息按FIFO存储消费顺序 消费者按队列顺序拉取并单线程处理5.2 事务消息分布式事务的优雅解决方案经典场景 下单扣库存订单服务创建订单本地事务 库存服务扣减库存远程调用要么都成功要么都失败。RocketMQ事务消息流程Service public class OrderService {Resourceprivate TransactionMQProducer producer;​public void createOrder(Order order) {// 1. 发送Half消息对消费者不可见Message msg new Message(order-topic, JSON.toJSONString(order).getBytes());​producer.sendMessageInTransaction(msg, order);}​// 2. 执行本地事务回调Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {Order order (Order) arg;​try {// 执行本地数据库操作orderMapper.insert(order);​// 本地事务成功提交Half消息return LocalTransactionState.COMMIT_MESSAGE;​} catch (Exception e) {// 本地事务失败回滚Half消息return LocalTransactionState.ROLLBACK_MESSAGE;}}​// 3. 事务状态回查Broker会定期询问Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 查询本地事务执行结果String orderId new String(msg.getBody());Order order orderMapper.selectById(orderId);​if (order ! null) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.ROLLBACK_MESSAGE;}}}事务消息状态机Producer发送Half消息 ↓Broker存储Half消息对Consumer不可见 ↓Producer执行本地事务 ↓Producer提交事务状态COMMIT/ROLLBACK ↓5a. COMMIT → 消息对Consumer可见5b. ROLLBACK → 消息被删除↓如果Producer未响应Broker定期回查事务状态六、生产实战高可用部署与监控6.1 集群部署架构推荐配置2Master 2SlaveNameServer集群3节点 ↓ ┌──────────────────────┐ │ Broker Cluster │ │ ┌────────────────┐ │ │ │ Master-1 │ │ │ │ Slave-1 │ │ │ └────────────────┘ │ │ ┌────────────────┐ │ │ │ Master-2 │ │ │ │ Slave-2 │ │ │ └────────────────┘ │ └──────────────────────┘配置文件示例Master-1配置brokerClusterNameDefaultCluster brokerNamebroker-a brokerId0 brokerRoleSYNC_MASTER flushDiskTypeSYNC_FLUSHSlave-1配置brokerClusterNameDefaultCluster brokerNamebroker-a brokerId1 brokerRoleSLAVE flushDiskTypeASYNC_FLUSH6.2 核心监控指标┌──────────────────┬──────────┬──────────────────────┐ │ 监控项 │ 告警阈值 │ 处理措施 │ ├──────────────────┼──────────┼──────────────────────┤ │ 消息堆积量 │ 10万 │ 增加消费者 │ ├──────────────────┼──────────┼──────────────────────┤ │ 发送成功率 │ 99% │ 检查网络和Broker状态 │ ├──────────────────┼──────────┼──────────────────────┤ │ 消费TPS │ 持续下降 │ 检查消费者健康度 │ ├──────────────────┼──────────┼──────────────────────┤ │ Broker磁盘使用率 │ 85% │ 清理过期消息 │ ├──────────────────┼──────────┼──────────────────────┤ │ 消费延迟 │ 1分钟 │ 扩容消费者 │ └──────────────────┴──────────┴──────────────────────┘七、常见问题FAQQ1消息重复消费怎么办答业务幂等设计是唯一解Service public class PaymentService {public void processPayment(String orderId, BigDecimal amount) {// 方案一数据库唯一索引try {paymentMapper.insert(orderId, amount);} catch (DuplicateKeyException e) {// 重复消息直接返回log.warn(订单{}已支付忽略重复消息, orderId);return;}​// 方案二Redis分布式锁String lockKey payment: orderId;if (redisTemplate.opsForValue().setIfAbsent(lockKey, 1, 10, TimeUnit.MINUTES)) {// 获取锁成功处理业务doPayment(orderId, amount);}}}Q2消息堆积怎么处理答定位瓶颈针对性优化1. 查看消息堆积情况./mqadmin consumerProgress -g order-consumer-group2. 增加消费者数量注意消费者数量不能超过队列数量3. 提高单个消费者的处理能力- 优化业务逻辑- 增加消费线程数RocketMQMessageListener( topic order-topic, consumerGroup order-group, consumeThreadMin 20, // 最小线程数 consumeThreadMax 64 // 最大线程数 )Q3RocketMQ 5.x vs 4.x该如何选择核心差异| 特性 |4.x | 5.x | |------|-----|-----| | 定时消息 | 固定18个等级 | 任意精度毫秒级 | | 负载均衡 | 队列粒度 | 消息粒度解决长尾 | | 协议支持 | Remoting | Remoting gRPC | | 架构 | Broker单层 | Broker Proxy分层 | | 多语言支持 | 需自行实现 | 官方多语言SDK | | 云原生 | 需改造 | 原生支持K8s |选择建议新项目 → 直接上5.x享受新特性老项目 → 4.x稳定够用无需急于升级云原生场景 → 5.x gRPC Proxy八、性能调优榨干RocketMQ的每一分性能8.1 生产者端调优DefaultMQProducer producer new DefaultMQProducer(producer-group);// 调优参数一发送队列大小 producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB// 调优参数二发送超时时间 producer.setSendMsgTimeout(10000); // 10秒// 调优参数三压缩消息超过4KB自动压缩 producer.setCompressMsgBodyOverHowmuch(4096);// 调优参数四批量发送 ListMessage messages new ArrayList(); for (int i 0; i 100; i) { messages.add(new Message(batch-topic, (msg- i).getBytes())); } producer.send(messages); // 批量发送减少网络开销8.2 消费者端调优RocketMQMessageListener( topic order-topic, consumerGroup order-group, consumeThreadMin 20, // 最小消费线程 consumeThreadMax 64, // 最大消费线程 consumeMessageBatchMaxSize 32 // 批量消费消息数 ) public class OrderConsumer implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt message) {// 业务逻辑优化// 1. 减少数据库查询使用缓存// 2. 异步化非关键逻辑// 3. 合并批量操作}}8.3 Broker端调优1. 内存配置根据机器配置调整-Xms8g -Xmx8g -Xmn4g2. 刷盘策略性能vs可靠性权衡flushDiskTypeASYNC_FLUSH # 异步刷盘高性能flushDiskTypeSYNC_FLUSH # 同步刷盘高可靠3. 消息存储配置CommitLog文件大小mapedFileSizeCommitLog1073741824 # 1GBConsumeQueue文件大小mapedFileSizeConsumeQueue60000004. 删除文件时机fileReservedTime48 # 消息保留48小时5. 磁盘使用率阈值diskMaxUsedSpaceRatio75 # 超过75%拒绝写入九、典型应用场景实战案例场景一电商秒杀系统架构设计用户点击秒杀 ↓ 前端限流令牌桶 ↓ 网关层限流 ↓ 秒杀服务预扣库存 ↓ 发送MQ消息 ↓ 异步处理创建订单、扣库存、发短信核心代码RestController public class SeckillController {Resourceprivate RocketMQTemplate rocketMQTemplate;​Resourceprivate RedisTemplateString, String redisTemplate;​PostMapping(/seckill)public Result seckill(RequestParam String productId, RequestParam String userId) {​// 1. 预扣Redis库存原子操作String stockKey stock: productId;Long stock redisTemplate.opsForValue().decrement(stockKey);​if (stock 0) {// 库存不足恢复redisTemplate.opsForValue().increment(stockKey);return Result.fail(库存不足);}​// 2. 发送秒杀消息到MQSeckillMessage msg new SeckillMessage(productId, userId, System.currentTimeMillis());rocketMQTemplate.syncSend(seckill-topic, msg);​return Result.success(秒杀成功正在处理订单);}}// 消费者异步创建订单 Component RocketMQMessageListener(topic seckill-topic, consumerGroup seckill-group) public class SeckillConsumer implements RocketMQListenerSeckillMessage {Overridepublic void onMessage(SeckillMessage msg) {// 1. 创建订单Order order createOrder(msg);​// 2. 扣减数据库库存inventoryService.deduct(msg.getProductId(), 1);​// 3. 发送短信通知smsService.send(msg.getUserId(), 秒杀成功订单号 order.getId());}}场景二分布式事务 - 下单扣库存业务流程订单服务创建订单本地事务发送Half消息Broker存储Half消息订单服务提交事务状态Broker将消息对消费者可见库存服务扣减库存完整代码Service public class OrderTransactionService {Resourceprivate TransactionMQProducer transactionProducer;​Resourceprivate OrderMapper orderMapper;​// 创建订单并发送事务消息public void createOrderWithTransaction(Order order) {Message msg new Message(inventory-topic,JSON.toJSONString(order).getBytes());​// 发送事务消息transactionProducer.sendMessageInTransaction(msg, order);}​// 执行本地事务Transactionalpublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {Order order (Order) arg;​try {// 本地事务保存订单orderMapper.insert(order);​// 本地事务记录事务日志transactionLogMapper.insert(new TransactionLog(order.getId(),ORDER_CREATED,LocalTransactionState.COMMIT_MESSAGE.name()));​return LocalTransactionState.COMMIT_MESSAGE;​} catch (Exception e) {log.error(订单创建失败, e);return LocalTransactionState.ROLLBACK_MESSAGE;}}​// 事务状态回查public LocalTransactionState checkLocalTransaction(MessageExt msg) {String orderId JSON.parseObject(msg.getBody()).getString(orderId);​// 查询事务日志表TransactionLog log transactionLogMapper.selectByOrderId(orderId);​if (log ! null COMMIT_MESSAGE.equals(log.getStatus())) {return LocalTransactionState.COMMIT_MESSAGE;} else {return LocalTransactionState.ROLLBACK_MESSAGE;}}}// 库存服务消费者 Component RocketMQMessageListener(topic inventory-topic, consumerGroup inventory-group) public class InventoryConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {Order order JSON.parseObject(message, Order.class);​// 扣减库存幂等处理inventoryService.deductWithIdempotent(order.getProductId(), order.getQuantity());}}场景三日志收集系统架构设计应用服务器多台 ↓ 发送日志 RocketMQ高吞吐 ↓ 消费 日志处理服务 ↓ 存储 ElasticSearch / HBase代码实现// 日志采集异步发送不影响主业务 Aspect Component public class LogAspect {Resourceprivate RocketMQTemplate rocketMQTemplate;​Around(annotation(com.example.annotation.LogRecord))public Object around(ProceedingJoinPoint point) throws Throwable {long startTime System.currentTimeMillis();​// 执行业务方法Object result point.proceed();​long endTime System.currentTimeMillis();​// 异步发送日志不阻塞主流程LogMessage log new LogMessage(point.getSignature().getName(),startTime,endTime - startTime,JSON.toJSONString(point.getArgs()),JSON.toJSONString(result));​rocketMQTemplate.asyncSend(log-topic, log, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {// 发送成功不做处理}​Overridepublic void onException(Throwable e) {// 发送失败记录到本地文件降级方案log.error(日志发送失败, e);}});​return result;}}十、总结RocketMQ核心要点速查核心概念速记┌────────────┬──────────────┬───────────────────┐ │ 概念 │ 核心作用 │ 关键点 │ ├────────────┼──────────────┼───────────────────┤ │ NameServer │ 路由注册中心 │ 去中心化、无状态 │ ├────────────┼──────────────┼───────────────────┤ │ Broker │ 消息存储 │ 主从架构、顺序写 │ ├────────────┼──────────────┼───────────────────┤ │ Topic │ 消息主题 │ 逻辑分组 │ ├────────────┼──────────────┼───────────────────┤ │ Queue │ 物理队列 │ 并发消费的基础 │ ├────────────┼──────────────┼───────────────────┤ │ Producer │ 生产者 │ 三种发送方式 │ ├────────────┼──────────────┼───────────────────┤ │ Consumer │ 消费者 │ 集群/广播两种模式 │ └────────────┴──────────────┴───────────────────┘高可靠配置三板斧第一板斧生产者重试retries3第二板斧Broker同步刷盘同步复制flushDiskTypeSYNC_FLUSH brokerRoleSYNC_MASTER第三板斧消费者手动ACKackModeMANUAL性能优化三要素批量操作批量发送、批量消费异步处理异步发送、异步刷盘合理配置线程数、队列数、消息大小常见问题应对策略┌────────────┬──────────────────────────────────────┐ │ 问题 │ 解决方案 │ ├────────────┼──────────────────────────────────────┤ │ 消息丢失 │ 同步刷盘 同步复制 手动ACK │ ├────────────┼──────────────────────────────────────┤ │ 消息重复 │ 业务幂等数据库唯一索引/Redis去重 │ ├────────────┼──────────────────────────────────────┤ │ 消息乱序 │ 相同MessageGroup 顺序消费 │ ├────────────┼──────────────────────────────────────┤ │ 消息堆积 │ 增加消费者 优化消费逻辑 │ ├────────────┼──────────────────────────────────────┤ │ 分布式事务 │ 事务消息 本地事务表 │ └────────────┴──────────────────────────────────────┘写在最后RocketMQ作为阿里巴巴在分布式消息领域的扛鼎之作经历了无数次双十一的考验已经成为Java生态中最可靠的消息中间件之一。从最初的异步解耦、削峰填谷到顺序消息、事务消息、定时消息RocketMQ不断进化为开发者提供了丰富的武器库。核心理念记住三点可靠性优先在金融、电商等核心业务中宁可牺牲一点性能也要保证消息不丢幂等是基石消息重复无法100%避免业务幂等才是王道监控要到位消息堆积、消费延迟、发送成功率必须实时监控掌握了RocketMQ你就掌握了构建高可用分布式系统的核心技能。

相关新闻