
在物联网开发中网关设备与子设备的通信是核心场景之一。本文以温控设备模拟空调为例详细讲解基于MQTT协议的网关-子设备属性读写交互逻辑包括设备上线、属性主动上报、目标温度写入等核心功能并提供可直接运行的Python实现代码为物联网设备开发提供实用参考。关于jetlinks的其他已测试内容可以参考本人的专栏https://blog.csdn.net/weixin_43951955/category_13043980.html?spm1001.2014.3001.54821. 场景背景本次示例中的温控设备包含两个核心属性当前环境温度temperature即室温随环境实时变化目标温度temperature_dest即空调设定温度支持云平台远程写入修改。本文重点聚焦属性的写入操作同时覆盖子设备上线、属性主动上报等基础逻辑。2. MQTT 主题与报文规范网关子设备无需独立的MQTT客户端直接复用网关的MQTT通道与云平台通信核心交互围绕以下主题和报文格式展开。2.1 主动上报属性子设备→云平台子设备通过网关主动上报环境温度和目标温度报文格式简洁同时该操作也是子设备的“上线触发条件”。上报报文格式{properties:{temperature:25.6,temperature_dest:27}}工具测试示例使用MQTTX工具直接上传上述报文即可完成属性上报使用 MQTTX 工具上传两个温度属性显示OK成功2.2 写属性云平台→网关→子设备云平台下发指令修改子设备的目标温度网关接收指令后处理并回复执行结果。主题约定方向主题格式说明下行云→网关/{产品ID}/{设备名称}/properties/write云平台下发写属性指令上行网关→云/{productId}/{deviceId}/properties/write/reply网关回复指令执行结果报文格式下行报文云平台下发{headers:{deviceName:温控-485,productName:温控-子设备,productId:2020388484487761920,_uid:WZw9Nea1m8IlUn1AiebQifJlC8OEEenV,traceparent:00-306a597889cdd28e5611a38ebdf8587e-755ec08297b54fd5-01},messageId:2020473806001197056,deviceId:2020389405645000704,timestamp:1770553468595,properties:{temperature_dest:25},messageType:WRITE_PROPERTY,replyType:READ_PROPERTY_REPLY}上行报文网关回复需原样返回messageId并标识执行结果{messageId:平台下发报文中的messageId,success:true}3. 网关与子设备的上下线逻辑3.1 设备上线规则设备类型上线条件核心说明网关设备MQTT客户端成功连接云平台网关作为独立MQTT客户端连接成功 即 在线子设备网关通过MQTT通道上报子设备任意属性子设备无独立MQTT连接上报属性是触发上线的核心动作若无属性可上报可自定义“上下线状态”属性怎么折腾都行没有属性就创造属性3.2 设备离线规则被动离线网关MQTT连接断开MQTT心跳机制检测云平台自动标记网关及下属子设备离线主动离线实际开发中极少主动上报离线报文重点需排查离线原因如网络、设备故障并恢复上线。3.3 MQTTX工具测试验证网关设备上线仅需完成MQTT连接网关即可上线只要连接MQTT网关即可上线子设备上线通过网关的MQTT通道发送任意属性上报报文子设备立即上线只要发一个属性上报报文子设备就上线了4. Python 实现代码以下代码完整实现网关的MQTT连接、子设备属性5秒定时上报、目标温度写入指令处理等核心功能代码无需修改可直接运行杠杠的。importjsonimportloggingimporttimeimportrandomfromthreadingimportTimerfrompaho.mqttimportclientasmqtt_clientfrompaho.mqtt.clientimportMQTTv311# 核心配置项 # MQTT服务器配置MQTT_BROKER192.168.111.53MQTT_PORT1883MQTT_USERNAMEadminMQTT_PASSWORDadmin# 网关设备信息作为MQTT客户端标识GATEWAY_PRODUCT_ID2020387880562511872GATEWAY_DEVICE_ID2020389102749143040CLIENT_IDGATEWAY_DEVICE_ID# 网关设备ID作为MQTT客户端ID# 子设备温控设备信息SUB_DEVICE_PRODUCT_ID2020388484487761920SUB_DEVICE_ID2020389405645000704# Topic定义 # 1. 子设备属性主动上报TopicSUB_DEVICE_PROPERTY_REPORT_TOPIC(f/{SUB_DEVICE_PRODUCT_ID}/{SUB_DEVICE_ID}/properties/report)# 2. 子设备写属性下行Topic云平台→网关SUB_DEVICE_PROPERTY_WRITE_TOPIC(f/{SUB_DEVICE_PRODUCT_ID}/{SUB_DEVICE_ID}/properties/write)# 3. 子设备写属性响应上行Topic网关→云平台SUB_DEVICE_PROPERTY_WRITE_REPLY_TOPIC(f/{SUB_DEVICE_PRODUCT_ID}/{SUB_DEVICE_ID}/properties/write/reply)# 温控设备状态 DEVICE_STATE{temperature:25.0,# 当前环境温度模拟随机波动temperature_dest:27.0,# 目标温度初始值last_update_time:None,# 最后更新时间}# 日志配置logging.basicConfig(levellogging.INFO,format%(asctime)s - %(levelname)s - %(message)s)loggerlogging.getLogger(__name__)defprint_all_mqtt_topics_detail():详细打印代码中所有MQTT主题含变量名、含义、流向、实际值logger.info(\n*80)logger.info( 代码中所有MQTT主题详细信息)logger.info(*80)# 构造主题详情列表变量名、含义、数据流向、实际值topic_details[{var_name:SUB_DEVICE_PROPERTY_REPORT_TOPIC,description:子设备属性主动上报Topic,data_flow:上行网关 → 云平台,purpose:定时上报环境温度、目标温度同时触发子设备上线,actual_value:SUB_DEVICE_PROPERTY_REPORT_TOPIC,},{var_name:SUB_DEVICE_PROPERTY_WRITE_TOPIC,description:子设备写属性下行Topic,data_flow:下行云平台 → 网关,purpose:接收云平台下发的目标温度修改指令,actual_value:SUB_DEVICE_PROPERTY_WRITE_TOPIC,},{var_name:SUB_DEVICE_PROPERTY_WRITE_REPLY_TOPIC,description:子设备写属性响应上行Topic,data_flow:上行网关 → 云平台,purpose:回复云平台写属性指令的执行结果successtrue/false,actual_value:SUB_DEVICE_PROPERTY_WRITE_REPLY_TOPIC,},]# 逐个打印主题详情foridx,topicinenumerate(topic_details,1):logger.info(f\n【{idx}】主题变量{topic[var_name]})logger.info(f 含义{topic[description]})logger.info(f 数据流向{topic[data_flow]})logger.info(f 用途{topic[purpose]})logger.info(f 实际完整主题\n{topic[actual_value]})logger.info(\n*80\n)defconnect_mqtt()-mqtt_client.Client:创建并连接MQTT客户端网关身份defon_connect(client,userdata,flags,rc,propertiesNone):rc_msg{0:连接成功,1:协议版本错误,2:客户端ID非法,3:服务器不可用,4:用户名/密码错误,5:未授权,}ifrc0:logger.info(f✅ 网关MQTT连接成功{MQTT_BROKER}:{MQTT_PORT})else:logger.error(f❌ 网关连接失败rc{rc}{rc_msg.get(rc,未知错误)})defon_disconnect(client,userdata,rc,propertiesNone):ifrc!0:logger.warning(f⚠️ 网关MQTT被动断开将自动重连rc{rc})# 创建MQTT客户端网关身份clientmqtt_client.Client(client_idCLIENT_ID,callback_api_versionmqtt_client.CallbackAPIVersion.VERSION1,protocolMQTTv311,)client.username_pw_set(MQTT_USERNAME,MQTT_PASSWORD)client.on_connecton_connect client.on_disconnecton_disconnect client.auto_reconnectTrueclient.reconnect_delay_set(min_delay2,max_delay10)# 连接MQTT服务器try:client.connect(MQTT_BROKER,MQTT_PORT,keepalive60)exceptExceptionase:logger.error(f❌ 网关TCP连接失败{str(e)})raisereturnclientdefsimulate_environment_temperature():模拟环境温度随机波动±0.5℃base_tempDEVICE_STATE[temperature]# 随机波动范围20~30℃new_tempbase_temprandom.uniform(-0.5,0.5)new_tempmax(20.0,min(30.0,new_temp))# 限制温度范围DEVICE_STATE[temperature]round(new_temp,1)defreport_temperature_properties(client:mqtt_client.Client):主动上报温度属性每5秒执行try:# 1. 模拟环境温度波动simulate_environment_temperature()DEVICE_STATE[last_update_time]time.strftime(%Y-%m-%d %H:%M:%S,time.localtime())# 2. 构造上报报文report_payloadjson.dumps({properties:{temperature:DEVICE_STATE[temperature],temperature_dest:DEVICE_STATE[temperature_dest],}},ensure_asciiFalse,)# 3. 发布上报报文resultclient.publish(SUB_DEVICE_PROPERTY_REPORT_TOPIC,report_payload,qos0)ifresult[0]0:logger.info(f 子设备属性上报成功 | 环境温度{DEVICE_STATE[temperature]}℃ | 目标温度{DEVICE_STATE[temperature_dest]}℃)else:logger.error(f❌ 子设备属性上报失败状态码{result[0]})exceptExceptionase:logger.error(f❌ 属性上报异常{str(e)})# 4. 定时任务5秒后再次执行Timer(5,report_temperature_properties,args[client]).start()defparse_write_property_request(payload:str)-tuple:解析云平台下发的写属性请求try:payload_datajson.loads(payload)# 提取核心字段message_idpayload_data.get(messageId)# 回复时需原样返回propertiespayload_data.get(properties,{})# 待写入的属性message_typepayload_data.get(messageType)# 消息类型WRITE_PROPERTYreturnmessage_id,properties,message_typeexceptjson.JSONDecodeError:logger.error(f❌ 写属性请求解析失败{payload})returnNone,{},defhandle_write_temperature_dest(properties:dict)-bool:处理目标温度写入请求# 1. 校验参数iftemperature_destnotinproperties:logger.error(❌ 写属性请求缺少参数temperature_dest)returnFalsetarget_tempproperties[temperature_dest]# 校验温度范围合理范围16~35℃if(notisinstance(target_temp,(int,float))ortarget_temp16ortarget_temp35):logger.error(f❌ 目标温度参数错误{target_temp}必须是16~35之间的数字)returnFalse# 2. 更新设备状态DEVICE_STATE[temperature_dest]round(target_temp,1)DEVICE_STATE[last_update_time]time.strftime(%Y-%m-%d %H:%M:%S,time.localtime())logger.info(f 目标温度更新成功 | 新目标温度{DEVICE_STATE[temperature_dest]}℃)returnTruedefon_message(client,userdata,msg):处理MQTT消息主要是写属性请求topicmsg.topic payloadmsg.payload.decode(utf-8,errorsignore)logger.info(f\n 收到MQTT消息)logger.info(f Topic:{topic})logger.info(f 报文:{json.dumps(json.loads(payload),ensure_asciiFalse,indent2)ifpayloadelse空报文})# 处理子设备写属性请求iftopicSUB_DEVICE_PROPERTY_WRITE_TOPIC:# 1. 解析请求报文message_id,properties,message_typeparse_write_property_request(payload)ifnotmessage_idormessage_type!WRITE_PROPERTY:logger.error(❌ 非有效写属性请求跳过回复)return# 2. 处理目标温度写入successhandle_write_temperature_dest(properties)# 3. 构造回复报文reply_payloadjson.dumps({messageId:message_id,success:success},ensure_asciiFalse)# 4. 发布回复报文reply_resultclient.publish(SUB_DEVICE_PROPERTY_WRITE_REPLY_TOPIC,reply_payload,qos0)ifreply_result[0]0:logger.info(f✅ 写属性回复成功 | messageId:{message_id}| success:{success})else:logger.error(f❌ 写属性回复发布失败状态码{reply_result[0]})defsubscribe_related_topics(client:mqtt_client.Client):订阅相关Topic主要是写属性请求Topic# 订阅子设备写属性Topicclient.subscribe(SUB_DEVICE_PROPERTY_WRITE_TOPIC,qos0)client.on_messageon_message logger.info(f 已订阅子设备写属性Topic{SUB_DEVICE_PROPERTY_WRITE_TOPIC})logger.info(f 子设备初始状态 | 环境温度{DEVICE_STATE[temperature]}℃ | 目标温度{DEVICE_STATE[temperature_dest]}℃)defrun():启动网关MQTT服务核心入口logger.info( 启动JetLinks网关子设备温控设备MQTT服务)# 新增程序启动时先打印所有主题详情 print_all_mqtt_topics_detail()# 1. 连接MQTT服务器clientconnect_mqtt()# 2. 订阅相关Topicsubscribe_related_topics(client)# 3. 启动5秒定时上报属性report_temperature_properties(client)# 4. 保持MQTT循环try:client.loop_forever()exceptKeyboardInterrupt:logger.info(\n 退出网关MQTT服务)exceptExceptionase:logger.error(f❌ 程序异常{str(e)},exc_infoTrue)finally:client.disconnect()if__name____main__:run()代码核心功能说明MQTT连接网关以自身设备ID为客户端ID连接MQTT服务器开启自动重连保障稳定性属性上报每5秒模拟环境温度波动主动上报子设备的环境温度和目标温度触发子设备上线写属性处理监听云平台下发的目标温度修改指令校验参数有效性后更新状态并回复执行结果日志调试启动时打印所有MQTT主题的详细信息关键操作记录日志便于问题定位。5. 测试验证5.1 属性上报测试运行代码后日志中可看到每5秒上报的温度属性与云平台显示的时间、数值完全一致图中看出时间和属性值对得上测试成功5.2 属性修改测试云平台下发目标温度修改指令后网关接收并处理上报的属性值与Web端显示一致图中看出 客户端监听到属性修改报文之后再主动上传属性 web显示一致测试成功6. 总结子设备上线核心子设备无独立MQTT连接需通过网关上报任意属性触发上线属性交互逻辑主动上报用/properties/report主题写属性用/properties/write下行/write/reply上行回复稳定性保障代码实现了MQTT自动重连、参数校验、异常捕获符合工业级设备开发规范。如有开发疑问欢迎评论区交流探讨