SpringBoot整合RocketMQ极速实战

发布时间:2026/6/5 21:08:02

SpringBoot整合RocketMQ极速实战 一、前期准备1.1 环境依赖SpringBoot 2.x / 3.x本文代码全兼容RocketMQ 服务端4.x/5.x/7.x 均可maven/gradle 项目1.2 核心依赖引入MavenApache 官方适配 SpringBoot Starter无需手动适配版本自动兼容。!-- SpringBoot 整合 RocketMQ 官方依赖 -- dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-spring-boot-starter/artifactId version2.2.3/version /dependency二、全局配置文件application.yml配置服务地址、生产组、消费组后续所有功能统一复用该配置。# RocketMQ 配置 rocketmq: # NameServer 集群地址 name-server: 127.0.0.1:9876 # 生产者配置 producer: # 生产者组名 group: demo-producer-group # 消息发送超时时间 send-message-timeout: 3000 # 最大重试次数 retry-times-when-send-failed: 2 # 消费者默认配置 consumer: group: demo-consumer-group三、SpringBoot 集成 RocketMQ 核心原理在进行各类消息实战之前先深度讲解 SpringBoot 与 RocketMQ 的底层集成原理搞懂自动装配、核心组件、通信机制知其然更知其所以然解决开发中组件失效、连接失败、消息发送异常等底层问题。3.1 自动装配原理核心RocketMQ 官方提供的rocketmq-spring-boot-starter遵循 SpringBoot 自动装配机制无需手动创建生产者、消费者实例框架自动初始化托管Bean。配置加载项目启动时自动读取application.yml中 rocketmq 全局配置包含NameServer地址、生产/消费组、超时时间、AK/SK权限信息等。Bean自动注册starter 内置自动配置类自动初始化RocketMQTemplate模板类交由Spring容器管理开发者可直接Autowired注入使用。消费者动态注册被RocketMQMessageListener注解的消费者类项目启动时会被Spring扫描自动根据注解参数创建消费者实例、订阅对应Topic、绑定消费组与消费模式。资源自动销毁项目关闭时Spring容器自动销毁生产、消费实例优雅关闭连接避免消息堆积、连接残留问题。3.2 核心集成组件说明RocketMQTemplateSpringBoot整合的核心操作模板封装了原生API的所有发送能力包含同步、异步、单向、有序、批量、事务消息发送是生产者唯一操作入口屏蔽原生底层复杂API。RocketMQMessageListener消费者核心注解承载所有消费配置可配置Topic、消费组、集群/广播模式、消息过滤表达式、并发消费数等参数实现零配置快速订阅。RocketMQLocalTransactionListener事务消息专属监听接口框架自动注册事务监听器实现本地事务执行、事务状态回查两大核心能力。3.3 完整通信执行流程启动初始化SpringBoot项目启动自动装配机制加载RocketMQ配置初始化RocketMQTemplate、消费者实例。路由拉取生产者、消费者自动连接NameServer定时拉取Topic对应的Broker路由信息缓存至本地。消息投递业务调用RocketMQTemplate方法发送消息框架基于负载均衡选择最优Broker节点投递。消息存储Broker接收消息、持久化落地磁盘返回投递结果给生产者。消息消费消费者主动拉取对应Topic消息执行业务逻辑框架自动维护消费位点、失败重试机制。3.4 SpringBoot集成优势极简开发摒弃原生繁琐的代码创建实例、配置绑定注解配置文件即可快速开发。容器托管所有MQ组件交由Spring容器统一管理生命周期可控、优雅启停。能力全覆盖封装所有高级消息特性适配普通、延迟、顺序、批量、事务等全场景。高容错性框架内置发送重试、异常捕获、位点维护、自动重连机制大幅降低开发容错成本。四、基础消息实战普通生产与消费三、基础消息实战普通生产与消费3.1 普通消息实现原理原理说明普通消息是RocketMQ最基础的消息模型采用生产者主动推送、消费者主动拉取模式。生产者通过NameServer获取Broker路由信息基于负载均衡策略选择Broker节点投递消息消息落地Broker磁盘持久化消费者定时拉取消息业务执行成功自动提交消费位点异常触发重试保障At-Least-Once至少一次投递语义。支持同步、异步、单向三种发送模式适配不同吞吐与可靠性需求。3.2 普通消息生产者支持三种发送模式同步、异步、单向覆盖绝大多数业务场景。import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; Component public class MqProducer { Autowired private RocketMQTemplate rocketMqTemplate; // 定义全局Topic private static final String TOPIC demo-normal-topic; /** * 同步发送消息可靠适合核心业务 */ public void sendSyncMsg(String msg) { SendResult sendResult rocketMqTemplate.syncSend(TOPIC, msg); System.out.println(同步发送结果 sendResult.getSendStatus()); } /** * 异步发送消息高吞吐适合非核心业务 */ public void sendAsyncMsg(String msg) { rocketMqTemplate.asyncSend(TOPIC, msg, new SendCallback() { Override public void onSuccess(SendResult sendResult) { System.out.println(异步发送成功); } Override public void onException(Throwable e) { System.err.println(异步发送失败 e.getMessage()); } }); } /** * 单向发送极致高性能无需响应 */ public void sendOneWayMsg(String msg) { rocketMqTemplate.sendOneWay(TOPIC, msg); } }3.3 普通消息消费者默认集群消费模式自动负载均衡异常自动重试。import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; Component RocketMQMessageListener( topic demo-normal-topic, consumerGroup demo-consumer-group ) public class NormalMsgConsumer implements RocketMQListenerString { Override public void onMessage(String message) { // 执行业务逻辑 System.out.println(收到普通消息 message); // 异常自动进入重试队列无需手动处理 // int a 1 / 0; } }五、广播消息实战实现原理广播消息核心是消费组级别的全量投递。集群消费是组内分摊消息而广播消费会让Broker将同一条消息推送给当前消费组内所有在线消费者实例每个实例独立消费、独立维护位点。为避免集群异常刷屏广播消费关闭重试机制消息消费失败不会进入重试队列。适用于全服务缓存刷新、全局配置更新等全员同步场景。import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; Component RocketMQMessageListener( topic demo-broadcast-topic, consumerGroup demo-broadcast-group, consumeMode ConsumeMode.BROADCASTING // 开启广播消费 ) public class BroadcastConsumer implements RocketMQListenerString { Override public void onMessage(String message) { System.out.println(广播消费消息 message); } }注意广播消费不支持重试适合全局缓存刷新、配置更新场景。六、顺序消息实战分区有序实现原理RocketMQ顺序消息仅支持分区有序局部有序不支持全局有序。核心实现逻辑Topic会被拆分为多个消息队列生产者通过自定义业务Key哈希取模将同一业务维度同一订单/同一用户的消息固定投递到同一个MessageQueue消费者单线程消费单个队列严格保证队列内消息FIFO先进先出从而实现业务时序一致。不同队列消息并行消费兼顾有序性与吞吐量。5.1 有序消息生产者原理与代码public void sendOrderMsg(String bizKey, String msg) { // bizKey相同 → 固定发送同一队列 → 保证顺序 rocketMqTemplate.syncSendOrderly(demo-order-topic, msg, bizKey); }5.2 有序消息消费者原理与代码import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; Component RocketMQMessageListener( topic demo-order-topic, consumerGroup demo-order-group, messageModel MessageModel.CLUSTERING ) public class OrderMsgConsumer implements RocketMQListenerString { Override public void onMessage(String message) { System.out.println(顺序消费 message); } }七、延迟消息实战实现原理延迟消息核心是系统队列中转延时机制。生产者发送消息时指定延迟等级Broker接收消息后不会存入目标Topic队列而是转入内置的SCHEDULE_TOPIC_XXXX延迟队列。Broker后台定时线程扫描延迟队列倒计时结束后将消息重新路由至用户指定的普通Topic队列消费者方可正常拉取消费。RocketMQ不支持自定义任意时间仅支持官方预设18个固定延迟等级保证服务性能稳定。/** * 发送延迟消息 * 延迟等级1~18级 → 1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h */ public void sendDelayMsg(String msg) { // 3级延迟 10秒后消费 rocketMqTemplate.syncSend(demo-delay-topic, msg, 3); }八、批量消息实战实现原理批量消息核心是合并网络IO、减少请求次数。多条同Topic、同配置的消息封装为一个消息列表通过一次网络请求提交至BrokerBroker批量持久化存储。大幅降低频繁网络连接、请求握手带来的性能开销极大提升高吞吐场景的并发能力。批量消息为整体事务机制整批消息要么全部成功存储要么全部失败不支持部分成功。import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; public void sendBatchMsg() { ListMessage messageList new ArrayList(); for (int i 0; i 10; i) { Message message new Message(demo-batch-topic, (批量消息 i).getBytes()); messageList.add(message); } // 批量发送 rocketMqTemplate.syncSend(messageList); }注意单批次总大小不超过4MB整体成功/整体失败。九、消息过滤实战Tag过滤实现原理消息过滤采用服务端预过滤机制避免无效消息拉取浪费网络资源。Tag过滤是轻量级过滤方案生产者为消息绑定业务标签Broker存储时关联Tag标识消费者订阅指定Tag表达式Broker在服务端直接匹配过滤仅推送符合条件的消息至消费者无性能损耗。复杂场景可使用SQL过滤基于消息自定义属性做多条件筛选。8.1 Tag过滤实现原理与代码// 发送订单消息tag ORDER public void sendTagMsg() { rocketMqTemplate.syncSend(demo-tag-topic:ORDER, 订单创建消息); }8.2 消费者订阅指定TagRocketMQMessageListener( topic demo-tag-topic, consumerGroup demo-tag-group, selectorExpression ORDER||PAY // 只消费ORDER、PAY标签消息 ) Component public class TagFilterConsumer implements RocketMQListenerString{ Override public void onMessage(String s) { System.out.println(过滤消费消息 s); } }十、事务消息实战核心重点实现原理事务消息基于两阶段提交超时回查机制解决本地数据库事务与消息发送的分布式一致性问题。第一阶段发送半消息预提交消息持久化但对消费者不可见第二阶段执行本地事务根据事务结果提交或回滚消息。针对生产者宕机、网络超时等异常Broker提供定时回查兜底主动校验本地事务状态彻底杜绝消息悬挂、数据不一致问题。实现本地事务与消息发送最终一致性。9.1 事务消息完整代码实现import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; Component RocketMQTransactionListener public class TransactionMsgListener implements RocketMQLocalTransactionListener { Autowired private RocketMQTemplate rocketMqTemplate; private static final String TRANS_TOPIC demo-trans-topic; // 发送事务消息入口 public void sendTransMsg(String content) { MessageString message MessageBuilder.withPayload(content).build(); rocketMqTemplate.sendMessageInTransaction(TRANS_TOPIC, message, content); } // 执行本地事务 Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try { // 模拟本地数据库事务 System.out.println(执行本地事务 arg); // 成功则提交消息 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { // 异常回滚消息 return RocketMQLocalTransactionState.ROLLBACK; } } // 事务回查 Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { // 查询数据库事务状态此处模拟成功 return RocketMQLocalTransactionState.COMMIT; } }

相关新闻