mqtt_ssl_example_test.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import re
  2. import os
  3. import sys
  4. import time
  5. import socket
  6. import imp
  7. import ssl
  8. use_mqtt_client_sketch = False
  9. try:
  10. imp.find_module('paho')
  11. import paho.mqtt.client as mqtt
  12. # Make things with supposed existing module
  13. except ImportError:
  14. use_mqtt_client_sketch = True
  15. pass
  16. global g_recv_topic
  17. global g_recv_data
  18. g_recv_data=""
  19. # This is only a workaround for running mqtt client with 'hardcoded' data using plain socket interface
  20. def mqtt_client_sketch():
  21. global g_recv_topic
  22. global g_recv_data
  23. connect_msg = bytearray([0x10, 0x0c, 00, 0x04, 0x4d, 0x51, 0x54, 0x54, 0x04, 0x02, 00, 0x3c, 00, 00])
  24. send_qos0_msg = bytearray([ 0x30, 0x1a, 0x00, 0x0b, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x2f, 0x71, 0x6f, 0x73, 0x30, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x6f, 0x5f, 0x65, 0x73, 0x70, 0x33, 0x32])
  25. subscribe_qos0 = bytearray([ 0x82, 0x10, 0x00, 0x01, 0x00, 0x0b, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x2f, 0x71, 0x6f, 0x73, 0x30, 0x00] )
  26. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  27. client.settimeout(30)
  28. cli = ssl.wrap_socket(client)
  29. cli.connect(("iot.eclipse.org", 8883))
  30. cli.send(connect_msg)
  31. data = cli.recv(1024)
  32. print("Connect ack received {}".format(data))
  33. cli.send(subscribe_qos0)
  34. data = cli.recv(1024)
  35. print("Subscibe ack received {}".format(data))
  36. start = time.time()
  37. while (time.time() - start) <= 20:
  38. data = cli.recv(1024)
  39. print("Data received {}".format(data[-17:]))
  40. if data[-15:] == "/topic/qos0data":
  41. g_recv_topic = data[-15:][:11]
  42. g_recv_data = data[-4:]
  43. cli.send(send_qos0_msg)
  44. data = cli.recv(1024)
  45. print("data ack received {}".format(data))
  46. break
  47. cli.close()
  48. # The callback for when the client receives a CONNACK response from the server.
  49. def on_connect(client, userdata, flags, rc):
  50. print("Connected with result code "+str(rc))
  51. client.subscribe("/topic/qos0")
  52. # The callback for when a PUBLISH message is received from the server.
  53. def on_message(client, userdata, msg):
  54. global g_recv_topic
  55. global g_recv_data
  56. if g_recv_data == "" and msg.payload == "data":
  57. client.publish("/topic/qos0", "data_to_esp32")
  58. g_recv_topic = msg.topic
  59. g_recv_data = msg.payload
  60. print(msg.topic+" "+str(msg.payload))
  61. # this is a test case write with tiny-test-fw.
  62. # to run test cases outside tiny-test-fw,
  63. # we need to set environment variable `TEST_FW_PATH`,
  64. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  65. test_fw_path = os.getenv("TEST_FW_PATH")
  66. if test_fw_path and test_fw_path not in sys.path:
  67. sys.path.insert(0, test_fw_path)
  68. import TinyFW
  69. import IDF
  70. @IDF.idf_example_test(env_tag="Example_WIFI")
  71. def test_examples_protocol_mqtt_ssl(env, extra_data):
  72. global g_recv_topic
  73. global g_recv_data
  74. """
  75. steps: |
  76. 1. join AP and connects to ssl broker
  77. 2. Test connects a client to the same broker
  78. 3. Test evaluates python client received correct qos0 message
  79. 4. Test ESP32 client received correct qos0 message
  80. """
  81. dut1 = env.get_dut("mqtt_ssl", "examples/protocols/mqtt/ssl")
  82. # check and log bin size
  83. binary_file = os.path.join(dut1.app.binary_path, "mqtt_ssl.bin")
  84. bin_size = os.path.getsize(binary_file)
  85. IDF.log_performance("mqtt_ssl_bin_size", "{}KB".format(bin_size//1024))
  86. IDF.check_performance("mqtt_ssl_size", bin_size//1024)
  87. # 1. start test
  88. dut1.start_app()
  89. # 2. Test connects to a broker
  90. if use_mqtt_client_sketch:
  91. mqtt_client_sketch()
  92. else:
  93. client = mqtt.Client()
  94. client.on_connect = on_connect
  95. client.on_message = on_message
  96. client.tls_set(None,
  97. None,
  98. None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1, ciphers=None)
  99. client.tls_insecure_set(True)
  100. print "Connecting..."
  101. client.connect("iot.eclipse.org", 8883, 60)
  102. print "...done"
  103. print "Start Looping..."
  104. start = time.time()
  105. while (time.time() - start) <= 20:
  106. client.loop()
  107. print "...done"
  108. # 3. check the message received back from the server
  109. if g_recv_topic == "/topic/qos0" and g_recv_data == "data" :
  110. print("PASS: Received correct message")
  111. else:
  112. print("Failure!")
  113. raise ValueError('Wrong data received topic: {}, data:{}'.format(g_recv_topic, g_recv_data))
  114. # 4. check that the esp32 client received data sent by this python client
  115. dut1.expect(re.compile(r"DATA=data_to_esp32"), timeout=30)
  116. if __name__ == '__main__':
  117. test_examples_protocol_mqtt_ssl()