mdns_example_test.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. import time
  6. import imp
  7. import struct
  8. import dpkt, dpkt.dns
  9. from threading import Thread
  10. # this is a test case write with tiny-test-fw.
  11. # to run test cases outside tiny-test-fw,
  12. # we need to set environment variable `TEST_FW_PATH`,
  13. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  14. test_fw_path = os.getenv("TEST_FW_PATH")
  15. if test_fw_path and test_fw_path not in sys.path:
  16. sys.path.insert(0, test_fw_path)
  17. import TinyFW
  18. import IDF
  19. g_run_server = True
  20. g_done = False
  21. def mdns_server(esp_host):
  22. global g_run_server
  23. global g_done
  24. UDP_IP="0.0.0.0"
  25. UDP_PORT=5353
  26. MCAST_GRP = '224.0.0.251'
  27. sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
  28. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  29. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  30. sock.bind( (UDP_IP,UDP_PORT) )
  31. mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  32. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  33. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  34. # sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  35. sock.settimeout(30)
  36. resp_dns = dpkt.dns.DNS(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
  37. resp_dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  38. resp_dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  39. arr = dpkt.dns.DNS.RR()
  40. arr.cls = dpkt.dns.DNS_IN
  41. arr.type = dpkt.dns.DNS_A
  42. arr.name = u'tinytester.local'
  43. arr.ip =socket.inet_aton('127.0.0.1')
  44. resp_dns. an.append(arr)
  45. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  46. while g_run_server:
  47. try:
  48. m=sock.recvfrom( 1024 );
  49. dns = dpkt.dns.DNS(m[0])
  50. if len(dns.qd)>0 and dns.qd[0].type == dpkt.dns.DNS_A:
  51. if dns.qd[0].name == u'tinytester.local':
  52. print (dns.__repr__(),dns.qd[0].name)
  53. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  54. if len(dns.an)>0 and dns.an[0].type == dpkt.dns.DNS_A:
  55. if dns.an[0].name == esp_host + u'.local':
  56. print("Received answer esp32-mdns query")
  57. g_done = True
  58. print (dns.an[0].name)
  59. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  60. dns.qd[0].name= esp_host + u'.local'
  61. sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  62. print("Sending esp32-mdns query")
  63. time.sleep(0.5)
  64. except socket.timeout:
  65. break
  66. @IDF.idf_example_test(env_tag="Example_WIFI")
  67. def test_examples_protocol_mdns(env, extra_data):
  68. global g_run_server
  69. global g_done
  70. """
  71. steps: |
  72. 1. join AP + init mdns example
  73. 2. get the dut host name (and IP address)
  74. 3. check the mdns name is accessible
  75. 4. check DUT output if mdns advertized host is resolved
  76. """
  77. dut1 = env.get_dut("mdns-test", "examples/protocols/mdns")
  78. # check and log bin size
  79. binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
  80. bin_size = os.path.getsize(binary_file)
  81. IDF.log_performance("mdns-test_bin_size", "{}KB".format(bin_size//1024))
  82. IDF.check_performance("mdns-test_bin_size", bin_size//1024)
  83. # 1. start mdns application
  84. dut1.start_app()
  85. # 2. get the dut host name (and IP address)
  86. specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
  87. specific_host = str(specific_host[0])
  88. dut_ip = ""
  89. try:
  90. dut_ip = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  91. except DUT.ExpectTimeout:
  92. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  93. # 3. check the mdns name is accessible
  94. thread1 = Thread(target = mdns_server, args = (specific_host,))
  95. thread1.start()
  96. start = time.time()
  97. while (time.time() - start) <= 60:
  98. if g_done:
  99. print("Test passed")
  100. break
  101. g_run_server = False
  102. thread1.join()
  103. if g_done == False:
  104. raise ValueError('Test has failed: did not receive mdns answer within timeout')
  105. # 4. check DUT output if mdns advertized host is resolved
  106. dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
  107. if __name__ == '__main__':
  108. test_examples_protocol_mdns()