mdns_example_test.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. import time
  6. import struct
  7. import dpkt
  8. import dpkt.dns
  9. from threading import Thread
  10. # this is a test case write with tiny-test-fw.
  11. # to run test cases outside tiny-test-fw,
  12. # we need to set environment variable `TEST_FW_PATH`,
  13. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  14. try:
  15. import IDF
  16. except ImportError:
  17. test_fw_path = os.getenv("TEST_FW_PATH")
  18. if test_fw_path and test_fw_path not in sys.path:
  19. sys.path.insert(0, test_fw_path)
  20. import IDF
  21. import DUT
  22. g_run_server = True
  23. g_done = False
  24. def mdns_server(esp_host):
  25. global g_done
  26. UDP_IP = "0.0.0.0"
  27. UDP_PORT = 5353
  28. MCAST_GRP = '224.0.0.251'
  29. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  30. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  31. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  32. sock.bind((UDP_IP,UDP_PORT))
  33. mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  34. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  35. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  36. sock.settimeout(30)
  37. resp_dns = dpkt.dns.DNS(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
  38. resp_dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  39. resp_dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  40. arr = dpkt.dns.DNS.RR()
  41. arr.cls = dpkt.dns.DNS_IN
  42. arr.type = dpkt.dns.DNS_A
  43. arr.name = u'tinytester.local'
  44. arr.ip = socket.inet_aton('127.0.0.1')
  45. resp_dns. an.append(arr)
  46. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  47. while g_run_server:
  48. try:
  49. m = sock.recvfrom(1024)
  50. dns = dpkt.dns.DNS(m[0])
  51. if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
  52. if dns.qd[0].name == u'tinytester.local':
  53. print(dns.__repr__(),dns.qd[0].name)
  54. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  55. if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
  56. if dns.an[0].name == esp_host + u'.local':
  57. print("Received answer esp32-mdns query")
  58. g_done = True
  59. print(dns.an[0].name)
  60. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  61. dns.qd[0].name = esp_host + u'.local'
  62. sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  63. print("Sending esp32-mdns query")
  64. time.sleep(0.5)
  65. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  66. except socket.timeout:
  67. break
  68. except dpkt.UnpackError:
  69. continue
  70. @IDF.idf_example_test(env_tag="Example_WIFI")
  71. def test_examples_protocol_mdns(env, extra_data):
  72. global g_run_server
  73. """
  74. steps: |
  75. 1. join AP + init mdns example
  76. 2. get the dut host name (and IP address)
  77. 3. check the mdns name is accessible
  78. 4. check DUT output if mdns advertized host is resolved
  79. """
  80. dut1 = env.get_dut("mdns-test", "examples/protocols/mdns")
  81. # check and log bin size
  82. binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
  83. bin_size = os.path.getsize(binary_file)
  84. IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
  85. IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
  86. # 1. start mdns application
  87. dut1.start_app()
  88. # 2. get the dut host name (and IP address)
  89. specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
  90. specific_host = str(specific_host[0])
  91. thread1 = Thread(target=mdns_server, args=(specific_host,))
  92. thread1.start()
  93. try:
  94. dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  95. except DUT.ExpectTimeout:
  96. g_run_server = False
  97. thread1.join()
  98. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  99. # 3. check the mdns name is accessible
  100. start = time.time()
  101. while (time.time() - start) <= 60:
  102. if g_done:
  103. break
  104. time.sleep(0.5)
  105. if g_done is False:
  106. raise ValueError('Test has failed: did not receive mdns answer within timeout')
  107. # 4. check DUT output if mdns advertized host is resolved
  108. try:
  109. dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
  110. finally:
  111. g_run_server = False
  112. thread1.join()
  113. if __name__ == '__main__':
  114. test_examples_protocol_mdns()