net_suite_test.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import re
  2. import os
  3. import socket
  4. from threading import Thread, Event
  5. import subprocess
  6. import time
  7. from shutil import copyfile
  8. from tiny_test_fw import Utility, DUT
  9. import ttfw_idf
  10. stop_sock_listener = Event()
  11. stop_io_listener = Event()
  12. sock = None
  13. client_address = None
  14. manual_test = False
  15. def io_listener(dut1):
  16. global sock
  17. global client_address
  18. data = b''
  19. while not stop_io_listener.is_set():
  20. try:
  21. data = dut1.expect(re.compile(r"PacketOut:\[([a-fA-F0-9]+)\]"), timeout=5)
  22. except DUT.ExpectTimeout:
  23. continue
  24. if data != () and data[0] != b'':
  25. packet_data = data[0]
  26. print("Packet_data>{}<".format(packet_data))
  27. response = bytearray.fromhex(packet_data.decode())
  28. print("Sending to socket:")
  29. packet = ' '.join(format(x, '02x') for x in bytearray(response))
  30. print("Packet>{}<".format(packet))
  31. if client_address is not None:
  32. sock.sendto(response, ('127.0.0.1', 7777))
  33. def sock_listener(dut1):
  34. global sock
  35. global client_address
  36. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  37. sock.settimeout(5)
  38. server_address = '0.0.0.0'
  39. server_port = 7771
  40. server = (server_address, server_port)
  41. sock.bind(server)
  42. try:
  43. while not stop_sock_listener.is_set():
  44. try:
  45. payload, client_address = sock.recvfrom(1024)
  46. packet = ' '.join(format(x, '02x') for x in bytearray(payload))
  47. print("Received from address {}, data {}".format(client_address, packet))
  48. dut1.write(str.encode(packet))
  49. except socket.timeout:
  50. pass
  51. finally:
  52. sock.close()
  53. sock = None
  54. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  55. def lwip_test_suite(env, extra_data):
  56. global stop_io_listener
  57. global stop_sock_listener
  58. """
  59. steps: |
  60. 1. Rebuilds test suite with esp32_netsuite.ttcn
  61. 2. Starts listeners on stdout and socket
  62. 3. Execute ttcn3 test suite
  63. 4. Collect result from ttcn3
  64. """
  65. dut1 = env.get_dut("net_suite", "examples/system/network_tests", dut_class=ttfw_idf.ESP32DUT)
  66. # check and log bin size
  67. binary_file = os.path.join(dut1.app.binary_path, "net_suite.bin")
  68. bin_size = os.path.getsize(binary_file)
  69. ttfw_idf.log_performance("net_suite", "{}KB".format(bin_size // 1024))
  70. ttfw_idf.check_performance("net_suite", bin_size // 1024, dut1.TARGET)
  71. dut1.start_app()
  72. thread1 = Thread(target=sock_listener, args=(dut1, ))
  73. thread2 = Thread(target=io_listener, args=(dut1, ))
  74. if not manual_test:
  75. # Variables refering to esp32 ttcn test suite
  76. TTCN_SRC = 'esp32_netsuite.ttcn'
  77. TTCN_CFG = 'esp32_netsuite.cfg'
  78. # System Paths
  79. netsuite_path = os.getenv("NETSUITE_PATH")
  80. netsuite_src_path = os.path.join(netsuite_path, "src")
  81. test_dir = os.path.dirname(os.path.realpath(__file__))
  82. # Building the suite
  83. print("Rebuilding the test suite")
  84. print("-------------------------")
  85. # copy esp32 specific files to ttcn net-suite dir
  86. copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
  87. copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
  88. proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'],
  89. cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  90. output = proc.stdout.read()
  91. print("Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)")
  92. print(output)
  93. proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'],
  94. cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  95. print("Note: This time all dependencies shall be generated -- multijob make shall pass")
  96. output = proc.stdout.read()
  97. print(output)
  98. # Executing the test suite
  99. thread1.start()
  100. thread2.start()
  101. time.sleep(2)
  102. print("Executing the test suite")
  103. print("------------------------")
  104. proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)],
  105. stdout=subprocess.PIPE)
  106. output = proc.stdout.read()
  107. print(output)
  108. print("Collecting results")
  109. print("------------------")
  110. verdict_stats = re.search('(Verdict statistics:.*)', output)
  111. if verdict_stats:
  112. verdict_stats = verdict_stats.group(1)
  113. else:
  114. verdict_stats = b""
  115. verdict = re.search('Overall verdict: pass', output)
  116. if verdict:
  117. print("Test passed!")
  118. Utility.console_log(verdict_stats, "green")
  119. else:
  120. Utility.console_log(verdict_stats, "red")
  121. raise ValueError('Test failed with: {}'.format(verdict_stats))
  122. else:
  123. try:
  124. # Executing the test suite
  125. thread1.start()
  126. thread2.start()
  127. time.sleep(2)
  128. while True:
  129. time.sleep(0.5)
  130. except KeyboardInterrupt:
  131. pass
  132. print("Executing done, waiting for tests to finish")
  133. print("-------------------------------------------")
  134. stop_io_listener.set()
  135. stop_sock_listener.set()
  136. thread1.join()
  137. thread2.join()
  138. if __name__ == '__main__':
  139. print("Manual execution, please build and start ttcn in a separate console")
  140. manual_test = True
  141. lwip_test_suite()