mqtt_ssl_example_test.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from __future__ import print_function
  2. from __future__ import unicode_literals
  3. from builtins import str
  4. import re
  5. import os
  6. import sys
  7. import ssl
  8. import paho.mqtt.client as mqtt
  9. from threading import Thread, Event
  try:
      import IDF
  except ImportError:
      # This test case is written for tiny-test-fw. To run it outside the
      # framework, the `TEST_FW_PATH` environment variable must name the
      # framework directory; it is put at the front of sys.path before the
      # import of the framework module is retried.
      test_fw_path = os.environ.get("TEST_FW_PATH")
      if test_fw_path:
          if test_fw_path not in sys.path:
              sys.path.insert(0, test_fw_path)
      import IDF

  import DUT
  # Flags shared between the test body and the paho callbacks / worker thread.
  # Set by on_message once "data" has arrived on /topic/qos0; awaited by the test.
  event_client_received_correct = Event()
  # Set by the test when it finishes; polled by mqtt_client_task to stop looping.
  event_stop_client = Event()
  # Set by on_connect; awaited by the test before the DUT application starts.
  event_client_connected = Event()
  # Running text log of every message on_message has seen; quoted on failure.
  message_log = ""
  26. # The callback for when the client receives a CONNACK response from the server.
  def on_connect(client, userdata, flags, rc):
      """Handle the broker's CONNACK for the python test client.

      On a successful connection the client subscribes to "/topic/qos0" and
      then sets `event_client_connected` so the waiting test can continue.

      Args:
          client: paho client instance the callback was registered on.
          userdata: unused.
          flags: unused.
          rc: CONNACK result code reported by paho; 0 means the broker
              accepted the connection.
      """
      print("Connected with result code " + str(rc))
      if rc != 0:
          # The broker refused the connection. Leave the event unset so the
          # test's wait on it times out instead of carrying on with a client
          # that is not actually connected.
          return
      # Subscribe before signalling, so the test never proceeds while the
      # subscription request has not even been issued.
      client.subscribe("/topic/qos0")
      event_client_connected.set()
  def mqtt_client_task(client):
      """Drive the client's network loop until a stop is requested.

      Calls `client.loop()` repeatedly and returns as soon as
      `event_stop_client` is found set at the top of an iteration.

      Args:
          client: object exposing a `loop()` method (the paho client in the
              test).
      """
      while True:
          if event_stop_client.is_set():
              break
          client.loop()
  34. # The callback for when a PUBLISH message is received from the server.
  def on_message(client, userdata, msg):
      """Handle a PUBLISH received by the python test client.

      While the expected message has not yet been seen, a payload of "data"
      is answered by publishing "data_to_esp32" to "/topic/qos0"; if that
      payload arrived on "/topic/qos0", `event_client_received_correct` is
      set. Every message is appended to the module-level `message_log`.

      Args:
          client: paho client instance, used to publish the reply.
          userdata: unused.
          msg: received message; its `topic` and `payload` (bytes) are read.
      """
      global message_log
      # Replace undecodable bytes instead of raising: a payload that is not
      # valid UTF-8 must not throw out of the callback, it should simply be
      # logged and ignored.
      payload = msg.payload.decode("utf-8", "replace")
      if payload == "data" and not event_client_received_correct.is_set():
          client.publish("/topic/qos0", "data_to_esp32")
          if msg.topic == "/topic/qos0":
              event_client_received_correct.set()
      message_log += "Received data:" + msg.topic + " " + payload + "\n"
  @IDF.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_mqtt_ssl(env, extra_data):
      """
      Check the mqtt ssl example against a python client on the same broker.

      steps: |
        1. join AP and connects to ssl broker
        2. Test connects a client to the same broker
        3. Test evaluates python client received correct qos0 message
        4. Test ESP32 client received correct qos0 message

      Args:
          env: test environment; used to obtain the DUT via `get_dut`.
          extra_data: not used by this test.

      Raises:
          ValueError: the python client did not connect, or did not receive
              the expected message, within 30 seconds.
          DUT.ExpectTimeout: an expected line did not appear in the DUT output.
      """
      dut1 = env.get_dut("mqtt_ssl", "examples/protocols/mqtt/ssl")

      # Check and log the binary size.
      binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin")
      bin_size = os.path.getsize(binary_file)
      IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size // 1024))
      IDF.check_performance("mqtt_ssl_size", bin_size // 1024)

      # Look for host:port in sdkconfig. A missing key or a URI that does not
      # match (re.search returning None) both land in the except branch.
      try:
          value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()["CONFIG_BROKER_URI"])
          broker_url = value.group(1)
          broker_port = int(value.group(2))
      except Exception:
          print('ENV_TEST_FAILURE: Cannot find broker url in sdkconfig')
          raise

      # 1. Test connects to a broker
      try:
          client = mqtt.Client()
          client.on_connect = on_connect
          client.on_message = on_message
          # Certificate and host name verification are switched off for this
          # test client. TLS 1.2 is requested because TLS 1.0 is deprecated
          # and may be refused by current brokers and OpenSSL builds.
          client.tls_set(None,
                         None,
                         None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
          client.tls_insecure_set(True)
          print("Connecting...")
          client.connect(broker_url, broker_port, 60)
      except Exception:
          print("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}: {}:".format(broker_url, sys.exc_info()[0]))
          raise

      # Starting a py-client in a separate thread
      thread1 = Thread(target=mqtt_client_task, args=(client,))
      thread1.start()
      try:
          print("Connecting py-client to broker {}:{}...".format(broker_url, broker_port))
          if not event_client_connected.wait(timeout=30):
              raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_url))
          # The DUT application is only started once the python client is
          # connected (on_connect sets the event waited on above).
          dut1.start_app()
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              print('ENV_TEST_FAILURE: Cannot connect to AP')
              raise
          print("Checking py-client received msg published from esp...")
          if not event_client_received_correct.wait(timeout=30):
              raise ValueError('Wrong data received, msg log: {}'.format(message_log))
          print("Checking esp-client received msg published from py-client...")
          dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
      finally:
          # Always stop and reap the network-loop thread, pass or fail.
          event_stop_client.set()
          thread1.join()
  # Run the test case when this file is executed directly as a script.
  if __name__ == "__main__":
      test_examples_protocol_mqtt_ssl()