WebSocket + Netty 构建一个简易的聊天软件

发布时间:2026/6/29 0:14:19

WebSocket + Netty 构建一个简易的聊天软件 前言即时通讯IM是移动互联网时代的基础设施。从微信到钉钉从直播弹幕到游戏同步IM 早已无处不在。如何构建一个高性能、高可用的聊天系统是每一位后端开发进阶之路上的必修课。本文将以一个实际项目为例讲解如何基于Netty WebSocket Spring Boot构建完整的聊天系统涵盖实时消息、文件传输、离线消息的三层可靠性保障、消息幂等去重等实战话题。项目已在生产环境验证可直接作为 IM 系统的脚手架参考。一、整体架构系统的核心模块是信令服务Signaling Service——它通过 Netty 管理所有 WebSocket 长连接同时提供 REST API 用于历史消息查询和文件上传。┌──────────────┐ HTTP/REST ┌──────────────┐ Dubbo ┌──────────────┐ │ uni-app │ ─────────────── │ API 网关 │ ────────── │ 用户服务 │ │ (前端) │ │ (Gateway) │ │ (好友/登录) │ └──────┬───────┘ └──────────────┘ └──────────────┘ │ WebSocket ws:// ▼ ┌──────────────────────────────────────────────────────────────────────┐ │ 信令服务 (Signaling Service) │ │ │ │ ┌──────────────────────┐ ┌────────────────────────────────┐ │ │ │ Netty WebSocket │ │ REST Controller │ │ │ │ Server :9090 │ │ /api/chat/history │ │ │ │ /ws?tokenjwt │ │ /api/chat/upload │ │ │ └──────────┬───────────┘ │ /api/monitor/offline/* │ │ │ │ └───────────────┬────────────────┘ │ │ ▼ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 核心服务层 │ │ │ │ SessionManager → userId ↔ Channel 双向映射 │ │ │ │ ChatService → 消息持久化 三层离线保障 │ │ │ │ CallRecordService→ 通话记录管理 │ │ │ │ FileService → MinIO 对象存储 │ │ │ └──────────────────────────┬───────────────────────────┘ │ │ │ │ │ ┌──────────────┐ ┌──────▼───────┐ ┌──────────────────┐ │ │ │ PostgreSQL │ │ Redis │ │ MinIO (S3) │ │ │ │ 消息持久化 │ │ 在线状态 │ │ 图片/文件存储 │ │ │ │ 离线消息 DB │ │ 离线缓存 │ │ 预签名 URL │ │ │ │ 用户/好友 │ │ Pub/Sub │ │ │ │ │ └──────────────┘ └──────────────┘ └──────────────────┘ │ └──────────────────────────────────────────────────────────────────────┘为什么选择 Netty对比NettyTomcat WebSocket (JSR 356)线程模型全异步 Reactor一个 EventLoop 管数千连接每个连接一个线程高并发下线程爆炸内存管理池化 DirectByteBuf零拷贝堆内存分配GC 压力大扩展性Pipeline 可编排任意处理器注解驱动灵活性有限简单说Netty 能用更少的线程支撑更多的连接。对于聊天这种长连接密集场景这是决定性的优势。二、Netty 服务器搭建2.1 ServerBootstrap 配置Component Slf4j public class NettyWebSocketServer { ​ private EventLoopGroup bossGroup new NioEventLoopGroup(2); private EventLoopGroup workerGroup new NioEventLoopGroup(3); private Channel channel; ​ PostConstruct public void start() { ServerBootstrap bootstrap new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast( new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerCompressionHandler(), new SignalingHandler(), // 核心业务处理器 new WebSocketServerProtocolHandler(/ws), new IdleStateHandler(60, 0, 0) // 60s 空闲检测 ); } }); channel bootstrap.bind(9090).channel(); } }Pipeline 各组件职责处理器作用HttpServerCodecHTTP 编解码WebSocket 握手依赖 HTTP 升级HttpObjectAggregator将 HTTP 请求聚合成完整消息WebSocketServerCompressionHandler数据帧压缩SignalingHandler自定义处理器处理所有业务消息WebSocketServerProtocolHandler协议升级、帧聚合、关闭控制IdleStateHandler60 秒无读则触发空闲事件清理死连接2.2 JWT 握手鉴权WebSocket 标准 API 不支持自定义 Header因此 Token 通过 URL 查询参数传递private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { QueryStringDecoder decoder new QueryStringDecoder(req.uri()); String token decoder.parameters().get(token).get(0); Claims claims JwtUtil.validateToken(token); Long userId claims.get(userId, Long.class); ​ // 注册会话 sessionManager.register(userId, ctx.channel()); // Redis 标记在线60s TTL分布式环境自动过期 redisTemplate.opsForValue().set(user:status: userId, online, 60, TimeUnit.SECONDS); // 广播好友上线 redisTemplate.convertAndSend(channel:friend_status, statusJson); // 推送离线消息 chatService.pushOfflineMessages(userId); // 重写 URI 交给 WebSocketServerProtocolHandler req.setUri(/ws); ctx.fireChannelRead(req.retain()); }三、会话管理SessionManager每个连接对应一个 NettyChannel使用ConcurrentHashMap维护双向映射。Component public class SessionManager { private final ConcurrentHashMapLong, Channel userChannelMap new ConcurrentHashMap(); private final ConcurrentHashMapString, Long channelUserMap new ConcurrentHashMap(); ​ /** 注册会话同一用户新连接会踢掉旧连接 */ public void register(Long userId, Channel channel) { Channel old userChannelMap.put(userId, channel); if (old ! null old.isActive()) { old.writeAndFlush(new TextWebSocketFrame({\type\:\kicked\})); old.close(); } channelUserMap.put(channel.id().toString(), userId); } ​ public void sendMessage(Long userId, String message) { Channel ch userChannelMap.get(userId); if (ch ! null ch.isActive()) { ch.writeAndFlush(new TextWebSocketFrame(message)); } } }设计要点同一个用户在多设备登录时旧连接会被主动踢下线channelUserMap反向索引解决channelInactive事件中如何知道哪个用户断开的问题不直接暴露 Channel 对象避免业务代码误操作四、消息协议设计所有 WebSocket 消息使用统一 JSON 格式type字段路由Data public class SignalingMessage { private String type; // chat_message | call | sdp | ice | ... private Long fromUserId; private Long toUserId; private String content; // 聊天文本 private String messageType; // text | image | file | audio private String fileUrl; private String fileName; private Long fileSize; private Long msgId; // 服务端分配的 Snowflake ID private Long clientMsgId; // 客户端临时 IDACK 匹配 private String idempotentKey; // 幂等键客户端去重用 // ... 信令字段sdp, candidate, roomId 等 }核心消息类型类型流向说明chat_messageC ↔ S聊天消息chat_ackS → C送达确认offline_messagesS → C离线消息批量推送mark_readC → S已读回执heartbeat / heartbeat_ackC ↔ S心跳保活五、聊天消息核心流程5.1 消息发送到送达发送方 信令服务 接收方 │ │ │ │── chat_message ───────│ │ │ {toUserId,content} │ │ │ │── 1. 持久化到 PostgreSQL │ │ │ │ │── chat_ack ──────────│ │ │ {msgId,clientMsgId} │ │ │ │── 2. 在线检查 │ │ │ ├── 在线 → WebSocket 转发│ │ │ └── 离线 → 离线消息存储 │ │ │ ↓ │ │ │ 写入DBRedis(双写) │ │ │ 标记status0(待推送) │private void handleChatMessage(SignalingMessage msg) { // 1. 持久化消息Snowflake ID ChatMessage saved chatService.saveMessage(fromUserId, toUserId, ...); Long msgId saved.getId(); ​ // 2. 回 ACK 给发送方 sendAck(fromUserId, msgId, msg.getClientMsgId()); ​ // 3. 构建转发消息 String forwardJson buildForwardMessage(saved); ​ if (sessionManager.isOnline(toUserId)) { // 在线直接推送 sessionManager.sendMessage(toUserId, forwardJson); } else { // 离线幂等双写 chatService.storeOfflineMessage(toUserId, forwardJson, msgId); } }5.2 ACK 确认机制每条chat_message必须回chat_ack否则发送方 UI 会停留在发送中状态前端发送时生成clientMsgId前端生成如 UUID服务端回 ACK 时携带msgId clientMsgId前端匹配到clientMsgId后本地消息从发送中→已发送10 秒无 ACK → 前端超时重试六、离线消息的三层可靠性保障离线消息必须在 Redis 崩了、服务重启、网络闪断等各种故障下100% 不丢。整体设计存储 读取 ┌─────────────────┐ ┌─────────────────┐ │ PostgreSQL │ │ Redis 优先 │ │ status0/1/2 │──────│ (高吞吐) │ │ (持久化保底) │ │ │ └─────────────────┘ └─────────────────┘ │ │ │ DB 失败 │ Redis 挂了 │ 业务不中断 │ 自动降级 DB ▼ ▼ 消息不丢 ✓ 消息不丢 ✓第一层实时推送用户上线时public void pushOfflineMessages(Long userId) { // 1. 优先从 Redis 拉取高性能 ListString messages popFromRedis(userId); // 2. DB 补充拉取 status0 的未推送消息去重后合并 ListOfflineMessage dbRecords offlineMessageRepository .findByUserIdAndStatusOrderByCreateTimeAsc(userId, STATUS_UNPUSHED); // 3. WebSocket 推送 boolean ok doPush(userId, messages); // 4. 推送成功 → 标记 DB status1Redis 已 leftPop 删除 if (ok) markAllPushed(dbRecords); }第二层定时补偿每 5 分钟即使第一层因网络瞬断等原因推送失败定时任务会兜底Component RequiredArgsConstructor public class OfflineMessageCompensationTask { /** 每 5 分钟扫描未推送的离线消息重新投递 */ Scheduled(initialDelay 30_000, fixedRate 300_000) public void compensateUndelivered() { chatService.resendUndeliveredMessages(); } /** 每日凌晨 3 点清理已推送的过期记录 */ Scheduled(cron 0 0 3 * * ?) public void cleanExpired() { chatService.cleanExpiredMessages(); } }public void resendUndeliveredMessages() { // 扫描 status0 的消息最多 200 条 ListOfflineMessage pending offlineMessageRepository .findByStatusOrderByCreateTimeAsc(STATUS_UNPUSHED, PageRequest.of(0, 200)); for (OfflineMessage msg : pending) { if (!sessionManager.isOnline(msg.getUserId())) continue; boolean ok doPush(msg.getUserId(), msg.getContent()); if (ok) { offlineMessageRepository.markAsPushed(msg.getId(), now); // → status1 } else { offlineMessageRepository.markAsFailed(msg.getId(), now); // → status2, pushCount } } }第三层监控告警通过 REST API 暴露指标端点说明GET /api/monitor/offline/stats全局未推送数、失败数、积压超 5 分钟数GET /api/monitor/offline/backlog/{userId}指定用户积压GET /api/monitor/offline/reconcile/{userId}Redis vs DB 一致性对账幂等性保证消息不重不漏服务端幂等写入每条离线消息有全局唯一的idempotentKey offline: msgIdmsgId 为 Snowflake IDTransactional public void storeOfflineMessage(Long userId, String messageJson, Long msgId) { String key offline: msgId; // 数据层幂等判断 if (offlineMessageRepository.existsByIdempotentKey(key)) { log.debug(幂等跳过: {}, key); return; } OfflineMessage record new OfflineMessage(); record.setUserId(userId); record.setContent(messageJson); record.setIdempotentKey(key); record.setMsgId(msgId); record.setStatus(OfflineMessage.STATUS_UNPUSHED); offlineMessageRepository.save(record); // Redis 加速 redisTemplate.opsForList().rightPush(offline:msgs: userId, messageJson); }客户端 LRU 去重前端维护一个最大 500 条的idempotentKey缓存重复消息直接丢弃class IdempotentCache { constructor(maxSize 500) { this.maxSize maxSize this.cache new Map() // 利用 Map 的插入顺序实现 LRU } add(key) { if (this.cache.has(key)) { this.cache.delete(key) // 重新插入以更新访问顺序 } else if (this.cache.size this.maxSize) { const oldest this.cache.keys().next().value this.cache.delete(oldest) // 淘汰最久未访问的 } this.cache.set(key, true) } has(key) { return this.cache.has(key) } } // 在消息入口处去重 function handleChatMessage(data) { if (data.idempotentKey idempotentCache.has(data.idempotentKey)) { return // 直接丢弃 } idempotentCache.add(data.idempotentKey) // ... 正常处理 }三层保障的可靠性矩阵故障场景第一层实时第二层补偿结果Redis 崩溃降级走 DB✅不丢推送时网络闪断❌5 分钟内重推不丢服务重启上线触发推送✅不丢消息重复发送幂等跳过幂等跳过不重极端并发LRU 兜底—不重七、离线消息数据表设计CREATE TABLE offline_message ( id BIGSERIAL PRIMARY KEY, user_id BIGINT NOT NULL, -- 接收者 content TEXT NOT NULL, -- 消息 JSON status INT DEFAULT 0, -- 0待推送, 1已推送, 2推送失败 idempotent_key VARCHAR(255), -- 幂等键 msg_id BIGINT, -- 原始消息 ID push_count INT DEFAULT 0, -- 已重试次数 last_push_time TIMESTAMP, -- 最后推送时间 create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX idx_offline_message_user ON offline_message(user_id); CREATE INDEX idx_offline_message_status ON offline_message(status); CREATE INDEX idx_offline_message_key ON offline_message(idempotent_key);为什么不是简单的先写 DB 再删状态机设计0→1→清理让每条消息可追踪失败原因可排查pushCount 记录重试次数定时对账可以发现 DB 和 Redis 的数据不一致八、心跳保活与连接清理// 服务端60 秒无数据则关闭 Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { ctx.close(); } }// 前端每 30 秒发一次心跳 function startHeartbeat() { heartbeatTimer setInterval(() { sendMessage({ type: heartbeat }) }, 30000) }为什么前端间隔是 30 秒服务端超时是 60 秒给网络抖动留一个缓冲窗口丢一个心跳包没关系服务端超时 前端间隔 × 2优雅容错九、分布式部署Redis Pub/Sub当信令服务部署多实例时用户 A 在实例 1 上线用户 B 在实例 2 需要收到好友状态通知实例1 (user:1001 上线) Redis Pub/Sub 实例2 (user:1002 在线) │ │ │ │── set user:status:1001online │ │ │── publish channel:friend_status ────────────────── │ │ │ │ 收到消息 │ │ 通知 user:1002 │ 好友 1001 上线// 状态变更时通过 Redis 广播 private void notifyFriendsStatus(Long userId, String status) { MapString, Object msg Map.of( type, friend_status, userId, userId, status, status ); redisTemplate.convertAndSend(channel:friend_status, json(msg)); } // 订阅者收到广播后推送给当前实例的在线用户 Component public class RedisStatusSubscriber { public void onMessage(String message, String channel) { StatusMessage status JSON.parseObject(message, StatusMessage.class); sessionManager.broadcastToAllFriends(status.getUserId(), message); } }十、文件消息图片与文件传输聊天中的图片和文件通过MinIO (S3 协议对象存储处理不走 WebSocket选择文件 → POST /api/chat/upload (multipart) → MinIO 存储 → 返回 fileUrl → WebSocket 发送 chat_message { messageType: image, fileUrl, ... } → 接收方根据 messageType 渲染图片/文件卡片文件访问使用 MinIO 预签名 URL7 天有效期无需额外鉴权public String generatePresignedUrl(String objectKey) { return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder() .bucket(bucketName) .object(objectKey) .expiry(7, TimeUnit.DAYS) .build()); }十一、完整消息流程图┌──────┐ ┌──────────────┐ ┌──────┐ │ 发送方 │ │ 信令服务 │ │ 接收方 │ └──┬───┘ └──────┬───────┘ └──┬───┘ │ │ │ │── chat_message ──────────────│ │ │ │── saveMessage() │ │ │── chat_ack ────────────────── │ │── chat_ack ────────────────│ │ │ │ │ │ │── 在线检查 │ │ │ ├── 在线 → 直接转发 ────────│ │ │ └── 离线 │ │ │ ├── storeOfflineMessage() │ │ │ │ DB(幂等) Redis │ │ │ │ status0 │ │ │ │ │ (接收方上线) │ │ │ │── WebSocket 握手 │ │ │── pushOfflineMessages() │ │ │ ├── Redis 优先拉取 │ │ │ ├── DB status0 补拉 │ │ │ ├── WebSocket 推送 ────────│ │ │ │ 带 idempotentKey │ │ │ └── 标记 DB status1 │ │ │ │ │ (5 分钟后) │ │ │ │── [定时补偿] │ │ │ ├── 扫描 status0 │ │ │ ├── 用户在线? 重推 ────────│ │ │ └── 成功→status1 │ │ │ 失败→status2,count │十二、总结本文实现了功能技术方案长连接管理Netty Reactor 模型 Pipeline 编排用户鉴权JWT 握手时验证会话映射ConcurrentHashMap 双向索引消息可靠送达ACK 10s 超时重试 clientMsgId离线消息不丢DB 持久化 Redis 加速 状态机定时补偿Scheduled 每 5 分钟扫描 status0消息幂等idempotentKey 数据库去重 前端 LRU监控对账REST 端点暴露积压/失败/延迟指标在线状态Redis TTL Pub/Sub 跨实例广播图片/文件MinIO 对象存储 预签名 URL心跳保活30s 客户端心跳 60s 服务端 idle效果进一步优化的方向消息有序性引入 sequenceId客户端严格按序渲染推送通道集成 Web Push / APNs / FCMApp 后台时也能收到通知读写分离历史消息归档到只读库缓解主库压力多设备同步引入 sync 机制手机和 PC 的消息状态同步消息加密端到端加密E2EE服务端无法解密内容项目技术栈后端Java 17 Spring Boot 3.2 Spring Cloud 2023 Netty Dubbo 3.x 存储PostgreSQL Redis MinIO (S3) 注册Nacos 2.x 前端Vue 3 uni-app WebRTC

相关新闻