SpringBoot 广播消息实现(发布/订阅)

发布时间:2026/5/27 9:48:17

SpringBoot 广播消息实现(发布/订阅) 在 RabbitMQ 的五大工作模式中发布/订阅Publish/Subscribe广播模式是分布式系统中非常核心的通信方式。我们日常使用的普通点对点队列一条消息只会被一个消费者消费竞争消费而广播模式可以实现一条消息、多服务、多消费者同时接收完美实现一对多通知。像缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播等场景全部依赖 Fanout 广播模式。一、什么是 MQ 广播发布/订阅模式1. 核心定义广播模式基于FanoutExchange扇形交换机实现核心逻辑生产者发送一条消息到 Fanout 交换机所有绑定该交换机的队列都会完整收到这条消息。不管路由键是什么、不管队列名称只要完成绑定就会无条件广播投递。2. 核心特性• 无视routingKey路由键传空、传任意值都不生效• 纯广播、全量投递、一对多分发• 每条消息独立进入每一个绑定队列• 天然支持多服务、多节点同步通知• 无匹配规则绑定即接收3. 适用业务场景• 分布式缓存全局刷新多节点统一清空缓存• 系统配置动态推送、热更新• 全站公告、全局消息推送• 微服务多节点日志采集、链路追踪• 服务上下线、状态同步广播• 多端消息同步PC/APP/小程序二、四大交换机模式核心对比交换机类型匹配规则消费模式核心场景Direct直连完全匹配 routingKey点对点竞争消费订单、支付、任务处理Topic主题通配符模糊匹配选择性多消费日志分级、消息订阅Fanout广播无视路由键全部投递全员订阅消费缓存刷新、全局通知Headers匹配消息头参数自定义匹配极少使用三、关键认知误区1同一个队列多消费者可以实现广播绝对错误同一个队列下的多个消费者默认是竞争消费一条消息只会被一个消费者消费。广播必备条件每个消费者对应一个独立队列全部绑定同一个 Fanout 交换机。2Fanout 交换机需要配置路由键Fanout 交换机底层逻辑直接忽略 routingKey无论发送时传什么值都不会影响广播效果。3广播消息天然可靠、不会丢失默认非持久化、自动ACK 场景下广播消息极易丢失生产必须做持久化手动ACK。四、SpringBoot 完整实现1. 基础依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency2. 生产级配置文件spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 手动ACK 保证广播消息不丢 acknowledge-mode: manual # 限制预取数防止单节点消息堆积 prefetch: 5 # 开启消费重试 retry: enabled: true max-attempts: 3 initial-interval: 10003. 广播交换机、队列、绑定配置类import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; Configuration public class FanoutBroadcastConfig { // 广播交换机名称 public static final String FANOUT_EXCHANGE system.fanout.broadcast.exchange; // 三个独立消费者队列 public static final String QUEUE_CACHE_REFRESH queue.cache.refresh; public static final String QUEUE_NOTICE queue.system.notice; public static final String QUEUE_LOG queue.log.collect; // 声明 Fanout 广播交换机持久化、不自动删除 Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } // 队列1缓存刷新队列 Bean public Queue cacheRefreshQueue() { return new Queue(QUEUE_CACHE_REFRESH, true); } // 队列2系统通知队列 Bean public Queue noticeQueue() { return new Queue(QUEUE_NOTICE, true); } // 队列3日志采集队列 Bean public Queue logQueue() { return new Queue(QUEUE_LOG, true); } // 全部绑定到广播交换机 Bean public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange); } Bean public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(noticeQueue).to(fanoutExchange); } Bean public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logQueue).to(fanoutExchange); } }4. 广播消息生产者import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; RestController public class BroadcastProducer { Autowired private RabbitTemplate rabbitTemplate; GetMapping(/send/broadcast) public String sendBroadcastMsg(RequestParam String content) { // Fanout广播路由键传空字符串 rabbitTemplate.convertAndSend( FanoutBroadcastConfig.FANOUT_EXCHANGE, , content ); return ✅ 广播消息发送成功 content; } }5. 多消费者实现全员接收消费者1缓存刷新消费者import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; Component public class CacheRefreshConsumer { RabbitListener(queues FanoutBroadcastConfig.QUEUE_CACHE_REFRESH) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println(【缓存服务】接收广播消息 msg); // 执行缓存刷新业务逻辑 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消费失败重回队列重试 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }消费者2系统通知消费者Component public class SystemNoticeConsumer { RabbitListener(queues FanoutBroadcastConfig.QUEUE_NOTICE) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println(【通知服务】接收广播消息 msg); // 执行消息推送业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }消费者3日志采集消费者Component public class LogCollectConsumer { RabbitListener(queues FanoutBroadcastConfig.QUEUE_LOG) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println(【日志服务】接收广播消息 msg); // 执行日志采集业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }五、测试效果访问接口http://localhost:8080/send/broadcast?content全局缓存刷新通知控制台输出【缓存服务】接收广播消息全局缓存刷新通知 【通知服务】接收广播消息全局缓存刷新通知 【日志服务】接收广播消息全局缓存刷新通知✅ 一条消息多服务同时消费广播生效六、总结1. 必须开启持久化交换机、队列全部设置持久化防止重启丢失广播配置。2. 强制手动ACK广播场景多为重要通知、缓存同步自动ACK会导致业务未执行完成消息丢失。3. 每个服务独立队列不同微服务必须使用独立队列避免竞争消费保证广播全覆盖。4. 广播消息建议做幂等MQ 重试、网络抖动会导致广播消息重复推送核心业务必须基于消息ID做幂等防重。5. 禁止设置复杂路由键Fanout 无视路由键统一传空字符串保持代码规范。写在最后广播发布订阅模式是微服务分布式通信的重要基石区别于传统的点对点任务消费它主打全局通知、多节点同步、状态广播是缓存刷新、配置热更新、系统公告等场景的最优解。很多开发者一直混淆“竞争消费”和“广播消费”的本质导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范能帮你彻底解决分布式多节点同步难题。持续更新 SpringBoot、微服务、MQ 中间件、架构实战、面试刷题干货帮你夯实技术底盘轻松搞定工作与面试。觉得文章有用点赞、收藏、转发一波持续关注解锁更多生产级技术干货

相关新闻