# tcp2rtu_dtu.py

  1. import modbus_rt
  2. import modbus_rt_defines as cst
  3. serial_name = "/dev/ttyUSB0"
  4. ip_addr = ""
  5. rm = modbus_rt.rtu(cst.MASTER)
  6. rm.set_serial(serial_name)
  7. rm.open()
  8. ts = modbus_rt.tcp()
  9. ts.set_net(ip_addr, 502, cst.SOCK_STREAM)
  10. def pre_call(evt) :
  11. slave = evt.slave
  12. function = evt.function
  13. addr = evt.addr
  14. quantity = evt.quantity
  15. if cst.READ_HOLDING_REGISTERS == function:
  16. data = rm.excuse(slave, function, addr, quantity)
  17. ts.excuse(cst.WRITE, cst.REGISTERS, addr, quantity, data)
  18. elif cst.READ_DISCRETE_INPUTS == function:
  19. data = rm.excuse(slave, function, addr, quantity)
  20. ts.excuse(cst.WRITE, cst.INPUTS, addr, quantity, data)
  21. def done_call(evt) :
  22. slave = evt.slave
  23. function = evt.function
  24. addr = evt.addr
  25. quantity = evt.quantity
  26. if cst.WRITE_SINGLE_COIL == function:
  27. data = ts.excuse(cst.READ, cst.CIOLS, addr, 1)
  28. rm.excuse(slave, function, addr, data[0])
  29. elif cst.WRITE_SINGLE_REGISTER == function:
  30. data = ts.excuse(cst.READ, cst.REGISTERS, addr, 1)
  31. rm.excuse(slave, function, addr, data[0])
  32. elif cst.WRITE_MULTIPLE_COILS == function:
  33. data = ts.excuse(cst.READ, cst.CIOLS, addr, quantity)
  34. rm.excuse(slave, function, addr, quantity, data)
  35. elif cst.WRITE_MULTIPLE_REGISTERS == function:
  36. data = ts.excuse(0, cst.REGISTERS, addr, quantity)
  37. rm.excuse(slave, function, addr, quantity, data)
  38. ts.add_block("A", 0, 20000, 10)
  39. ts.add_block("B", 1, 10000, 16)
  40. ts.add_block("C", 4, 0, 10)
  41. ts.set_strict(0)
  42. ts.set_pre_ans_callback(pre_call)
  43. ts.set_done_callback(done_call)
  44. ts.open()