| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- import re
- import os
- import socket
- from threading import Thread, Event
- import subprocess
- import time
- from shutil import copyfile
- from tiny_test_fw import Utility, DUT
- import ttfw_idf
- stop_sock_listener = Event()
- stop_io_listener = Event()
- sock = None
- client_address = None
- manual_test = False
- def io_listener(dut1):
- global sock
- global client_address
- data = b''
- while not stop_io_listener.is_set():
- try:
- data = dut1.expect(re.compile(r"PacketOut:\[([a-fA-F0-9]+)\]"), timeout=5)
- except DUT.ExpectTimeout:
- continue
- if data != () and data[0] != b'':
- packet_data = data[0]
- print("Packet_data>{}<".format(packet_data))
- response = bytearray.fromhex(packet_data.decode())
- print("Sending to socket:")
- packet = ' '.join(format(x, '02x') for x in bytearray(response))
- print("Packet>{}<".format(packet))
- if client_address is not None:
- sock.sendto(response, ('127.0.0.1', 7777))
- def sock_listener(dut1):
- global sock
- global client_address
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- sock.settimeout(5)
- server_address = '0.0.0.0'
- server_port = 7771
- server = (server_address, server_port)
- sock.bind(server)
- try:
- while not stop_sock_listener.is_set():
- try:
- payload, client_address = sock.recvfrom(1024)
- packet = ' '.join(format(x, '02x') for x in bytearray(payload))
- print("Received from address {}, data {}".format(client_address, packet))
- dut1.write(str.encode(packet))
- except socket.timeout:
- pass
- finally:
- sock.close()
- sock = None
- @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
- def lwip_test_suite(env, extra_data):
- global stop_io_listener
- global stop_sock_listener
- """
- steps: |
- 1. Rebuilds test suite with esp32_netsuite.ttcn
- 2. Starts listeners on stdout and socket
- 3. Execute ttcn3 test suite
- 4. Collect result from ttcn3
- """
- dut1 = env.get_dut("net_suite", "examples/system/network_tests", dut_class=ttfw_idf.ESP32DUT)
- # check and log bin size
- binary_file = os.path.join(dut1.app.binary_path, "net_suite.bin")
- bin_size = os.path.getsize(binary_file)
- ttfw_idf.log_performance("net_suite", "{}KB".format(bin_size // 1024))
- ttfw_idf.check_performance("net_suite", bin_size // 1024, dut1.TARGET)
- dut1.start_app()
- thread1 = Thread(target=sock_listener, args=(dut1, ))
- thread2 = Thread(target=io_listener, args=(dut1, ))
- if not manual_test:
- # Variables refering to esp32 ttcn test suite
- TTCN_SRC = 'esp32_netsuite.ttcn'
- TTCN_CFG = 'esp32_netsuite.cfg'
- # System Paths
- netsuite_path = os.getenv("NETSUITE_PATH")
- netsuite_src_path = os.path.join(netsuite_path, "src")
- test_dir = os.path.dirname(os.path.realpath(__file__))
- # Building the suite
- print("Rebuilding the test suite")
- print("-------------------------")
- # copy esp32 specific files to ttcn net-suite dir
- copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
- copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
- proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'],
- cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output = proc.stdout.read()
- print("Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)")
- print(output)
- proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'],
- cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- print("Note: This time all dependencies shall be generated -- multijob make shall pass")
- output = proc.stdout.read()
- print(output)
- # Executing the test suite
- thread1.start()
- thread2.start()
- time.sleep(2)
- print("Executing the test suite")
- print("------------------------")
- proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)],
- stdout=subprocess.PIPE)
- output = proc.stdout.read()
- print(output)
- print("Collecting results")
- print("------------------")
- verdict_stats = re.search('(Verdict statistics:.*)', output)
- if verdict_stats:
- verdict_stats = verdict_stats.group(1)
- else:
- verdict_stats = b""
- verdict = re.search('Overall verdict: pass', output)
- if verdict:
- print("Test passed!")
- Utility.console_log(verdict_stats, "green")
- else:
- Utility.console_log(verdict_stats, "red")
- raise ValueError('Test failed with: {}'.format(verdict_stats))
- else:
- try:
- # Executing the test suite
- thread1.start()
- thread2.start()
- time.sleep(2)
- while True:
- time.sleep(0.5)
- except KeyboardInterrupt:
- pass
- print("Executing done, waiting for tests to finish")
- print("-------------------------------------------")
- stop_io_listener.set()
- stop_sock_listener.set()
- thread1.join()
- thread2.join()
- if __name__ == '__main__':
- print("Manual execution, please build and start ttcn in a separate console")
- manual_test = True
- lwip_test_suite()
|