mdns_example_test.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. import time
  6. import struct
  7. import dpkt
  8. import dpkt.dns
  9. from threading import Thread
# This is a test case written with tiny-test-fw.
# To run test cases outside tiny-test-fw,
# we need to set the environment variable `TEST_FW_PATH`,
# then read it and insert it into sys.path before importing the FW modules.
try:
    import IDF
except ImportError:
    # IDF is not importable yet: add the directory named by TEST_FW_PATH
    # (when set and not already present) to the front of sys.path and retry.
    test_fw_path = os.getenv("TEST_FW_PATH")
    if test_fw_path and test_fw_path not in sys.path:
        sys.path.insert(0, test_fw_path)
    import IDF
import DUT
# Loop condition of mdns_server's receive loop; the test function sets it to
# False to ask the server thread to stop.
g_run_server = True
# Set to True by mdns_server once it receives an A-record answer whose name is
# "<esp_host>.local"; polled by the test function to decide pass/fail.
g_done = False
  24. def mdns_server(esp_host):
  25. global g_run_server
  26. global g_done
  27. UDP_IP = "0.0.0.0"
  28. UDP_PORT = 5353
  29. MCAST_GRP = '224.0.0.251'
  30. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  31. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  32. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  33. sock.bind((UDP_IP,UDP_PORT))
  34. mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  35. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  36. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  37. # sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  38. sock.settimeout(30)
  39. resp_dns = dpkt.dns.DNS(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
  40. resp_dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  41. resp_dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  42. arr = dpkt.dns.DNS.RR()
  43. arr.cls = dpkt.dns.DNS_IN
  44. arr.type = dpkt.dns.DNS_A
  45. arr.name = u'tinytester.local'
  46. arr.ip = socket.inet_aton('127.0.0.1')
  47. resp_dns. an.append(arr)
  48. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  49. while g_run_server:
  50. try:
  51. m = sock.recvfrom(1024)
  52. dns = dpkt.dns.DNS(m[0])
  53. if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
  54. if dns.qd[0].name == u'tinytester.local':
  55. print(dns.__repr__(),dns.qd[0].name)
  56. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  57. if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
  58. if dns.an[0].name == esp_host + u'.local':
  59. print("Received answer esp32-mdns query")
  60. g_done = True
  61. print(dns.an[0].name)
  62. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  63. dns.qd[0].name = esp_host + u'.local'
  64. sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  65. print("Sending esp32-mdns query")
  66. time.sleep(0.5)
  67. except socket.timeout:
  68. break
  69. @IDF.idf_example_test(env_tag="Example_WIFI")
  70. def test_examples_protocol_mdns(env, extra_data):
  71. global g_run_server
  72. global g_done
  73. """
  74. steps: |
  75. 1. join AP + init mdns example
  76. 2. get the dut host name (and IP address)
  77. 3. check the mdns name is accessible
  78. 4. check DUT output if mdns advertized host is resolved
  79. """
  80. dut1 = env.get_dut("mdns-test", "examples/protocols/mdns")
  81. # check and log bin size
  82. binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
  83. bin_size = os.path.getsize(binary_file)
  84. IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
  85. IDF.check_performance("mdns-test_bin_size", bin_size // 1024)
  86. # 1. start mdns application
  87. dut1.start_app()
  88. # 2. get the dut host name (and IP address)
  89. specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
  90. specific_host = str(specific_host[0])
  91. try:
  92. dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  93. except DUT.ExpectTimeout:
  94. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  95. # 3. check the mdns name is accessible
  96. thread1 = Thread(target=mdns_server, args=(specific_host,))
  97. thread1.start()
  98. start = time.time()
  99. while (time.time() - start) <= 60:
  100. if g_done:
  101. print("Test passed")
  102. break
  103. g_run_server = False
  104. thread1.join()
  105. if g_done is False:
  106. raise ValueError('Test has failed: did not receive mdns answer within timeout')
  107. # 4. check DUT output if mdns advertized host is resolved
  108. dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
# Script entry point: run the mDNS example test directly.
# NOTE(review): the test is invoked without arguments — presumably the
# IDF.idf_example_test decorator supplies `env` and `extra_data`; confirm
# against tiny-test-fw.
if __name__ == '__main__':
    test_examples_protocol_mdns()