使用pymodbus库进行modbus tcp通信
使用python解决工业通信问题是一个非常好的选择,python具有丰富的生态,可以轻松解决工业通信的各种问题。
本篇主要介绍使用pymodbus库进行modbus tcp仿真,实现pc端读取plc或工业设备modbus变量。
安装pymodbus:
1 | pip install - U pymodbus |
创建modbus tcp server
这里我们先创建一个虚拟的modbus设备,如果你手里有一个plc或者工业设备,可以直接跳过本节。
- modbus_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | ''' * @Author: liuzhao * @Last Modified time: 2022-10-05 09:56:13 ''' from pymodbus.server.sync import ( StartTcpServer, ) from pymodbus.datastore import ( ModbusSequentialDataBlock, ModbusServerContext, ModbusSlaveContext, ) from pymodbus.version import version datablock = ModbusSequentialDataBlock.create() context = ModbusSlaveContext( di = datablock, co = datablock, hr = datablock, ir = datablock, ) single = True # Build data storage store = ModbusServerContext(slaves = context, single = single) if __name__ = = '__main__' : address = ( "0.0.0.0" , 503 ) StartTcpServer( context = store, # Data storage address = address, # listen address allow_reuse_address = True , # allow the reuse of an address ) |
直接运行该脚本,就可以在本机的503端口创建一台modbus设备了,具体实现暂不深追,我们学习的重点是客户端对modbus变量的读写。
读写modbus变量
modbus变量类型以及地址
Object type | Access | Size | Address |
---|---|---|---|
Coil | Read-write | 1 bit | 00001 – 09999 |
Discrete input | Read-only | 1 bit | 10001 – 19999 |
Input register | Read-only | 16 bits | 30001 – 39999 |
Holding register | Read-write | 16 bits | 40001 – 49999 |
coil是线圈,Discrete input是数字量输入,Input register是模拟量输入,Holding register是保持寄存器。一般地址范围是0-65535
读取常规变量
读写线圈 | 读取输入变量 | 读写保持寄存器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | from pymodbus.client.sync import ModbusTcpClient from pymodbus.bit_read_message import ReadCoilsResponse from pymodbus.register_read_message import ReadInputRegistersResponse from pymodbus.exceptions import ConnectionException # 连接失败,用于异常处理 host = '127.0.0.1' port = 503 client = ModbusTcpClient(host,port) # 写入线圈 client.write_coil( 1 , True ) client.write_coil( 2 , False ) client.write_coil( 3 , True ) # 读取线圈 注意对于离散量的读取,第二个参数cout是有坑的,必须为8的倍数个 result:ReadCoilsResponse = client.read_coils(address = 1 ,cout = 8 ) # 从地址1开始读,读取8个线圈,一次读8的倍数个线圈,不设置为8的倍数可能会出现问题 print (result.isError()) # 不建议使用 print (result.getBit( 7 )) # 这里的参数address不是plc里的地址,而是python列表的address, print ( 'read_coils ' ) # 建议使用 print (result.bits) # 打印读取结果,一共8位 # 读取其中的位 print ( result.bits[ 0 ], result.bits[ 1 ], result.bits[ 2 ] ) # 相当于result.getBit(0) # 读取数字输入 result = client.read_discrete_inputs(address = 10001 ,count = 8 ) # 从10001开始读,读取8位 print (result.bits) # 读取模拟输入寄存器 input_register_result:ReadInputRegistersResponse = client.read_input_registers( 1 ,count = 8 ) # print(f'is_error:{input_register_result.isError()}') print ( 'read_input_registers ' ) print (input_register_result.registers) print (input_register_result.getRegister( 0 )) # 读写保持寄存器 client.write_register(address = 40001 ,value = 100 ) result:ReadInputRegistersResponse = client.read_holding_registers(address = 40001 ,count = 1 ) print ( 'read_holding_registers ' ) print (result.registers) # 关闭连接 client.close() |
读取复杂变量
字符串、浮点数、负数等
这里需要注意modbus设备的存储结构是低位低字节还是低位高字节,也就是设备内存的字节、字的排列顺序。
根据不同的设备,对照下表调整正确的组合方式。
Word Order | Byte order | Word1 | Word2 |
---|---|---|---|
Big | Big | 0x1234 | 0x5678 |
Big | Little | 0x3412 | 0x7856 |
Little | Big | 0x5678 | 0x1234 |
Little | Little | 0x7856 | 0x3412 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | # 复杂数据类型 from collections import OrderedDict import logging from pymodbus.client.sync import ModbusTcpClient as ModbusClient from pymodbus.constants import Endian from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder ORDER_DICT = {" ": " BIG"} def run_binary_payload_client(host: str ,port: int ): for word_endian, byte_endian in ( (Endian.Big, Endian.Big), (Endian.Big, Endian.Little), (Endian.Little, Endian.Big), (Endian.Little, Endian.Little), ): print ( "-" * 60 ) print (f "Word Order: {ORDER_DICT[word_endian]}" ) print (f "Byte Order: {ORDER_DICT[byte_endian]}" ) print () builder = BinaryPayloadBuilder( wordorder = word_endian, byteorder = byte_endian, ) # 写入的变量 my_string = "abcd-efgh123345765432" builder.add_string(my_string) builder.add_bits([ 0 , 1 , 0 , 1 , 1 , 0 , 1 , 0 ]) builder.add_8bit_int( - 0x12 ) builder.add_8bit_uint( 0x12 ) builder.add_16bit_int( - 0x5678 ) builder.add_16bit_uint( 0x1234 ) builder.add_32bit_int( - 0x1234 ) builder.add_32bit_uint( 0x12345678 ) builder.add_16bit_float( 12.34 ) builder.add_16bit_float( - 12.34 ) builder.add_32bit_float( 22.34 ) builder.add_32bit_float( - 22.34 ) builder.add_64bit_int( - 0xDEADBEEF ) builder.add_64bit_uint( 0x12345678DEADBEEF ) builder.add_64bit_uint( 0x12345678DEADBEEF ) builder.add_64bit_float( 123.45 ) builder.add_64bit_float( - 123.45 ) registers = builder.to_registers() print ( "Writing Registers:" ) print (registers) print ( "n" ) payload = builder.build() address = 40001 # 从40001开始写入 # We can write registers client.write_registers(address, registers, unit = 1 ) # 写入 # 读取复杂变量 print ( "Reading Registers:" ) address = 40001 count = len (payload) print (f "payload_len {count}" ) result = client.read_holding_registers(address, count, slave = 1 ) print (result.registers) print ( "n" ) decoder = BinaryPayloadDecoder.fromRegisters( result.registers, byteorder = byte_endian, wordorder = word_endian ) # Make sure word/byte order is consistent between BinaryPayloadBuilder and BinaryPayloadDecoder assert ( decoder._byteorder = = builder._byteorder # pylint: disable=protected-access ) # nosec assert ( decoder._wordorder = = builder._wordorder # pylint: disable=protected-access ) # nosec decoded = OrderedDict( [ ( "string" , decoder.decode_string( len (my_string))), ( "bits" , decoder.decode_bits()), ( "8int" , decoder.decode_8bit_int()), ( "8uint" , decoder.decode_8bit_uint()), ( "16int" , decoder.decode_16bit_int()), ( "16uint" , decoder.decode_16bit_uint()), ( "32int" , decoder.decode_32bit_int()), ( "32uint" , decoder.decode_32bit_uint()), ( "16float" , decoder.decode_16bit_float()), ( "16float2" , decoder.decode_16bit_float()), ( "32float" , decoder.decode_32bit_float()), ( "32float2" , decoder.decode_32bit_float()), ( "64int" , decoder.decode_64bit_int()), ( "64uint" , decoder.decode_64bit_uint()), ( "ignore" , decoder.skip_bytes( 8 )), ( "64float" , decoder.decode_64bit_float()), ( "64float2" , decoder.decode_64bit_float()), ] ) print ( "Decoded Data" ) for name, value in iter (decoded.items()): print ( "%st" % name, # pylint: disable=consider-using-f-string hex (value) if isinstance (value, int ) else value, ) print ( "n" ) # 关闭连接 client.close() if __name__ = = "__main__" : run_binary_payload_client( "127.0.0.1" , 503 ) |
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持IT俱乐部。