# mdns_example_test.py (4.1 KB)

  1. import re
  2. import os
  3. import socket
  4. import time
  5. import struct
  6. import dpkt
  7. import dpkt.dns
  8. from threading import Thread
  9. from tiny_test_fw import DUT
  10. import ttfw_idf
# Loop condition of the mdns_server receive loop; the test sets it to False
# to ask the server thread to stop.
g_run_server = True
# Set to True by mdns_server once an A answer for "<esp_host>.local" arrives;
# polled by the test while it waits for the device to respond.
g_done = False
  13. def mdns_server(esp_host):
  14. global g_done
  15. UDP_IP = "0.0.0.0"
  16. UDP_PORT = 5353
  17. MCAST_GRP = '224.0.0.251'
  18. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  19. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  20. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  21. sock.bind((UDP_IP,UDP_PORT))
  22. mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  23. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  24. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  25. sock.settimeout(30)
  26. resp_dns = dpkt.dns.DNS(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
  27. resp_dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  28. resp_dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  29. arr = dpkt.dns.DNS.RR()
  30. arr.cls = dpkt.dns.DNS_IN
  31. arr.type = dpkt.dns.DNS_A
  32. arr.name = u'tinytester.local'
  33. arr.ip = socket.inet_aton('127.0.0.1')
  34. resp_dns. an.append(arr)
  35. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  36. while g_run_server:
  37. try:
  38. m = sock.recvfrom(1024)
  39. dns = dpkt.dns.DNS(m[0])
  40. if len(dns.qd) > 0 and dns.qd[0].type == dpkt.dns.DNS_A:
  41. if dns.qd[0].name == u'tinytester.local':
  42. print(dns.__repr__(),dns.qd[0].name)
  43. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  44. if len(dns.an) > 0 and dns.an[0].type == dpkt.dns.DNS_A:
  45. if dns.an[0].name == esp_host + u'.local':
  46. print("Received answer esp32-mdns query")
  47. g_done = True
  48. print(dns.an[0].name)
  49. dns = dpkt.dns.DNS(b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01')
  50. dns.qd[0].name = esp_host + u'.local'
  51. sock.sendto(dns.pack(),(MCAST_GRP,UDP_PORT))
  52. print("Sending esp32-mdns query")
  53. time.sleep(0.5)
  54. sock.sendto(resp_dns.pack(),(MCAST_GRP,UDP_PORT))
  55. except socket.timeout:
  56. break
  57. except dpkt.UnpackError:
  58. continue
  59. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  60. def test_examples_protocol_mdns(env, extra_data):
  61. global g_run_server
  62. """
  63. steps: |
  64. 1. join AP + init mdns example
  65. 2. get the dut host name (and IP address)
  66. 3. check the mdns name is accessible
  67. 4. check DUT output if mdns advertized host is resolved
  68. """
  69. dut1 = env.get_dut("mdns-test", "examples/protocols/mdns")
  70. # check and log bin size
  71. binary_file = os.path.join(dut1.app.binary_path, "mdns-test.bin")
  72. bin_size = os.path.getsize(binary_file)
  73. ttfw_idf.log_performance("mdns-test_bin_size", "{}KB".format(bin_size // 1024))
  74. ttfw_idf.check_performance("mdns-test_bin_size", bin_size // 1024)
  75. # 1. start mdns application
  76. dut1.start_app()
  77. # 2. get the dut host name (and IP address)
  78. specific_host = dut1.expect(re.compile(r"mdns hostname set to: \[([^\]]+)\]"), timeout=30)
  79. specific_host = str(specific_host[0])
  80. thread1 = Thread(target=mdns_server, args=(specific_host,))
  81. thread1.start()
  82. try:
  83. dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  84. except DUT.ExpectTimeout:
  85. g_run_server = False
  86. thread1.join()
  87. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  88. # 3. check the mdns name is accessible
  89. start = time.time()
  90. while (time.time() - start) <= 60:
  91. if g_done:
  92. break
  93. time.sleep(0.5)
  94. if g_done is False:
  95. raise ValueError('Test has failed: did not receive mdns answer within timeout')
  96. # 4. check DUT output if mdns advertized host is resolved
  97. try:
  98. dut1.expect(re.compile(r"mdns-test: Query A: tinytester.local resolved to: 127.0.0.1"), timeout=30)
  99. finally:
  100. g_run_server = False
  101. thread1.join()
# Script entry point: run the test when the file is executed directly.
# NOTE(review): the call passes no arguments, so env/extra_data are presumably
# supplied by the ttfw_idf.idf_example_test decorator — confirm.
if __name__ == '__main__':
    test_examples_protocol_mdns()