mdns_example_test.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. import time
  6. import struct
  7. import dpkt
  8. import dpkt.dns
  9. from threading import Thread
  10. # this is a test case write with tiny-test-fw.
  11. # to run test cases outside tiny-test-fw,
  12. # we need to set environment variable `TEST_FW_PATH`,
  13. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  14. try:
  15. import IDF
  16. from IDF.IDFDUT import ESP32DUT
  17. except ImportError:
  18. test_fw_path = os.getenv("TEST_FW_PATH")
  19. if test_fw_path and test_fw_path not in sys.path:
  20. sys.path.insert(0, test_fw_path)
  21. import IDF
  22. import DUT
  23. g_run_server = True
  24. g_done = False
  25. def mdns_server(esp_host):
  26. global g_done
  27. UDP_IP = "0.0.0.0"
  28. UDP_PORT = 5353
  29. MCAST_GRP = '224.0.0.251'
  30. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  31. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  32. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  33. sock.bind((UDP_IP,UDP_PORT))
  34. mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  35. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  36. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  37. sock.settimeout(30)
  38. resp_dns = dpkt.dns.DNS(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
  39. resp_dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  40. resp_dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  41. arr = dpkt.dns.DNS.RR()
  42. arr.cls = dpkt.dns.DNS_IN
  43. arr.type = dpkt.dns.DNS_A
  44. arr.name = u'tinytester.local'
  45. arr.ip = socket.inet_aton('127.0.0.1')
  46. resp_dns. an.append(arr)
  47. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  48. while g_run_server:
  49. try:
  50. m = sock.recvfrom(1024)
  51. dns = dpkt.dns.DNS(m[0])
  52. if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
  53. if dns.qd[0].name == u'tinytester.local':
  54. print(dns.__repr__(),dns.qd[0].name)
  55. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  56. if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
  57. if dns.an[0].name == esp_host + u'.local':
  58. print("Received answer esp32-mdns query")
  59. g_done = True
  60. print(dns.an[0].name)
  61. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  62. dns.qd[0].name = esp_host + u'.local'
  63. sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  64. print("Sending esp32-mdns query")
  65. time.sleep(0.5)
  66. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  67. except socket.timeout:
  68. break
  69. except dpkt.UnpackError:
  70. continue
  71. @IDF.idf_example_test(env_tag="Example_WIFI")
  72. def test_examples_protocol_mdns(env, extra_data):
  73. global g_run_server
  74. """
  75. steps: |
  76. 1. join AP + init mdns example
  77. 2. get the dut host name (and IP address)
  78. 3. check the mdns name is accessible
  79. 4. check DUT output if mdns advertized host is resolved
  80. """
  81. dut1 = env.get_dut("mdns-test", "examples/protocols/mdns", dut_class=ESP32DUT)
  82. # check and log bin size
  83. binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
  84. bin_size = os.path.getsize(binary_file)
  85. IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
  86. IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
  87. # 1. start mdns application
  88. dut1.start_app()
  89. # 2. get the dut host name (and IP address)
  90. specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
  91. specific_host = str(specific_host[0])
  92. thread1 = Thread(target=mdns_server, args=(specific_host,))
  93. thread1.start()
  94. try:
  95. dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  96. except DUT.ExpectTimeout:
  97. g_run_server = False
  98. thread1.join()
  99. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  100. # 3. check the mdns name is accessible
  101. start = time.time()
  102. while (time.time() - start) <= 60:
  103. if g_done:
  104. break
  105. time.sleep(0.5)
  106. if g_done is False:
  107. raise ValueError('Test has failed: did not receive mdns answer within timeout')
  108. # 4. check DUT output if mdns advertized host is resolved
  109. try:
  110. dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
  111. finally:
  112. g_run_server = False
  113. thread1.join()
  114. if __name__ == '__main__':
  115. test_examples_protocol_mdns()