Redis Stream 与消息队列模式:从 Pub/Sub 到持久化消费

发布时间:2026/6/11 23:06:59

Redis Stream 与消息队列模式:从 Pub/Sub 到持久化消费 Redis Stream 与消息队列模式从 Pub/Sub 到持久化消费一、Redis Pub/Sub 的不可靠困境消息丢失与无法回溯Redis 的 Pub/Sub 机制轻量高效但存在两个致命缺陷一是消息不持久化如果消费者离线期间发布的消息将永久丢失二是无法回溯消费消息一旦发布就被推送给在线消费者没有历史记录可供查询。在需要可靠消息传递的场景如订单状态变更通知、异步任务分发中Pub/Sub 的不可靠性成为硬伤。Redis 5.0 引入的 Stream 数据类型提供了类似 Kafka 的持久化消息队列能力消息持久存储、支持消费者组、支持消息确认和重试、支持历史消息回溯。Stream 是 Redis 从缓存走向消息基础设施的关键一步。二、Stream 的消费者组模型Stream 的消费者组Consumer Group模型借鉴了 Kafka 的设计理念多个消费者组成一个组组内消费者共享消息每条消息只被组内一个消费者处理不同组独立消费每条消息会被每个组各消费一次。flowchart TD A[Stream消息流] -- B[消费者组 A] A -- C[消费者组 B] B -- D[消费者 A1处理消息 1,4,7] B -- E[消费者 A2处理消息 2,5,8] B -- F[消费者 A3处理消息 3,6,9] C -- G[消费者 B1处理消息 1,2,3] C -- H[消费者 B2处理消息 4,5,6] subgraph 消息确认机制 I[XACK确认消费] J[XPEND待确认列表] K[XPCLAIM转移超时消息] end D -- I E -- I J -- K关键机制消费者读取消息后消息进入 Pending 列表待确认状态。消费者处理完成后发送 XACK 确认。如果消费者宕机Pending 中的消息不会被确认其他消费者可以通过 XPCLAIM 接管超时消息实现故障转移。三、工程化实现3.1 消息生产者// StreamProducer.java Component RequiredArgsConstructor public class StreamProducer { private final StringRedisTemplate redisTemplate; // 发送消息到 Stream public RecordId sendMessage(String streamKey, MapString, String message) { // 使用 MAXLEN 限制 Stream 长度防止内存无限增长 // 近似裁剪~避免性能影响 StringRecord record StreamRecords.string(message) .withStreamKey(streamKey); RecordId recordId redisTemplate.opsForStream().add( record, XAddOptions.maxlen(100000).approximateTrimming() ); if (recordId null) { throw new RuntimeException(消息发送失败); } return recordId; } // 发送带业务 ID 的消息支持幂等去重 public RecordId sendMessageWithId( String streamKey, String messageId, MapString, String message ) { message.put(_biz_id, messageId); // 检查是否已发送基于业务 ID 去重 // 简化实现实际应使用 Redis Hash 维护去重表 return sendMessage(streamKey, message); } }3.2 消费者组与消息消费// StreamConsumer.java Component RequiredArgsConstructor Slf4j public class StreamConsumer { private final StringRedisTemplate redisTemplate; private final OrderEventHandler orderEventHandler; private static final String STREAM_KEY order:events; private static final String GROUP_NAME order-processor; private static final String CONSUMER_NAME consumer-1; PostConstruct public void init() { // 创建消费者组如果不存在 try { redisTemplate.opsForStream().createGroup( STREAM_KEY, GROUP_NAME ); } catch (Exception e) { // 组已存在忽略 log.info(消费者组已存在{}, GROUP_NAME); } } // 拉取模式消费消息 Scheduled(fixedDelay 100) // 每 100ms 拉取一次 public void consumeMessages() { // 从消费者组读取未消费的消息 ListMapRecordString, Object, Object records redisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, CONSUMER_NAME), StreamReadOptions.empty().count(10), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) ); if (records null || records.isEmpty()) { return; } for (MapRecordString, Object, Object record : records) { try { // 处理消息 handleMessage(record); // 确认消费 redisTemplate.opsForStream().acknowledge( STREAM_KEY, GROUP_NAME, record.getId() ); } catch (Exception e) { // 处理失败消息留在 Pending 列表等待重试 log.error(消息处理失败{}, 错误{}, record.getId(), e.getMessage()); } } } private void handleMessage(MapRecordString, Object, Object record) { MapObject, Object body record.getValue(); String eventType (String) body.get(eventType); switch (eventType) { case ORDER_CREATED - orderEventHandler.handleCreated(body); case ORDER_PAID - orderEventHandler.handlePaid(body); case ORDER_CANCELLED - orderEventHandler.handleCancelled(body); default - log.warn(未知事件类型{}, eventType); } } }3.3 Pending 消息监控与故障转移// PendingMessageMonitor.java Component RequiredArgsConstructor Slf4j public class PendingMessageMonitor { private final StringRedisTemplate redisTemplate; private static final String STREAM_KEY order:events; private static final String GROUP_NAME order-processor; private static final Duration MESSAGE_TIMEOUT Duration.ofMinutes(5); // 定期检查 Pending 消息转移超时消息 Scheduled(fixedDelay 60000) // 每分钟检查一次 public void monitorPendingMessages() { PendingMessagesSummary summary redisTemplate.opsForStream() .pending(STREAM_KEY, GROUP_NAME); if (summary null || summary.getTotalPendingMessages() 0) { return; } log.info(Pending 消息数{}, summary.getTotalPendingMessages()); // 获取超时的 Pending 消息 PendingMessages pending redisTemplate.opsForStream().pending( STREAM_KEY, Consumer.from(GROUP_NAME, monitor), Range.unbounded(), 100 ); if (pending null) return; for (PendingMessage pm : pending) { // 消息超过 5 分钟未确认视为超时 if (pm.getElapsedTimeSinceLastDelivery().compareTo( MESSAGE_TIMEOUT) 0) { // 将超时消息转移给当前消费者重新处理 redisTemplate.opsForStream().claim( STREAM_KEY, GROUP_NAME, monitor, Duration.ZERO, pm.getId() ); log.warn(转移超时消息{}, 原消费者{}, pm.getId(), pm.getConsumerName()); } } } }四、Redis Stream 的边界与权衡内存容量限制Stream 数据存储在 Redis 内存中消息量受内存限制。虽然 MAXLEN 可以控制 Stream 长度但裁剪后的消息无法恢复。对于需要长期保存的消息如审计日志必须将消息归档到外部存储如 MySQL、对象存储。消费延迟与拉取模式Redis Stream 的消费是拉取模式XREADGROUP需要客户端定期轮询。相比 Kafka 的推模式拉取模式增加了消费延迟。优化策略是低延迟场景使用 BLOCK 选项阻塞等待高吞吐场景使用 COUNT 批量拉取。消费者再平衡的缺失Kafka 有完善的消费者再平衡机制消费者加入/离开时自动重新分配分区Redis Stream 没有内置的再平衡。消费者宕机后Pending 消息需要手动通过 XPCLAIM 转移。建议实现自动化的 Pending 监控和转移机制。与专业消息队列的差距Redis Stream 适合轻量级消息场景但在消息可靠性、分区扩展、消费者再平衡等方面与 Kafka/RabbitMQ 有显著差距。如果业务对消息可靠性要求极高如金融交易应使用专业消息队列而非 Redis Stream。五、总结Redis Stream 为轻量级消息队列场景提供了开箱即用的解决方案在不需要引入 Kafka 的复杂度时是一个务实的选择。落地路线上建议先用 Stream 替代 Pub/Sub 实现可靠消息传递再逐步引入消费者组和 Pending 监控。关键原则Stream 是 Redis 的功能而非专业消息队列适合轻量场景重度消息场景仍需 Kafka/RabbitMQ。

相关新闻