app_test.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from __future__ import print_function
  2. from __future__ import unicode_literals
  3. import re
  4. import socket
  5. import subprocess
  6. import ttfw_idf
  7. import time
  8. import netifaces
  9. from threading import Thread, Event
  10. def run_server(server_stop, port, server_ip, client_ip):
  11. print("Starting PPP server on port: {}".format(port))
  12. try:
  13. arg_list = ['pppd', port, '115200', '{}:{}'.format(server_ip, client_ip), 'modem', 'local', 'noauth', 'debug', 'nocrtscts', 'nodetach', '+ipv6']
  14. p = subprocess.Popen(arg_list, stdout=subprocess.PIPE, bufsize=1)
  15. while not server_stop.is_set():
  16. if p.poll() is not None:
  17. raise ValueError('ENV_TEST_FAILURE: PPP terminated unexpectedly with {}'.format(p.poll()))
  18. line = p.stdout.readline()
  19. if line:
  20. print("[PPPD:]{}".format(line.rstrip()))
  21. time.sleep(0.1)
  22. except Exception as e:
  23. print(e)
  24. raise ValueError('ENV_TEST_FAILURE: Error running PPP server')
  25. finally:
  26. p.terminate()
  27. print("PPP server stopped")
  28. @ttfw_idf.idf_custom_test(env_tag="Example_PPP", group="test-apps")
  29. def test_examples_protocol_pppos_connect(env, extra_data):
  30. """
  31. steps:
  32. 1. starts PPP server
  33. 2. get DUT as PPP client to connect to the server
  34. 3. check TCP client-server connection between client-server
  35. """
  36. dut1 = env.get_dut("pppos_connect_test", "tools/test_apps/protocols/pppos", dut_class=ttfw_idf.ESP32DUT)
  37. # Look for test case symbolic names
  38. try:
  39. server_ip = dut1.app.get_sdkconfig()["CONFIG_TEST_APP_PPP_SERVER_IP"].replace('"','')
  40. client_ip = dut1.app.get_sdkconfig()["CONFIG_TEST_APP_PPP_CLIENT_IP"].replace('"','')
  41. port_nr = dut1.app.get_sdkconfig()["CONFIG_TEST_APP_TCP_PORT"]
  42. except Exception:
  43. print('ENV_TEST_FAILURE: Some mandatory configuration not found in sdkconfig')
  44. raise
  45. print("Starting the test on {}".format(dut1))
  46. dut1.start_app()
  47. # the PPP test env uses two ttyUSB's: one for ESP32 board, another one for ppp server
  48. # use the other port for PPP server than the DUT/ESP
  49. port = '/dev/ttyUSB0' if dut1.port == '/dev/ttyUSB1' else '/dev/ttyUSB1'
  50. # Start the PPP server
  51. server_stop = Event()
  52. t = Thread(target=run_server, args=(server_stop, port, server_ip, client_ip))
  53. t.start()
  54. try:
  55. ppp_server_timeout = time.time() + 30
  56. while "ppp0" not in netifaces.interfaces():
  57. print("PPP server haven't yet setup its netif, list of active netifs:{}".format(netifaces.interfaces()))
  58. time.sleep(0.5)
  59. if time.time() > ppp_server_timeout:
  60. raise ValueError("ENV_TEST_FAILURE: PPP server failed to setup ppp0 interface within timeout")
  61. ip6_addr = dut1.expect(re.compile(r"Got IPv6 address (\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4}\:\w{4})"), timeout=30)[0]
  62. print("IPv6 address of ESP: {}".format(ip6_addr))
  63. dut1.expect(re.compile(r"Socket listening"))
  64. print("Starting the IPv6 test...")
  65. # Connect to TCP server on ESP using IPv6 address
  66. for res in socket.getaddrinfo(ip6_addr + "%ppp0", int(port_nr), socket.AF_INET6,
  67. socket.SOCK_STREAM, socket.SOL_TCP):
  68. af, socktype, proto, canonname, addr = res
  69. sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  70. sock.connect(addr)
  71. sock.sendall(b"Espressif")
  72. sock.close()
  73. dut1.expect(re.compile(r"IPv6 test passed"))
  74. print("IPv6 test passed!")
  75. print("Starting the IPv4 test...")
  76. # Start the TCP server and wait for the ESP to connect with IPv4 address
  77. try:
  78. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  79. sock.bind(('', int(port_nr)))
  80. sock.listen(1)
  81. conn, addr = sock.accept()
  82. except socket.error as msg:
  83. print('Socket error: ' + str(msg[0]) + ': ' + msg[1])
  84. raise
  85. timeout = time.time() + 60
  86. while time.time() < timeout:
  87. data = conn.recv(128)
  88. if not data:
  89. break
  90. data = data.decode()
  91. print('Received data: ' + data)
  92. if data.startswith('Espressif'):
  93. conn.send(data.encode())
  94. break
  95. conn.close()
  96. dut1.expect(re.compile(r"IPv4 test passed"))
  97. print("IPv4 test passed!")
  98. finally:
  99. server_stop.set()
  100. t.join()
  101. if __name__ == '__main__':
  102. test_examples_protocol_pppos_connect()