物联网 基于netty控制报文结构(发布与接收)

发布时间:2026/5/25 21:04:40

物联网 基于netty控制报文结构(发布与接收) 物联网 基于netty控制报文结构(发布与接收)简述源码(netty-sample-04-PubSub)PUBLISH 报文结构标志位说明简易代码服务端客户端物联网 基于netty控制报文结构(发布与接收)简述MQTT PUBLISH 报文 的二进制结构固定头、可变头、有效载荷并基于 Netty 实现一个支持 QoS 0 的发布/订阅 Broker源码(netty-sample-04-PubSub)https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-04PUBLISH 报文结构组成部分字节数内容固定头2~5第一个字节高4位3报文类型低4位标志位DUP、QoS、RETAIN后续字节剩余长度变长编码可变头变长主题名UTF-8 字符串2字节长度 主题字节如果 QoS 0则紧跟 2 字节报文标识符有效载荷变长应用消息二进制剩余长度 可变头长度 有效载荷长度。标志位说明固定头第一个字节的低4位位含义bit 3DUP重发标志bit 2-1QoS00001110211保留bit 0RETAIN保留标志简易代码服务端package com.jysemel.iot; import com.jysemel.iot.coder.MqttPublishDecoder; import com.jysemel.iot.pojo.PublishMessage; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class MqttServer { public static void main(String[] args) throws Exception { EventLoopGroup boss new NioEventLoopGroup(1); EventLoopGroup worker new NioEventLoopGroup(); try { ServerBootstrap b new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ChannelPipeline p ch.pipeline(); // 添加解码器字节 - PublishMessage p.addLast(new MqttPublishDecoder()); // 业务处理器处理收到的 PublishMessage p.addLast(new SimpleChannelInboundHandlerPublishMessage() { Override protected void channelRead0(ChannelHandlerContext ctx, PublishMessage msg) { System.out.println(收到: topic msg.getTopic() , payload msg.getPayload()); } }); } }); ChannelFuture f b.bind(1883).sync(); System.out.println(MQTT server started on port 1883); f.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }客户端package com.jysemel.iot; import com.jysemel.iot.coder.MqttPublishEncoder; import com.jysemel.iot.pojo.PublishMessage; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class MqttClient { public static void main(String[] args) throws Exception { EventLoopGroup group new NioEventLoopGroup(); try { Bootstrap b new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ChannelPipeline p ch.pipeline(); // 客户端不需要解码只需要编码 PUBLISH p.addLast(new MqttPublishEncoder()); p.addLast(new ChannelInboundHandlerAdapter() { Override public void channelActive(ChannelHandlerContext ctx) { // 连接建立后发送一个 PUBLISH 消息 PublishMessage msg new PublishMessage(test, 你好 MQTT); System.out.println(发送: topictest, payloadHello MQTT); ctx.writeAndFlush(msg); } }); } }); ChannelFuture f b.connect(127.0.0.1, 1883).sync(); // 等待 5 秒后关闭 Thread.sleep(5000); f.channel().close().sync(); } finally { group.shutdownGracefully(); } } }

相关新闻