send_testcase.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. import sys
  2. import socket
  3. import struct
  4. if len(sys.argv) != 3:
  5. print("python {} IP TESTCASE_PATH".format(sys.argv[0]))
  6. sys.exit(1)
  7. HOST_IP = sys.argv[1]
  8. HOST_PORT = 44818
  9. TESTCASE_PATH = sys.argv[2]
  10. ENIP_SESSION_CONTEXT = b"\x92\x83J\x0b=\x9e\x0cW"
  11. ENIP_INIT_SESSION_PACKET = b"e\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00" + ENIP_SESSION_CONTEXT + b"\x00\x00\x00\x00\x01\x00\x00\x00"
  12. print("[-] Connecting to {}:{}".format(HOST_IP, HOST_PORT))
  13. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  14. s.connect((HOST_IP, HOST_PORT))
  15. print("[-] Init ENIP session")
  16. s.sendall(ENIP_INIT_SESSION_PACKET)
  17. enip_session = s.recv(1024)
  18. session_handle = enip_session[4:8]
  19. print("[-] Got ENIP Session Handle: {}".format(struct.unpack("<I", session_handle)[0]))
  20. print("[-] Reading testcase from: '{}'".format(TESTCASE_PATH))
  21. with open(TESTCASE_PATH, "rb") as f:
  22. testcase_data = f.read()
  23. print("[-] Patching sender context and session handle")
  24. testcase = testcase_data[:4] # command, len
  25. testcase += session_handle # session handle
  26. testcase += testcase_data[8:12] # status
  27. testcase += ENIP_SESSION_CONTEXT # session context
  28. testcase += testcase_data[20:] # options and payload
  29. print("[-] Sending testcase of {} bytes".format(len(testcase)))
  30. s.send(testcase)
  31. s.close()