| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- import _modbus_rt
- import modbus_rt_defines as cst
- LITTLE_ENDIAL_SWAP = 0
- BIG_ENDIAL_SWAP = 1
- LITTLE_ENDIAL = 2
- BIG_ENDIAL = 3
- SLAVE = 0
- MASTER = 1
- SOCK_STREAM = 1
- SOCK_DGRAM = 2
- class data_trans(_modbus_rt._data_trans):
- def reg2reg(self, val: int):
- return self._reg2reg(val)
- def regs2regs(self, val: list):
- return self._regs2regs(val)
- def regs2bytes(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- if type(val) != list:
- print("para only for type list")
- return
- return self._regs2bytes(val, mode)
- def regs2str(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2str(val, mode)
- def regs2signed(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2signed(val, mode)
- def regs2int(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2uint(val, mode)
- def regs2uint(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2int(val, mode)
- def regs2long(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2long(val, mode)
- def regs2float(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2float(val, mode)
- def regs2double(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._regs2double(val, mode)
- def bytes2regs(self, val: any, mode = LITTLE_ENDIAL_SWAP):
- return self._bytes2regs(val, mode)
- def str2regs(self, val: str, mode = LITTLE_ENDIAL_SWAP):
- return self._str2regs(val, mode)
- def signed2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._signed2regs(val, mode)
- def int2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._int2regs(val, mode)
- def uint2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._uint2regs(val, mode)
- def long2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._long2regs(val, mode)
- def float2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._float2regs(val, mode)
- def double2regs(self, val: list, mode = LITTLE_ENDIAL_SWAP):
- return self._double2regs(val, mode)
- class rtu(_modbus_rt._rtu):
- mode = SLAVE
- ascii_flag = 0
- def __init__(self, mode = SLAVE):
- return self._init(mode)
- def set_serial(self, devname: str, baudrate = 115200, bytesize = 8, parity = 'N', stopbits = 1, xonxoff = 0):
- return self._set_serial(devname, baudrate, bytesize, parity, stopbits, xonxoff)
- def set_over_type(self, over_type: int):
- return self._set_over_type(over_type)
- def set_net(self, ip = '', port = 502, type = SOCK_STREAM):
- return self._set_net(ip, port, type)
- def set_ip(self, ip: str):
- return self._set_ip(ip)
- def set_port(self, port: int):
- return self._set_port(port)
- def set_type(self, type: int):
- return self._set_type(type)
- def set_p2p(self, p2p_flag: int):
- return self._set_p2p(p2p_flag)
- def open(self):
- return self._open()
- def isopen(self):
- return self._isopen()
- def close(self):
- return self._close()
- # 该函数仅对从机有效
- def set_addr(self, addr: int):
- return self._slave_set_addr(addr)
- # 该函数仅对从机有效
- def set_strict(self, strict: int):
- return self._slave_set_strict(strict)
- # 该函数仅对从机有效
- def add_block(self, name: str, type: int, addr: int, nums: int):
- return self._slave_add_block(name, type, addr, nums)
- # 该函数仅对从机有效
- def regs_binding(self, regs: bytes, type: int, addr: int, nums: int):
- return self._slave_regs_binding(regs, type, addr, nums)
- # 该函数仅对从机有效
- def set_pre_ans_callback(self, cb):
- return self._slave_set_pre_ans_callback(cb)
- # 该函数仅对从机有效
- def set_done_callback(self, cb):
- return self._slave_set_done_callback(cb)
- # 该函数仅对从机有效
- def set_dev_binding(self, flag: int):
- return self._slave_set_dev_binding(flag)
- # 该函数仅对主机有效
- def set_server(self, saddr: str,sport: int):
- return self._master_set_server(saddr,sport)
- # 该函数仅对主机有效
- def get_saddr(self):
- return self._master_get_saddr()
- def excuse(self, dir_slave: int, type_function: int, addr: int, *val):
- if self.mode == SLAVE:
- if dir_slave == 0 :
- return self._slave_read_regs(type_function, addr, *val)
- elif dir_slave == 1 :
- return self._slave_write_regs(type_function, addr, *val)
- else :
- return
- elif self.mode == MASTER:
- if type_function >= cst.READ_COILS and type_function <= cst.READ_INPUT_REGISTERS:
- return self._master_read_list(dir_slave,type_function, addr, *val)
- elif type_function >= cst.WRITE_SINGLE_COIL and type_function <= cst.WRITE_SINGLE_REGISTER:
- return self._master_write_int(dir_slave,type_function, addr, *val)
- elif type_function >= cst.WRITE_MULTIPLE_COILS and type_function <= cst.WRITE_MULTIPLE_REGISTERS:
- return self._master_write_list(dir_slave,type_function, addr, *val)
- else :
- return
- else :
- return
- # 该函数仅对主机有效
- def download(self, slave: int, file_dev: str, file_master: str):
- return self._master_download(slave, file_dev, file_master)
- # 该函数仅对主机有效
- def upload(self, slave: int, file_dev: str, file_master: str):
- return self._master_upload(slave, file_dev, file_master)
- class ascii(rtu):
- mode = SLAVE
- ascii_flag = 0
- class tcp(_modbus_rt._tcp):
- mode = SLAVE
- def __init__(self, mode = SLAVE):
- return self._init(mode)
- def set_net(self, ip = '', port = 502, type = SOCK_STREAM):
- return self._set_net(ip, port, type)
- def set_ip(self, ip: str):
- return self._set_ip(ip)
- def set_port(self, port: int):
- return self._set_port(port)
- def set_type(self, type: int):
- return self._set_type(type)
- def set_p2p(self, p2p_flag: int):
- return self._set_p2p(p2p_flag)
- def open(self):
- return self._open()
- def isopen(self):
- return self._isopen()
- def close(self):
- return self._close()
- # 该函数仅对从机有效
- def set_addr(self, addr: int):
- return self._slave_set_addr(addr)
- # 该函数仅对从机有效
- def set_strict(self, strict: int):
- return self._slave_set_strict(strict)
- # 该函数仅对从机有效
- def add_block(self, name: str, type: int, addr: int, nums: int):
- return self._slave_add_block(name, type, addr, nums)
- # 该函数仅对从机有效
- def regs_binding(self, regs: bytes, type: int, addr: int, nums: int):
- return self._slave_regs_binding(regs, type, addr, nums)
- # 该函数仅对从机有效
- def set_pre_ans_callback(self, cb):
- return self._slave_set_pre_ans_callback(cb)
- # 该函数仅对从机有效
- def set_done_callback(self, cb):
- return self._slave_set_done_callback(cb)
- # 该函数仅对从机有效
- def set_dev_binding(self, flag: int):
- return self._slave_set_dev_binding(flag)
- # 该函数仅对主机有效
- def set_server(self, saddr: str,sport: int):
- return self._master_set_server(saddr,sport)
- # 该函数仅对主机有效
- def get_saddr(self):
- return self._master_get_saddr()
- def excuse(self, dir_slave: int, type_function: int, addr: int, *val):
- if self.mode == SLAVE:
- if dir_slave == 0 :
- return self._slave_read_regs(type_function, addr, *val)
- elif dir_slave == 1 :
- return self._slave_write_regs(type_function, addr, *val)
- else :
- return
- elif self.mode == MASTER:
- if type_function >= cst.READ_COILS and type_function <= cst.READ_INPUT_REGISTERS:
- return self._master_read_list(dir_slave,type_function, addr, *val)
- elif type_function >= cst.WRITE_SINGLE_COIL and type_function <= cst.WRITE_SINGLE_REGISTER:
- return self._master_write_int(dir_slave,type_function, addr, *val)
- elif type_function >= cst.WRITE_MULTIPLE_COILS and type_function <= cst.WRITE_MULTIPLE_REGISTERS:
- return self._master_write_list(dir_slave,type_function, addr, *val)
- else :
- return
- else :
- return
- # 该函数仅对主机有效
- def download(self, slave: int, file_dev: str, file_master: str):
- return self._master_download(slave, file_dev, file_master)
- # 该函数仅对主机有效
- def upload(self, slave: int, file_dev: str, file_master: str):
- return self._master_upload(slave, file_dev, file_master)
|