
1. 为什么需要WebSocket双向通信传统的HTTP协议是单向通信协议客户端发起请求服务器返回响应然后连接就关闭了。这种模式在需要实时数据交互的场景下显得力不从心比如在线聊天室、实时股票行情、多人协作编辑等应用。想象一下你在使用在线文档编辑时如果每次都要刷新页面才能看到别人的修改那体验会有多糟糕。WebSocket协议就是为了解决这个问题而生的。它建立在TCP连接之上只需要一次握手就能建立持久连接之后服务器和客户端可以随时互相发送消息。这种全双工通信方式特别适合需要低延迟、高频率数据交换的场景。我在实际项目中遇到过这样一个需求需要开发一个实时监控系统后端要能随时向前端推送监控数据同时前端也要能随时发送控制指令。如果使用传统的轮询方式不仅延迟高还会给服务器带来不必要的压力。改用WebSocket后不仅实现了真正的实时通信服务器压力也降低了70%以上。2. SpringBoot WebFlux中的WebSocket基础2.1 环境准备与依赖配置首先创建一个新的SpringBoot项目在pom.xml中添加必要的依赖dependencies !-- WebFlux核心依赖 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-webflux/artifactId /dependency !-- 用于JSON处理 -- dependency groupIdcom.fasterxml.jackson.core/groupId artifactIdjackson-databind/artifactId /dependency !-- Lombok简化代码 -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency /dependencies这里有个坑需要注意WebFlux和传统的Spring MVC不能同时使用。如果你项目中已经引入了spring-boot-starter-web需要先移除它否则会报错。2.2 核心接口WebSocketHandler在WebFlux中处理WebSocket连接的核心接口是WebSocketHandler。它只有一个方法需要实现public interface WebSocketHandler { MonoVoid handle(WebSocketSession session); }这个方法接收一个WebSocketSession对象我们可以通过它来收发消息。下面是一个最简单的EchoHandler实现Component public class EchoHandler implements WebSocketHandler { Override public MonoVoid handle(WebSocketSession session) { return session.send( session.receive() .map(msg - session.textMessage(ECHO - msg.getPayloadAsText())) ); } }这个实现很简单接收到什么消息就原样返回前面加个ECHO - 前缀。但实际项目中我们需要更复杂的处理逻辑。3. 构建完整的WebSocket服务端3.1 会话管理与消息封装在实际项目中我们通常需要管理所有活跃的WebSocket会话以便能够从业务代码中主动推送消息。下面是一个增强版的会话封装类Data Slf4j public class WebSocketSessionWrapper { private static final ConcurrentHashMapString, WebSocketSessionWrapper sessions new ConcurrentHashMap(); private final String sessionId; private final WebSocketSession session; private final FluxSinkWebSocketMessage messageSink; public static void broadcast(Object message) { sessions.values().forEach(wrapper - wrapper.send(message)); } public void send(Object message) { try { String json objectMapper.writeValueAsString(message); messageSink.next(session.textMessage(json)); } catch (JsonProcessingException e) { log.error(消息序列化失败, e); } } // 定期清理无效会话 static { Executors.newSingleThreadScheduledExecutor() .scheduleAtFixedRate(() - { sessions.values().removeIf(wrapper - !wrapper.getSession().isOpen() ); }, 1, 1, TimeUnit.MINUTES); } }这个封装类做了几件重要的事情使用静态Map保存所有活跃会话提供了广播消息的方法自动清理已关闭的连接封装了消息序列化逻辑3.2 带认证的WebSocket处理器实际项目中我们通常需要对WebSocket连接进行认证。下面是一个增强版的处理器实现Component Slf4j public class AuthWebSocketHandler implements WebSocketHandler { Override public MonoVoid handle(WebSocketSession session) { // 从查询参数获取token String token session.getHandshakeInfo() .getUri() .getQuery() .split()[1]; if(!validateToken(token)) { return session.close(CloseStatus.POLICY_VIOLATION); } // 输入处理 MonoVoid input session.receive() .doOnNext(message - processMessage(session, message)) .then(); // 输出处理 MonoVoid output session.send(Flux.create(sink - { WebSocketSessionWrapper wrapper new WebSocketSessionWrapper( session.getId(), session, sink ); WebSocketSessionWrapper.sessions.put(session.getId(), wrapper); })); return Mono.zip(input, output).then(); } private boolean validateToken(String token) { // 实现你的认证逻辑 return true; } private void processMessage(WebSocketSession session, WebSocketMessage message) { // 处理接收到的消息 } }这个处理器在连接建立时检查token无效的连接会立即关闭。有效连接会被保存到会话管理中供后续业务代码使用。3.3 注册WebSocket端点最后需要在配置类中注册我们的处理器Configuration public class WebSocketConfig { Bean public HandlerMapping webSocketMapping(AuthWebSocketHandler handler) { MapString, WebSocketHandler map new HashMap(); map.put(/ws, handler); SimpleUrlHandlerMapping mapping new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); mapping.setOrder(-1); // 高优先级 return mapping; } Bean public WebSocketHandlerAdapter handlerAdapter() { return new WebSocketHandlerAdapter(); } }这样我们的WebSocket服务端就准备好了可以通过ws://localhost:8080/ws?tokenxxx来连接。4. WebSocket客户端实现4.1 浏览器客户端实现浏览器端使用WebSocket API非常简单const socket new WebSocket(ws://localhost:8080/ws?tokenabc123); socket.onopen () { console.log(连接已建立); socket.send(Hello Server!); }; socket.onmessage (event) { console.log(收到消息:, event.data); }; socket.onclose () { console.log(连接已关闭); };4.2 Java客户端实现如果你需要在Java应用中连接WebSocket服务可以使用WebFlux提供的ReactorNettyWebSocketClientpublic class MyWebSocketClient { public static void main(String[] args) { WebSocketClient client new ReactorNettyWebSocketClient(); client.execute(URI.create(ws://localhost:8080/ws?tokenabc123), session - { // 发送消息 MonoVoid output session.send( Flux.just(session.textMessage(Hello from Java client!)) ); // 接收消息 MonoVoid input session.receive() .map(WebSocketMessage::getPayloadAsText) .doOnNext(System.out::println) .then(); return Mono.zip(input, output).then(); }).block(); } }4.3 断线重连机制在实际应用中网络不稳定可能导致连接断开。我们需要实现自动重连public class ReconnectableClient { private static final WebSocketClient client new ReactorNettyWebSocketClient(); public static void connect() { client.execute(URI.create(ws://localhost:8080/ws), session - { // ...处理逻辑... return session.closeStatus() .doOnError(e - log.error(连接异常, e)) .doFinally(signal - { log.info(连接断开5秒后重连...); try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } connect(); // 重连 }) .then(); }).subscribe(); } }5. 高级功能与性能优化5.1 心跳检测与保活长时间空闲的连接可能会被代理服务器或防火墙关闭。我们可以通过心跳机制保持连接活跃public class HeartbeatHandler implements WebSocketHandler { private static final Duration HEARTBEAT_INTERVAL Duration.ofSeconds(30); Override public MonoVoid handle(WebSocketSession session) { FluxWebSocketMessage heartbeat Flux.interval(HEARTBEAT_INTERVAL) .map(i - session.pingMessage(d - d.wrap(HEARTBEAT.getBytes()))); MonoVoid input session.receive() .doOnNext(this::processMessage) .then(); MonoVoid output session.send( Flux.merge( heartbeat, // 其他业务消息... ) ); return Mono.zip(input, output).then(); } }5.2 消息压缩当传输大量数据时可以启用WebSocket的permessage-deflate扩展来压缩消息Bean public WebSocketClient webSocketClient() { return new ReactorNettyWebSocketClient( HttpClient.create() .compress(true) .wiretap(true) ); }5.3 负载测试与性能调优当连接数达到数千时需要注意以下几点调整Netty的工作线程数-Dreactor.netty.ioWorkerCount16增加文件描述符限制ulimit -n 100000使用-Xmx设置合适的堆内存考虑使用分布式方案如基于Redis的发布订阅6. 常见问题排查6.1 连接无法建立如果连接失败检查以下几点服务是否正常启动路径是否正确是否跨域问题需要配置CORS防火墙是否阻止了WebSocket端口6.2 消息丢失或乱序WebSocket本身保证消息顺序但如果你的逻辑中有异步处理可能会乱序。解决方案为消息添加序列号使用队列保证顺序处理客户端实现消息确认机制6.3 内存泄漏长时间运行后内存增长可能是因为会话没有正确清理消息积压没有及时消费未取消的订阅可以使用VisualVM或YourKit等工具分析内存使用情况。7. 实际应用案例7.1 实时聊天系统聊天系统是WebSocket的典型应用。我们可以这样设计每个用户连接时保存其用户ID和会话的映射私聊时根据用户ID查找对应会话发送群聊时遍历群成员ID发送public class ChatHandler implements WebSocketHandler { private static final MapString, WebSocketSessionWrapper userSessions new ConcurrentHashMap(); private static final MapString, SetString chatRooms new ConcurrentHashMap(); Override public MonoVoid handle(WebSocketSession session) { // 获取用户ID String userId getUserIdFromSession(session); // 保存用户会话 return session.send(Flux.create(sink - { WebSocketSessionWrapper wrapper new WebSocketSessionWrapper( userId, session, sink ); userSessions.put(userId, wrapper); // 加入默认聊天室 chatRooms.computeIfAbsent(general, k - ConcurrentHashMap.newKeySet()) .add(userId); })).and(session.receive().doOnNext(message - { // 处理聊天消息 ChatMessage chatMessage parseMessage(message); if(chatMessage.getType().equals(PRIVATE)) { sendPrivateMessage(chatMessage); } else { broadcastToRoom(chatMessage); } }).then()); } }7.2 实时数据监控另一个常见场景是实时监控系统前端建立WebSocket连接后端定时推送监控数据前端用图表实时展示public class MonitoringHandler implements WebSocketHandler { Override public MonoVoid handle(WebSocketSession session) { FluxWebSocketMessage dataStream Flux.interval(Duration.ofSeconds(1)) .map(i - { MonitoringData data collectMonitoringData(); return session.textMessage(toJson(data)); }); return session.send(dataStream) .and(session.receive().then()); // 忽略客户端消息 } }8. 安全注意事项8.1 认证与授权WebSocket协议本身不提供认证机制我们需要自己实现在握手阶段通过URL参数或Header传递token服务端验证token有效性无效连接立即关闭8.2 输入验证对所有接收到的消息进行严格验证验证消息格式限制消息大小防范注入攻击8.3 HTTPS与WSS生产环境一定要使用WSS(WebSocket Secure)防止中间人攻击避免内容被篡改浏览器对非安全WebSocket限制越来越多9. 调试与监控9.1 日志记录详细的日志有助于排查问题public class LoggingHandler extends WebSocketHandlerDecorator { public LoggingHandler(WebSocketHandler delegate) { super(delegate); } Override public MonoVoid handle(WebSocketSession session) { log.info(New connection from {}, session.getRemoteAddress()); return super.handle(session) .doOnError(e - log.error(WebSocket error, e)) .doFinally(s - log.info(Connection closed: {}, s)); } }9.2 指标监控集成Micrometer监控WebSocket指标Bean public MeterBinder webSocketMetrics(WebSocketSessionRepository repository) { return registry - { Gauge.builder(websocket.sessions.count, repository::getSessionCount) .register(registry); }; }10. 与其它技术的对比10.1 WebSocket vs HTTP轮询特性WebSocketHTTP轮询连接方式持久连接短连接延迟低高服务器压力小大浏览器支持现代浏览器所有浏览器消息方向双向主要是客户端发起10.2 WebSocket vs SSE(Server-Sent Events)SSE适合服务器向客户端单向推送的场景更简单的协议自动重连但只能服务器向客户端发送10.3 WebSocket vs MQTTMQTT更适合物联网场景更轻量级的协议支持QoS但需要额外的broker11. 未来发展趋势WebSocket作为HTML5标准的一部分已经相当成熟但仍在不断发展WebSocket over HTTP/2可以减少连接数更好的压缩算法与QUIC协议结合在实际项目中我遇到过需要支持上万并发连接的需求。通过优化单台4核8G的服务器可以轻松支持2万以上的WebSocket连接关键在于合理配置Netty参数和优化业务逻辑。