asio_chat_client_test.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. from threading import Thread
  6. import time
  7. try:
  8. import IDF
  9. from IDF.IDFDUT import ESP32DUT
  10. except ImportError:
  11. # this is a test case write with tiny-test-fw.
  12. # to run test cases outside tiny-test-fw,
  13. # we need to set environment variable `TEST_FW_PATH`,
  14. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  15. test_fw_path = os.getenv("TEST_FW_PATH")
  16. if test_fw_path and test_fw_path not in sys.path:
  17. sys.path.insert(0, test_fw_path)
  18. import IDF
  19. global g_client_response
  20. global g_msg_to_client
  21. g_client_response = b""
  22. g_msg_to_client = b" 3XYZ"
  23. def get_my_ip():
  24. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  25. s1.connect(("8.8.8.8", 80))
  26. my_ip = s1.getsockname()[0]
  27. s1.close()
  28. return my_ip
  29. def chat_server_sketch(my_ip):
  30. global g_client_response
  31. print("Starting the server on {}".format(my_ip))
  32. port = 2222
  33. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  34. s.settimeout(600)
  35. s.bind((my_ip, port))
  36. s.listen(1)
  37. q,addr = s.accept()
  38. print("connection accepted")
  39. q.settimeout(30)
  40. q.send(g_msg_to_client)
  41. data = q.recv(1024)
  42. # check if received initial empty message
  43. if (len(data) > 4):
  44. g_client_response = data
  45. else:
  46. g_client_response = q.recv(1024)
  47. print("received from client {}".format(g_client_response))
  48. s.close()
  49. print("server closed")
  50. @IDF.idf_example_test(env_tag="Example_WIFI")
  51. def test_examples_protocol_asio_chat_client(env, extra_data):
  52. """
  53. steps: |
  54. 1. Test to start simple tcp server
  55. 2. `dut1` joins AP
  56. 3. Test injects server IP to `dut1`via stdin
  57. 4. Test evaluates `dut1` receives a message server placed
  58. 5. Test injects a message to `dut1` to be sent as chat_client message
  59. 6. Test evaluates received test message in host server
  60. """
  61. global g_client_response
  62. global g_msg_to_client
  63. test_msg = "ABC"
  64. dut1 = env.get_dut("chat_client", "examples/protocols/asio/chat_client", dut_class=ESP32DUT)
  65. # check and log bin size
  66. binary_file = os.path.join(dut1.app.binary_path, "asio_chat_client.bin")
  67. bin_size = os.path.getsize(binary_file)
  68. IDF.log_performance("asio_chat_client_size", "{}KB".format(bin_size // 1024))
  69. IDF.check_performance("asio_chat_client_size", bin_size // 1024)
  70. # 1. start a tcp server on the host
  71. host_ip = get_my_ip()
  72. thread1 = Thread(target=chat_server_sketch, args=(host_ip,))
  73. thread1.start()
  74. # 2. start the dut test and wait till client gets IP address
  75. dut1.start_app()
  76. dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  77. # 3. send host's IP to the client i.e. the `dut1`
  78. dut1.write(host_ip)
  79. # 4. client `dut1` should receive a message
  80. dut1.expect(g_msg_to_client[4:].decode()) # Strip out the front 4 bytes of message len (see chat_message protocol)
  81. # 5. write test message from `dut1` chat_client to the server
  82. dut1.write(test_msg)
  83. while len(g_client_response) == 0:
  84. time.sleep(1)
  85. g_client_response = g_client_response.decode()
  86. print(g_client_response)
  87. # 6. evaluate host_server received this message
  88. if (g_client_response[4:7] == test_msg):
  89. print("PASS: Received correct message")
  90. pass
  91. else:
  92. print("Failure!")
  93. raise ValueError('Wrong data received from asi tcp server: {} (expected:{})'.format(g_client_response[4:7], test_msg))
  94. thread1.join()
  95. if __name__ == '__main__':
  96. test_examples_protocol_asio_chat_client()