app_test.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. from __future__ import print_function, unicode_literals
  2. import re
  3. import socket
  4. import subprocess
  5. import time
  6. from threading import Event, Thread
  7. import netifaces
  8. import ttfw_idf
  9. def run_server(server_stop, port, server_ip, client_ip):
  10. print('Starting PPP server on port: {}'.format(port))
  11. try:
  12. arg_list = ['pppd', port, '115200', '{}:{}'.format(server_ip, client_ip), 'modem', 'local', 'noauth', 'debug', 'nocrtscts', 'nodetach', '+ipv6']
  13. p = subprocess.Popen(arg_list, stdout=subprocess.PIPE, bufsize=1)
  14. while not server_stop.is_set():
  15. if p.poll() is not None:
  16. raise ValueError('ENV_TEST_FAILURE: PPP terminated unexpectedly with {}'.format(p.poll()))
  17. line = p.stdout.readline()
  18. if line:
  19. print('[PPPD:]{}'.format(line.rstrip()))
  20. time.sleep(0.1)
  21. except Exception as e:
  22. print(e)
  23. raise ValueError('ENV_TEST_FAILURE: Error running PPP server')
  24. finally:
  25. p.terminate()
  26. print('PPP server stopped')
  27. @ttfw_idf.idf_custom_test(env_tag='Example_PPP', group='test-apps')
  28. def test_examples_protocol_pppos_connect(env, extra_data):
  29. """
  30. steps:
  31. 1. starts PPP server
  32. 2. get DUT as PPP client to connect to the server
  33. 3. check TCP client-server connection between client-server
  34. """
  35. dut1 = env.get_dut('pppos_connect_test', 'tools/test_apps/protocols/pppos', dut_class=ttfw_idf.ESP32DUT)
  36. # Look for test case symbolic names
  37. try:
  38. server_ip = dut1.app.get_sdkconfig()['CONFIG_TEST_APP_PPP_SERVER_IP'].replace('"','')
  39. client_ip = dut1.app.get_sdkconfig()['CONFIG_TEST_APP_PPP_CLIENT_IP'].replace('"','')
  40. port_nr = dut1.app.get_sdkconfig()['CONFIG_TEST_APP_TCP_PORT']
  41. except Exception:
  42. print('ENV_TEST_FAILURE: Some mandatory configuration not found in sdkconfig')
  43. raise
  44. print('Starting the test on {}'.format(dut1))
  45. dut1.start_app()
  46. # the PPP test env uses two ttyUSB's: one for ESP32 board, another one for ppp server
  47. # use the other port for PPP server than the DUT/ESP
  48. port = '/dev/ttyUSB0' if dut1.port == '/dev/ttyUSB1' else '/dev/ttyUSB1'
  49. # Start the PPP server
  50. server_stop = Event()
  51. t = Thread(target=run_server, args=(server_stop, port, server_ip, client_ip))
  52. t.start()
  53. try:
  54. ppp_server_timeout = time.time() + 30
  55. while 'ppp0' not in netifaces.interfaces():
  56. print("PPP server haven't yet setup its netif, list of active netifs:{}".format(netifaces.interfaces()))
  57. time.sleep(0.5)
  58. if time.time() > ppp_server_timeout:
  59. raise ValueError('ENV_TEST_FAILURE: PPP server failed to setup ppp0 interface within timeout')
  60. ip6_addr = dut1.expect(re.compile(r'Got IPv6 address (\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4})'), timeout=30)[0]
  61. print('IPv6 address of ESP: {}'.format(ip6_addr))
  62. dut1.expect(re.compile(r'Socket listening'))
  63. print('Starting the IPv6 test...')
  64. # Connect to TCP server on ESP using IPv6 address
  65. for res in socket.getaddrinfo(ip6_addr + '%ppp0', int(port_nr), socket.AF_INET6,
  66. socket.SOCK_STREAM, socket.SOL_TCP):
  67. af, socktype, proto, canonname, addr = res
  68. sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  69. sock.connect(addr)
  70. sock.sendall(b'Espressif')
  71. sock.close()
  72. dut1.expect(re.compile(r'IPv6 test passed'))
  73. print('IPv6 test passed!')
  74. print('Starting the IPv4 test...')
  75. # Start the TCP server and wait for the ESP to connect with IPv4 address
  76. try:
  77. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  78. sock.bind(('', int(port_nr)))
  79. sock.listen(1)
  80. conn, addr = sock.accept()
  81. except socket.error as msg:
  82. print('Socket error: ' + str(msg[0]) + ': ' + msg[1])
  83. raise
  84. timeout = time.time() + 60
  85. while time.time() < timeout:
  86. data = conn.recv(128)
  87. if not data:
  88. break
  89. data = data.decode()
  90. print('Received data: ' + data)
  91. if data.startswith('Espressif'):
  92. conn.send(data.encode())
  93. break
  94. conn.close()
  95. dut1.expect(re.compile(r'IPv4 test passed'))
  96. print('IPv4 test passed!')
  97. finally:
  98. server_stop.set()
  99. t.join()
  100. if __name__ == '__main__':
  101. test_examples_protocol_pppos_connect()