用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析)

发布时间:2026/6/7 19:22:10

用Python和pymodbus库模拟Modbus RTU主从通信(附完整代码与数据帧解析) 用Python和pymodbus库模拟Modbus RTU主从通信附完整代码与数据帧解析在工业自动化领域Modbus RTU协议因其简单可靠的特点成为连接PLC、传感器和上位机系统的通用语言。本文将带您从零开始使用Python的pymodbus库构建完整的Modbus RTU通信测试环境通过代码实例深入解析协议底层的数据帧结构。1. 环境搭建与基础配置1.1 虚拟串口工具配置在开始编码前我们需要模拟物理串口环境。Windows平台推荐使用com0com虚拟串口工具Linux/Mac可使用socat命令创建虚拟串口对# Linux/Mac虚拟串口创建 socat -d -d pty,raw,echo0 pty,raw,echo0安装Python环境依赖pip install pymodbus3.0.0 serial注意pymodbus 3.x版本重构了异步IO支持本文示例基于同步API以保证兼容性1.2 协议基础参数设置Modbus RTU的关键参数需要主从设备严格一致参数典型值说明波特率19200常见工业设备标准速率数据位8固定配置停止位1常见配置校验位None/Even根据设备要求设置响应超时1秒等待从站响应的最长时间2. 构建Modbus从站模拟器2.1 初始化从站服务from pymodbus.server.sync import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext def run_slave(): store ModbusSlaveContext( diModbusSequentialDataBlock(0, [0]*100), # 离散输入 coModbusSequentialDataBlock(0, [0]*100), # 线圈 hrModbusSequentialDataBlock(0, [0]*100), # 保持寄存器 irModbusSequentialDataBlock(0, [0]*100)) # 输入寄存器 context ModbusServerContext(slavesstore, singleTrue) StartSerialServer( context, port/dev/ttyS1, # 虚拟串口设备 baudrate19200, timeout1)2.2 数据帧结构验证当主站发送读取保持寄存器请求功能码0x03时从站响应的原始数据帧可通过Wireshark捕获分析响应帧示例 01 03 02 00 0A 45 CD01从站地址03功能码02数据字节数00 0A寄存器值十进制1045 CDCRC校验码3. 实现Modbus主站功能3.1 同步读取操作实例from pymodbus.client.sync import ModbusSerialClient client ModbusSerialClient( methodrtu, port/dev/ttyS0, baudrate19200, timeout1) # 读取保持寄存器 result client.read_holding_registers( address0, # 起始地址 count5, # 寄存器数量 unit1) # 从站地址 print(寄存器值:, result.registers)3.2 数据帧构造原理底层请求帧的构建过程def build_read_frame(slave_id, function_code, address, count): # 构造PDU pdu bytes([function_code]) \ address.to_bytes(2, big) \ count.to_bytes(2, big) # 添加CRC校验 crc compute_crc(bytes([slave_id]) pdu) return bytes([slave_id]) pdu crc.to_bytes(2, little) # 示例构造读取保持寄存器请求 frame build_read_frame( slave_id1, function_code0x03, address0, count5) print(原始请求帧:, frame.hex())4. 高级功能实现与调试4.1 批量写入操作# 写入多个保持寄存器功能码0x10 values [10, 20, 30, 40] result client.write_registers( address0, valuesvalues, unit1) if result.isError(): print(写入失败:, result) else: print(写入成功)对应的请求帧解析01 10 00 00 00 04 08 00 0A 00 14 00 1E 00 28 42 1F00 00起始地址00 04寄存器数量08数据字节长度后续8字节为4个寄存器的值4.2 异常处理机制Modbus协议定义的异常响应格式字段示例值说明从站地址01异常功能码83原功能码0x80异常码02非法数据地址捕获异常响应的代码实现try: response client.read_coils(0, 10, unit1) if response.isError(): print(Modbus异常码:, response.exception_code) except Exception as e: print(通信错误:, str(e))5. 协议分析工具链5.1 使用Wireshark解码Modbus RTU配置步骤安装Wireshark并启用串口捕获添加Modbus RTU解析器过滤语法modbus || modbusadu典型数据包分析Frame 1 (Request): 0000 01 03 00 00 00 02 44 09 ......D. Frame 2 (Response): 0000 01 03 04 00 0a 00 14 f5 33 ......ó35.2 自定义数据监控工具import matplotlib.pyplot as plt from collections import deque class ModbusMonitor: def __init__(self, max_points50): self.data deque(maxlenmax_points) def update(self, values): self.data.extend(values) plt.clf() plt.plot(self.data) plt.pause(0.01) # 使用示例 monitor ModbusMonitor() while True: result client.read_holding_registers(0, 5, unit1) monitor.update(result.registers)6. 性能优化实践6.1 多寄存器读取优化# 批量读取策略对比 single_read_time timeit.timeit( lambda: client.read_holding_registers(0, 1, unit1), number100) batch_read_time timeit.timeit( lambda: client.read_holding_registers(0, 10, unit1), number100) print(f单点读取耗时: {single_read_time:.3f}s) print(f批量读取耗时: {batch_read_time:.3f}s)6.2 连接池管理from pymodbus.client.sync import ModbusConnectionPool pool ModbusConnectionPool( client_classModbusSerialClient, max_size5, **client_params) def safe_read(address): with pool.acquire() as conn: return conn.read_holding_registers(address, 1)在实际项目中通过合理设置超时时间和重试机制可以将通信成功率提升至99.9%以上。一个常见的经验是对于19200波特率的网络单个请求的超时时间不应低于300ms。

相关新闻