别再纠结选型了!手把手教你用SpringBoot+Netty从零搭建一个IM系统(附完整源码)

发布时间:2026/5/30 17:47:13

别再纠结选型了!手把手教你用SpringBoot+Netty从零搭建一个IM系统(附完整源码) 从零构建高可靠IM系统SpringBoot与Netty深度整合实战在数字化协作日益普及的今天即时通讯IM系统已成为各类应用的标配功能。无论是社交平台、在线客服还是企业内部协作工具实时消息传递能力都是提升用户体验的关键要素。本文将带您深入探索如何基于SpringBoot和Netty构建一个高性能、可扩展的IM系统从架构设计到代码实现完整呈现企业级IM系统的开发全貌。1. 技术选型与架构设计1.1 为什么选择SpringBootNetty组合现代IM系统需要同时满足高并发、低延迟和易维护三大核心需求。SpringBoot与Netty的组合完美契合这些要求SpringBoot优势快速搭建RESTful API服务完善的依赖管理和自动配置丰富的生态系统Spring Data, Spring Security等便捷的数据库集成Netty核心价值异步事件驱动架构支持百万级并发连接零拷贝技术降低内存消耗灵活的编解码器机制成熟的心跳检测和断线重连机制1.2 IM系统核心架构典型IM系统采用分层设计各层职责明确┌───────────────────────────────────────┐ │ 客户端 │ └───────────────────────────────────────┘ ▲ │ │ ▼ ┌───────────────────────────────────────┐ │ 接入层Netty │ │ ┌─────────┐ ┌─────────┐ │ │ │ 鉴权模块 │ │ 协议解析 │ │ │ └─────────┘ └─────────┘ │ └───────────────────────────────────────┘ ▲ │ │ ▼ ┌───────────────────────────────────────┐ │ 逻辑层SpringBoot │ │ ┌─────────┐ ┌─────────┐ │ │ │ 消息路由 │ │ 群组管理 │ │ │ └─────────┘ └─────────┘ │ └───────────────────────────────────────┘ ▲ │ │ ▼ ┌───────────────────────────────────────┐ │ 存储层 │ │ ┌─────────┐ ┌─────────┐ │ │ │ MySQL │ │ Redis │ │ │ └─────────┘ └─────────┘ │ └───────────────────────────────────────┘2. 核心功能实现2.1 长连接管理与消息收发Netty服务端基础配置Configuration public class NettyServerConfig { Value(${netty.port}) private int port; Bean public ServerBootstrap serverBootstrap() { EventLoopGroup bossGroup new NioEventLoopGroup(1); EventLoopGroup workerGroup new NioEventLoopGroup(); ServerBootstrap b new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new IdleStateHandler(30, 0, 0)) .addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufDecoder(Message.getDefaultInstance())) .addLast(new ProtobufVarint32LengthFieldPrepender()) .addLast(new ProtobufEncoder()) .addLast(new AuthHandler()) .addLast(new MessageHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); return b; } }消息处理核心逻辑public class MessageHandler extends SimpleChannelInboundHandlerMessage { Override protected void channelRead0(ChannelHandlerContext ctx, Message msg) { switch (msg.getType()) { case PRIVATE_MSG: handlePrivateMessage(ctx, msg); break; case GROUP_MSG: handleGroupMessage(ctx, msg); break; case ACK: handleAck(ctx, msg); break; default: log.warn(Unknown message type: {}, msg.getType()); } } private void handlePrivateMessage(ChannelHandlerContext ctx, Message msg) { // 消息持久化 messageService.saveMessage(msg); // 获取接收方Channel Channel targetChannel SessionHolder.getChannel(msg.getTo()); if (targetChannel ! null targetChannel.isActive()) { targetChannel.writeAndFlush(msg); // 发送ACK确认 ctx.writeAndFlush(buildAck(msg.getMessageId())); } else { // 离线消息处理 offlineMessageService.saveOfflineMessage(msg); } } }2.2 关系链模块设计好友关系数据库设计CREATE TABLE im_friendship ( id bigint(20) NOT NULL AUTO_INCREMENT, app_id int(11) NOT NULL COMMENT 应用ID, from_id varchar(64) NOT NULL COMMENT 用户A, to_id varchar(64) NOT NULL COMMENT 用户B, remark varchar(100) DEFAULT NULL COMMENT 备注, status tinyint(4) NOT NULL DEFAULT 0 COMMENT 状态 0-未添加 1-正常 2-删除, black tinyint(4) NOT NULL DEFAULT 0 COMMENT 是否拉黑, create_time bigint(20) NOT NULL COMMENT 创建时间, update_time bigint(20) DEFAULT NULL COMMENT 更新时间, PRIMARY KEY (id), UNIQUE KEY idx_app_from_to (app_id,from_id,to_id), KEY idx_app_to_from (app_id,to_id,from_id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;好友添加业务逻辑Service RequiredArgsConstructor public class FriendServiceImpl implements FriendService { private final ImFriendShipMapper friendShipMapper; private final RedisSeq redisSeq; private final MessageProducer messageProducer; Transactional public ResponseVO addFriend(RequestBase request, String fromId, FriendDto dto) { // 校验是否已是好友 ImFriendShipEntity exist friendShipMapper.selectOne(new LambdaQueryWrapperImFriendShipEntity() .eq(ImFriendShipEntity::getAppId, request.getAppId()) .eq(ImFriendShipEntity::getFromId, fromId) .eq(ImFriendShipEntity::getToId, dto.getToId())); if (exist ! null exist.getStatus() FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()) { return ResponseVO.errorResponse(FriendShipErrorCode.TO_IS_YOUR_FRIEND); } // 生成序列号 long seq redisSeq.doGetSeq(request.getAppId() : Constants.SeqConstants.Friendship); // 保存正向关系 ImFriendShipEntity entity new ImFriendShipEntity(); entity.setAppId(request.getAppId()); entity.setFromId(fromId); entity.setToId(dto.getToId()); entity.setRemark(dto.getRemark()); entity.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); entity.setFriendSequence(seq); entity.setCreateTime(System.currentTimeMillis()); friendShipMapper.insert(entity); // 保存反向关系 ImFriendShipEntity reverseEntity new ImFriendShipEntity(); reverseEntity.setAppId(request.getAppId()); reverseEntity.setFromId(dto.getToId()); reverseEntity.setToId(fromId); reverseEntity.setStatus(FriendShipStatusEnum.FRIEND_STATUS_NORMAL.getCode()); reverseEntity.setFriendSequence(seq); reverseEntity.setCreateTime(System.currentTimeMillis()); friendShipMapper.insert(reverseEntity); // 发送好友添加通知 messageProducer.sendToUser(fromId, request, FriendshipEventCommand.FRIEND_ADD, buildAddFriendPack(entity)); return ResponseVO.successResponse(); } }2.3 群组功能实现群组核心表结构设计CREATE TABLE im_group ( id bigint(20) NOT NULL AUTO_INCREMENT, app_id int(11) NOT NULL COMMENT 应用ID, group_id varchar(64) NOT NULL COMMENT 群组ID, name varchar(100) NOT NULL COMMENT 群名称, type tinyint(4) NOT NULL COMMENT 群类型, owner_id varchar(64) NOT NULL COMMENT 群主ID, create_time bigint(20) NOT NULL COMMENT 创建时间, update_time bigint(20) DEFAULT NULL COMMENT 更新时间, PRIMARY KEY (id), UNIQUE KEY idx_app_group (app_id,group_id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4; CREATE TABLE im_group_member ( id bigint(20) NOT NULL AUTO_INCREMENT, app_id int(11) NOT NULL COMMENT 应用ID, group_id varchar(64) NOT NULL COMMENT 群组ID, member_id varchar(64) NOT NULL COMMENT 成员ID, role tinyint(4) NOT NULL DEFAULT 0 COMMENT 角色 0-普通成员 1-管理员 2-群主, join_time bigint(20) NOT NULL COMMENT 加入时间, PRIMARY KEY (id), UNIQUE KEY idx_app_group_member (app_id,group_id,member_id) ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;群消息扩散优化策略public void handleGroupMessage(ChannelHandlerContext ctx, Message msg) { // 1. 消息持久化 messageService.saveMessage(msg); // 2. 获取群成员列表带缓存 ListString memberIds groupService.getGroupMembers(msg.getGroupId()); // 3. 并行发送消息 memberIds.parallelStream().forEach(memberId - { if (!memberId.equals(msg.getFrom())) { // 不发送给自己 Channel channel SessionHolder.getChannel(memberId); if (channel ! null channel.isActive()) { channel.writeAndFlush(msg); } else { offlineMessageService.saveOfflineMessage( msg.toBuilder().setTo(memberId).build()); } } }); // 4. 发送ACK ctx.writeAndFlush(buildAck(msg.getMessageId())); }3. 高级特性实现3.1 消息可靠性保证IM系统必须确保消息不丢失、不重复、不乱序。我们采用多级ACK机制客户端ACK接收方收到消息后立即返回ACK服务端ACK消息持久化后向发送方返回ACK离线消息补偿用户上线后拉取未读消息消息状态流转设计状态描述处理逻辑SENDING发送中客户端显示发送中状态SENT已发送未收到服务端ACK启动重试机制DELIVERED服务端已接收持久化到数据库READ对方已读更新已读状态FAILED发送失败显示红色感叹号消息重试机制实现public class MessageRetryHandler extends ChannelInboundHandlerAdapter { private static final int MAX_RETRY 3; private static final long RETRY_INTERVAL 1000L; private final MapString, MessageTask pendingMessages new ConcurrentHashMap(); Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof Message) { Message message (Message) msg; if (message.getType() MessageType.ACK) { // 收到ACK移除重试任务 pendingMessages.remove(message.getMessageId()); } } ctx.fireChannelRead(msg); } public void sendWithRetry(Channel channel, Message message) { if (channel null || !channel.isActive()) { return; } MessageTask task new MessageTask(channel, message); pendingMessages.put(message.getMessageId(), task); channel.eventLoop().schedule(task, RETRY_INTERVAL, TimeUnit.MILLISECONDS); } private class MessageTask implements Runnable { private final Channel channel; private final Message message; private int retryCount; MessageTask(Channel channel, Message message) { this.channel channel; this.message message; } Override public void run() { if (retryCount MAX_RETRY) { pendingMessages.remove(message.getMessageId()); // 通知发送方消息发送失败 channel.writeAndFlush(buildFailedAck(message.getMessageId())); return; } if (channel.isActive()) { retryCount; channel.writeAndFlush(message); channel.eventLoop().schedule(this, RETRY_INTERVAL, TimeUnit.MILLISECONDS); } else { pendingMessages.remove(message.getMessageId()); } } } }3.2 性能优化策略连接管理优化使用Netty的Native Epoll传输Linux环境合理配置TCP参数.option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_KEEPALIVE, true)消息处理优化关键路径异步化处理批量消息合并发送热点数据本地缓存存储层优化消息分库分表策略冷热数据分离存储Redis多级缓存设计4. 安全与稳定性保障4.1 连接安全机制认证流程客户端连接后首先发送认证请求服务端校验Token有效性建立Session并绑定Channel认证处理器实现public class AuthHandler extends ChannelInboundHandlerAdapter { Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof Message) { Message message (Message) msg; if (message.getType() MessageType.AUTH_REQ) { // 验证Token boolean valid authService.validateToken( message.getAuth().getToken()); if (valid) { // 绑定用户Session SessionHolder.bindSession( message.getFrom(), ctx.channel()); // 返回认证成功 ctx.writeAndFlush(buildAuthAck(true)); // 移除认证Handler ctx.pipeline().remove(this); // 发送离线消息 offlineMessageService.pushOfflineMessages( message.getFrom(), ctx.channel()); } else { ctx.writeAndFlush(buildAuthAck(false)); ctx.close(); } return; } } ctx.fireChannelRead(msg); } }4.2 监控与运维关键监控指标在线用户数消息吞吐量消息延迟分布连接异常率日志收集方案public class LoggingHandler extends ChannelDuplexHandler { Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info(Received message: {}, msg); ctx.fireChannelRead(msg); } Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { log.info(Sent message: {}, msg); ctx.write(msg, promise); } Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(Channel error: {}, ctx.channel(), cause); ctx.close(); } }优雅停机实现PreDestroy public void shutdown() { // 1. 拒绝新连接 bossGroup.shutdownGracefully(); // 2. 通知所有客户端 SessionHolder.getAllChannels().forEach(channel - { if (channel.isActive()) { channel.writeAndFlush(buildSystemMessage(系统即将维护)); } }); // 3. 等待消息处理完成 try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 4. 关闭工作线程组 workerGroup.shutdownGracefully(); }

相关新闻