Преглед изворни кода

ci: Add MQTT publish test to standard test apps

David Cermak пре 5 година
родитељ
комит
5472deec6e

+ 2 - 2
components/mqtt/weekend_test/mqtt_publish_test.py

@@ -56,7 +56,7 @@ def on_message(client, userdata, msg):
     message_log += "Received data:" + msg.topic + " " + payload + "\n"
 
 
-def test_single_config(dut, transport, qos, repeat, published, queue = 0):
+def test_single_config(dut, transport, qos, repeat, published, queue=0):
     global expected_count
     global expected_data
     global message_log
@@ -89,7 +89,7 @@ def test_single_config(dut, transport, qos, repeat, published, queue = 0):
     if not event_client_connected.wait(timeout=30):
         raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host[transport]))
     client.subscribe(subscribe_topic, qos)
-    dut.write("{} {} {} {} {} {}".format(transport, sample_string, repeat, published, qos, queue), eol="\n")
+    dut.write(' '.join(str(x) for x in (transport, sample_string, repeat, published, qos, queue)), eol="\n")
     try:
         # waiting till subscribed to defined topic
         dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)

+ 154 - 9
tools/test_apps/protocols/mqtt/publish_connect_test/app_test.py

@@ -8,6 +8,11 @@ import subprocess
 from threading import Thread, Event
 import ttfw_idf
 import ssl
+import paho.mqtt.client as mqtt
+import string
+import random
+
+DEFAULT_MSG_SIZE = 16
 
 
 def _path(f):
@@ -37,6 +42,109 @@ def get_my_ip():
     return IP
 
 
+# Publisher class creating a python client to send/receive published data from esp-mqtt client
+class MqttPublisher:
+
+    def __init__(self, dut, transport, qos, repeat, published, queue, publish_cfg, log_details=False):
+        # instance variables used as parameters of the publish test
+        self.event_stop_client = Event()
+        self.sample_string = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE))
+        self.client = None
+        self.dut = dut
+        self.log_details = log_details
+        self.repeat = repeat
+        self.publish_cfg = publish_cfg
+        self.publish_cfg["qos"] = qos
+        self.publish_cfg["queue"] = queue
+        self.publish_cfg["transport"] = transport
+        # static variables used to pass options to and from static callbacks of paho-mqtt client
+        MqttPublisher.event_client_connected = Event()
+        MqttPublisher.event_client_got_all = Event()
+        MqttPublisher.published = published
+        MqttPublisher.event_client_connected.clear()
+        MqttPublisher.event_client_got_all.clear()
+        MqttPublisher.expected_data = self.sample_string * self.repeat
+
+    def print_details(self, text):
+        if self.log_details:
+            print(text)
+
+    def mqtt_client_task(self, client):
+        while not self.event_stop_client.is_set():
+            client.loop()
+
+    # The callback for when the client receives a CONNACK response from the server (needs to be static)
+    @staticmethod
+    def on_connect(_client, _userdata, _flags, _rc):
+        MqttPublisher.event_client_connected.set()
+
+    # The callback for when a PUBLISH message is received from the server (needs to be static)
+    @staticmethod
+    def on_message(client, userdata, msg):
+        payload = msg.payload.decode()
+        if payload == MqttPublisher.expected_data:
+            userdata += 1
+            client.user_data_set(userdata)
+            if userdata == MqttPublisher.published:
+                MqttPublisher.event_client_got_all.set()
+
+    def __enter__(self):
+
+        qos = self.publish_cfg["qos"]
+        queue = self.publish_cfg["queue"]
+        transport = self.publish_cfg["transport"]
+        broker_host = self.publish_cfg["broker_host_" + transport]
+        broker_port = self.publish_cfg["broker_port_" + transport]
+
+        # Start the test
+        self.print_details("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'"
+                           .format(transport, qos, MqttPublisher.published, queue, MqttPublisher.expected_data))
+
+        try:
+            if transport in ["ws", "wss"]:
+                self.client = mqtt.Client(transport="websockets")
+            else:
+                self.client = mqtt.Client()
+            self.client.on_connect = MqttPublisher.on_connect
+            self.client.on_message = MqttPublisher.on_message
+            self.client.user_data_set(0)
+
+            if transport in ["ssl", "wss"]:
+                self.client.tls_set(None, None, None, cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
+                self.client.tls_insecure_set(True)
+            self.print_details("Connecting...")
+            self.client.connect(broker_host, broker_port, 60)
+        except Exception:
+            self.print_details("ENV_TEST_FAILURE: Unexpected error while connecting to broker {}".format(broker_host))
+            raise
+        # Starting a py-client in a separate thread
+        thread1 = Thread(target=self.mqtt_client_task, args=(self.client,))
+        thread1.start()
+        self.print_details("Connecting py-client to broker {}:{}...".format(broker_host, broker_port))
+        if not MqttPublisher.event_client_connected.wait(timeout=30):
+            raise ValueError("ENV_TEST_FAILURE: Test script cannot connect to broker: {}".format(broker_host))
+        self.client.subscribe(self.publish_cfg["subscribe_topic"], qos)
+        self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol="\n")
+        try:
+            # waiting till subscribed to defined topic
+            self.dut.expect(re.compile(r"MQTT_EVENT_SUBSCRIBED"), timeout=30)
+            for _ in range(MqttPublisher.published):
+                self.client.publish(self.publish_cfg["publish_topic"], self.sample_string * self.repeat, qos)
+                self.print_details("Publishing...")
+            self.print_details("Checking esp-client received msg published from py-client...")
+            self.dut.expect(re.compile(r"Correct pattern received exactly x times"), timeout=60)
+            if not MqttPublisher.event_client_got_all.wait(timeout=60):
+                raise ValueError("Not all data received from ESP32")
+            print(" - all data received from ESP32")
+        finally:
+            self.event_stop_client.set()
+            thread1.join()
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.client.disconnect()
+        self.event_stop_client.clear()
+
+
 # Simple server for mqtt over TLS connection
 class TlsServer:
 
@@ -143,9 +251,18 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
     binary_file = os.path.join(dut1.app.binary_path, "mqtt_publish_connect_test.bin")
     bin_size = os.path.getsize(binary_file)
     ttfw_idf.log_performance("mqtt_publish_connect_test_bin_size", "{}KB".format(bin_size // 1024))
-    # Look for test case symbolic names
+
+    # Look for test case symbolic names and publish configs
     cases = {}
+    publish_cfg = {}
     try:
+        def get_host_port_from_dut(dut1, config_option):
+            value = re.search(r'\:\/\/([^:]+)\:([0-9]+)', dut1.app.get_sdkconfig()[config_option])
+            if value is None:
+                return None, None
+            return value.group(1), int(value.group(2))
+
+        # Get connection test cases configuration: symbolic names for test cases
         for i in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT",
                   "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT",
                   "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH",
@@ -155,6 +272,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
                   "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT",
                   "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
             cases[i] = dut1.app.get_sdkconfig()[i]
+        # Get publish test configuration
+        publish_cfg["publish_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_SUBSCIBE_TOPIC"].replace('"','')
+        publish_cfg["subscribe_topic"] = dut1.app.get_sdkconfig()["CONFIG_EXAMPLE_PUBLISH_TOPIC"].replace('"','')
+        publish_cfg["broker_host_ssl"], publish_cfg["broker_port_ssl"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_SSL_URI")
+        publish_cfg["broker_host_tcp"], publish_cfg["broker_port_tcp"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_TCP_URI")
+        publish_cfg["broker_host_ws"], publish_cfg["broker_port_ws"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WS_URI")
+        publish_cfg["broker_host_wss"], publish_cfg["broker_port_wss"] = get_host_port_from_dut(dut1, "CONFIG_EXAMPLE_BROKER_WSS_URI")
+
     except Exception:
         print('ENV_TEST_FAILURE: Some mandatory test case not found in sdkconfig')
         raise
@@ -162,13 +287,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
     dut1.start_app()
     esp_ip = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
     print("Got IP={}".format(esp_ip[0]))
+
     #
     # start connection test
     ip = get_my_ip()
     set_server_cert_cn(ip)
     server_port = 2222
 
-    def start_case(case, desc):
+    def start_connection_case(case, desc):
         print("Starting {}: {}".format(case, desc))
         case_id = cases[case]
         dut1.write("conn {} {} {}".format(ip, server_port, case_id))
@@ -178,14 +304,14 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
     for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_SERVER_DER_CERT"]:
         # All these cases connect to the server with no server verification or with server only verification
         with TlsServer(server_port):
-            test_nr = start_case(case, "default server - expect to connect normally")
+            test_nr = start_connection_case(case, "default server - expect to connect normally")
             dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
         with TlsServer(server_port, refuse_connection=True):
-            test_nr = start_case(case, "ssl shall connect, but mqtt sends connect refusal")
+            test_nr = start_connection_case(case, "ssl shall connect, but mqtt sends connect refusal")
             dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
             dut1.expect("MQTT ERROR: 0x5")  # expecting 0x5 ... connection not authorized error
         with TlsServer(server_port, client_cert=True) as s:
-            test_nr = start_case(case, "server with client verification - handshake error since client presents no client certificate")
+            test_nr = start_connection_case(case, "server with client verification - handshake error since client presents no client certificate")
             dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
             dut1.expect("ESP-TLS ERROR: 0x8010")  # expect ... handshake error (PEER_DID_NOT_RETURN_A_CERTIFICATE)
             if "PEER_DID_NOT_RETURN_A_CERTIFICATE" not in s.get_last_ssl_error():
@@ -194,12 +320,12 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
     for case in ["CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH", "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_KEY_PWD"]:
         # These cases connect to server with both server and client verification (client key might be password protected)
         with TlsServer(server_port, client_cert=True):
-            test_nr = start_case(case, "server with client verification - expect to connect normally")
+            test_nr = start_connection_case(case, "server with client verification - expect to connect normally")
             dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
 
     case = "CONFIG_EXAMPLE_CONNECT_CASE_INVALID_SERVER_CERT"
     with TlsServer(server_port) as s:
-        test_nr = start_case(case, "invalid server certificate on default server - expect ssl handshake error")
+        test_nr = start_connection_case(case, "invalid server certificate on default server - expect ssl handshake error")
         dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
         dut1.expect("ESP-TLS ERROR: 0x8010")  # expect ... handshake error (TLSV1_ALERT_UNKNOWN_CA)
         if "alert unknown ca" not in s.get_last_ssl_error():
@@ -207,7 +333,7 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
 
     case = "CONFIG_EXAMPLE_CONNECT_CASE_MUTUAL_AUTH_BAD_CRT"
     with TlsServer(server_port, client_cert=True) as s:
-        test_nr = start_case(case, "Invalid client certificate on server with client verification - expect ssl handshake error")
+        test_nr = start_connection_case(case, "Invalid client certificate on server with client verification - expect ssl handshake error")
         dut1.expect("MQTT_EVENT_ERROR: Test={}".format(test_nr), timeout=30)
         dut1.expect("ESP-TLS ERROR: 0x8010")  # expect ... handshake error (CERTIFICATE_VERIFY_FAILED)
         if "CERTIFICATE_VERIFY_FAILED" not in s.get_last_ssl_error():
@@ -215,7 +341,7 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
 
     for case in ["CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT", "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT_ALPN"]:
         with TlsServer(server_port, use_alpn=True) as s:
-            test_nr = start_case(case, "server with alpn - expect connect, check resolved protocol")
+            test_nr = start_connection_case(case, "server with alpn - expect connect, check resolved protocol")
             dut1.expect("MQTT_EVENT_CONNECTED: Test={}".format(test_nr), timeout=30)
             if case == "CONFIG_EXAMPLE_CONNECT_CASE_NO_CERT" and s.get_negotiated_protocol() is None:
                 print(" - client with alpn off, no negotiated protocol: OK")
@@ -224,6 +350,25 @@ def test_app_protocol_mqtt_publish_connect(env, extra_data):
             else:
                 raise Exception("Unexpected negotiated protocol {}".format(s.get_negotiated_protocol()))
 
+    #
+    # start publish tests
+    def start_publish_case(transport, qos, repeat, published, queue):
+        print("Starting Publish test: transport:{}, qos:{}, nr_of_msgs:{}, msg_size:{}, enqueue:{}"
+              .format(transport, qos, published, repeat * DEFAULT_MSG_SIZE, queue))
+        with MqttPublisher(dut1, transport, qos, repeat, published, queue, publish_cfg):
+            pass
+
+    for qos in [0, 1, 2]:
+        for transport in ["tcp", "ssl", "ws", "wss"]:
+            for q in [0, 1]:
+                if publish_cfg["broker_host_" + transport] is None:
+                    print('Skipping transport: {}...'.format(transport))
+                    continue
+                start_publish_case(transport, qos, 0, 5, q)
+                start_publish_case(transport, qos, 2, 5, q)
+                start_publish_case(transport, qos, 50, 1, q)
+                start_publish_case(transport, qos, 10, 20, q)
+
 
 if __name__ == '__main__':
     test_app_protocol_mqtt_publish_connect()