使用pymodbus库进行modbus tcp通信
使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。
本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。
安装pymodbus:
pip install -U pymodbus
创建modbus tcp server
这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。
- modbus_server.py
''' * @Author: liuzhao * @Last Modified time: 2022-10-05 09:56:13 ''' from pymodbus.server.sync import ( StartTcpServer, ) from pymodbus.datastore import ( ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext, ) from pymodbus.version import version datablock = ModbusSequentialDataBlock.create() context = ModbusSlaveContext( di=datablock, co=datablock, hr=datablock, ir=datablock, ) single = True # Build data storage store = ModbusServerContext(slaves=context, single=single) if __name__ == '__main__': address = ("0.0.0.0", 503) StartTcpServer( context=store, # Data storage address=address, # listen address allow_reuse_address=True, # allow the reuse of an address )
直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。
读写modbus变量
modbus变量类型以及地址
Object type | Access | Size | Address |
---|---|---|---|
Coil | Read-write | 1 bit | 00001 – 09999 |
Discrete input | Read-only | 1 bit | 10001 – 19999 |
Input register | Read-only | 16 bits | 30001 – 39999 |
Holding register | Read-write | 16 bits | 40001 – 49999 |
coil是线圈,Discrete input是数字量输入,Input register是模拟量输入,Holding register是保持寄存器。一般地址范围是0-65535
读取常规变量
读写线圈 | 读取输入变量 | 读写保持寄存器
from pymodbus.client.sync import ModbusTcpClient from pymodbus.bit_read_message import ReadCoilsResponse from pymodbus.register_read_message import ReadInputRegistersResponse from pymodbus.exceptions import ConnectionException # 连接失败,用于异常处理 host = '127.0.0.1' port = 503 client = ModbusTcpClient(host,port) # 写入线圈 client.write_coil(1, True) client.write_coil(2, False) client.write_coil(3, True) # 读取线圈 注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个 result:ReadCoilsResponse = client.read_coils(address=1,cout=8) # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题 print(result.isError()) # 不建议使用 print(result.getBit(7)) # 这里的参数address不是plc里的地址,而是python列表的address, print('read_coils ') # 建议使用 print(result.bits) # 打印读取结果,一共8位 # 读取其中的位 print( result.bits[0], result.bits[1], result.bits[2] ) # 相当于result.getBit(0) # 读取数字输入 result = client.read_discrete_inputs(address=10001,count=8) # 从10001开始读,读取8位 print(result.bits) # 读取模拟输入寄存器 input_register_result:ReadInputRegistersResponse = client.read_input_registers(1,count=8) # print(f'is_error:{input_register_result.isError()}') print('read_input_registers ') print(input_register_result.registers) print(input_register_result.getRegister(0)) # 读写保持寄存器 client.write_register(address=40001,value=100) result:ReadInputRegistersResponse = client.read_holding_registers(address=40001,count=1) print('read_holding_registers ') print(result.registers) # 关闭连接 client.close()
读取复杂变量
字符串、浮点数、负数等
这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。
根据不同的设备,对照下表调整正确的组合方式。
Word Order | Byte order | Word1 | Word2 |
---|---|---|---|
Big | Big | 0x1234 | 0x5678 |
Big | Little | 0x3412 | 0x7856 |
Little | Big | 0x5678 | 0x1234 |
Little | Little | 0x7856 | 0x3412 |
# 复杂数据类型 from collections import OrderedDict import logging from pymodbus.client.sync import ModbusTcpClient as ModbusClient from pymodbus.constants import Endian from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder ORDER_DICT = {"": "BIG"} def run_binary_payload_client(host:str,port:int): for word_endian, byte_endian in ( (Endian.Big, Endian.Big), (Endian.Big, Endian.Little), (Endian.Little, Endian.Big), (Endian.Little, Endian.Little), ): print("-" * 60) print(f"Word Order: {ORDER_DICT[word_endian]}") print(f"Byte Order: {ORDER_DICT[byte_endian]}") print() builder = BinaryPayloadBuilder( wordorder=word_endian, byteorder=byte_endian, ) # 写入的变量 my_string = "abcd-efgh123345765432" builder.add_string(my_string) builder.add_bits([0, 1, 0, 1, 1, 0, 1, 0]) builder.add_8bit_int(-0x12) builder.add_8bit_uint(0x12) builder.add_16bit_int(-0x5678) builder.add_16bit_uint(0x1234) builder.add_32bit_int(-0x1234) builder.add_32bit_uint(0x12345678) builder.add_16bit_float(12.34) builder.add_16bit_float(-12.34) builder.add_32bit_float(22.34) builder.add_32bit_float(-22.34) builder.add_64bit_int(-0xDEADBEEF) builder.add_64bit_uint(0x12345678DEADBEEF) builder.add_64bit_uint(0x12345678DEADBEEF) builder.add_64bit_float(123.45) builder.add_64bit_float(-123.45) registers = builder.to_registers() print("Writing Registers:") print(registers) print("n") payload = builder.build() address = 40001 # 从40001开始写入 # We can write registers client.write_registers(address, registers, unit=1) # 写入 # 读取复杂变量 print("Reading Registers:") address = 40001 count = len(payload) print(f"payload_len {count}") result = client.read_holding_registers(address, count, slave=1) print(result.registers) print("n") decoder = BinaryPayloadDecoder.fromRegisters( result.registers, byteorder=byte_endian, wordorder=word_endian ) # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder assert ( decoder._byteorder == builder._byteorder # pylint: disable=protected-access ) # nosec assert ( decoder._wordorder == builder._wordorder # pylint: disable=protected-access ) # nosec decoded = OrderedDict( [ ("string", decoder.decode_string(len(my_string))), ("bits", decoder.decode_bits()), ("8int", decoder.decode_8bit_int()), ("8uint", decoder.decode_8bit_uint()), ("16int", decoder.decode_16bit_int()), ("16uint", decoder.decode_16bit_uint()), ("32int", decoder.decode_32bit_int()), ("32uint", decoder.decode_32bit_uint()), ("16float", decoder.decode_16bit_float()), ("16float2", decoder.decode_16bit_float()), ("32float", decoder.decode_32bit_float()), ("32float2", decoder.decode_32bit_float()), ("64int", decoder.decode_64bit_int()), ("64uint", decoder.decode_64bit_uint()), ("ignore", decoder.skip_bytes(8)), ("64float", decoder.decode_64bit_float()), ("64float2", decoder.decode_64bit_float()), ] ) print("Decoded Data") for name, value in iter(decoded.items()): print( "%st" % name, # pylint: disable=consider-using-f-string hex(value) if isinstance(value, int) else value, ) print("n") # 关闭连接 client.close() if __name__ == "__main__": run_binary_payload_client("127.0.0.1", 503)
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持IT俱乐部。