Parcourir la source

Merge branch 'ci/publish_connect_fix' into 'master'

Fix publish connect mqtt test

See merge request espressif/esp-idf!25479
Rocha Euripedes il y a 2 ans
Parent
commit
9552ef012d

+ 8 - 0
.gitlab/ci/target-test.yml

@@ -1130,6 +1130,14 @@ pytest_test_apps_esp32_jtag:
     SETUP_TOOLS: "1"  # need gdb
     SETUP_TOOLS: "1"  # need gdb
     PYTEST_EXTRA_FLAGS: "--log-cli-level DEBUG"
     PYTEST_EXTRA_FLAGS: "--log-cli-level DEBUG"
 
 
+pytest_test_apps_esp32_ethernet:
+  extends:
+    - .pytest_test_apps_dir_template
+    - .rules:test:custom_test-esp32
+  needs:
+    - build_pytest_test_apps_esp32
+  tags: [ esp32, ethernet ]
+
 pytest_test_apps_esp32s2_generic:
 pytest_test_apps_esp32s2_generic:
   extends:
   extends:
     - .pytest_test_apps_dir_template
     - .pytest_test_apps_dir_template

+ 43 - 36
tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_app.py

@@ -1,7 +1,8 @@
-# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Unlicense OR CC0-1.0
 # SPDX-License-Identifier: Unlicense OR CC0-1.0
 from __future__ import print_function, unicode_literals
 from __future__ import print_function, unicode_literals
 
 
+import difflib
 import logging
 import logging
 import os
 import os
 import random
 import random
@@ -47,6 +48,7 @@ class MqttPublisher:
     event_client_got_all = Event()
     event_client_got_all = Event()
     expected_data = ''
     expected_data = ''
     published = 0
     published = 0
+    sample = ''
 
 
     def __init__(self, dut, transport,
     def __init__(self, dut, transport,
                  qos, repeat, published, queue, publish_cfg, log_details=False):  # type: (MqttPublisher, Dut, str, int, int, int, int, dict, bool) -> None
                  qos, repeat, published, queue, publish_cfg, log_details=False):  # type: (MqttPublisher, Dut, str, int, int, int, int, dict, bool) -> None
@@ -68,11 +70,12 @@ class MqttPublisher:
         MqttPublisher.published = published
         MqttPublisher.published = published
         MqttPublisher.event_client_connected.clear()
         MqttPublisher.event_client_connected.clear()
         MqttPublisher.event_client_got_all.clear()
         MqttPublisher.event_client_got_all.clear()
-        MqttPublisher.expected_data = self.sample_string * self.repeat
+        MqttPublisher.expected_data = f'{self.sample_string * self.repeat}'
+        MqttPublisher.sample = self.sample_string
 
 
     def print_details(self, text):  # type: (str) -> None
     def print_details(self, text):  # type: (str) -> None
         if self.log_details:
         if self.log_details:
-            print(text)
+            logging.info(text)
 
 
     def mqtt_client_task(self, client, lock):  # type: (MqttPublisher, mqtt.Client, Lock) -> None
     def mqtt_client_task(self, client, lock):  # type: (MqttPublisher, mqtt.Client, Lock) -> None
         while not self.event_stop_client.is_set():
         while not self.event_stop_client.is_set():
@@ -88,12 +91,23 @@ class MqttPublisher:
     # The callback for when a PUBLISH message is received from the server (needs to be static)
     # The callback for when a PUBLISH message is received from the server (needs to be static)
     @staticmethod
     @staticmethod
     def on_message(client, userdata, msg):  # type: (mqtt.Client, int, mqtt.client.MQTTMessage) -> None
     def on_message(client, userdata, msg):  # type: (mqtt.Client, int, mqtt.client.MQTTMessage) -> None
-        payload = msg.payload.decode()
+        payload = msg.payload.decode('utf-8')
         if payload == MqttPublisher.expected_data:
         if payload == MqttPublisher.expected_data:
             userdata += 1
             userdata += 1
             client.user_data_set(userdata)
             client.user_data_set(userdata)
             if userdata == MqttPublisher.published:
             if userdata == MqttPublisher.published:
                 MqttPublisher.event_client_got_all.set()
                 MqttPublisher.event_client_got_all.set()
+        else:
+            differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, MqttPublisher.expected_data))))
+            logging.error(f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
+                          f'{len(MqttPublisher.expected_data)}')
+            logging.info(f'Repetitions: {payload.count(MqttPublisher.sample)}')
+            logging.info(f'Pattern: {MqttPublisher.sample}')
+            logging.info(f'First  : {payload[:DEFAULT_MSG_SIZE]}')
+            logging.info(f'Last   : {payload[-DEFAULT_MSG_SIZE:]}')
+            matcher = difflib.SequenceMatcher(a=payload, b=MqttPublisher.expected_data)
+            for match in matcher.get_matching_blocks():
+                logging.info(f'Match: {match}')
 
 
     def __enter__(self):  # type: (MqttPublisher) -> None
     def __enter__(self):  # type: (MqttPublisher) -> None
 
 
@@ -104,8 +118,8 @@ class MqttPublisher:
         broker_port = self.publish_cfg['broker_port_' + transport]
         broker_port = self.publish_cfg['broker_port_' + transport]
 
 
         # Start the test
         # Start the test
-        self.print_details("PUBLISH TEST: transport:{}, qos:{}, sequence:{}, enqueue:{}, sample msg:'{}'"
-                           .format(transport, qos, MqttPublisher.published, queue, MqttPublisher.expected_data))
+        self.print_details(f'PUBLISH TEST: transport:{transport}, qos:{qos}, sequence:{MqttPublisher.published},'
+                           f"enqueue:{queue}, sample msg:'{MqttPublisher.expected_data}'")
 
 
         try:
         try:
             if transport in ['ws', 'wss']:
             if transport in ['ws', 'wss']:
@@ -123,29 +137,29 @@ class MqttPublisher:
             self.print_details('Connecting...')
             self.print_details('Connecting...')
             self.client.connect(broker_host, broker_port, 60)
             self.client.connect(broker_host, broker_port, 60)
         except Exception:
         except Exception:
-            self.print_details('ENV_TEST_FAILURE: Unexpected error while connecting to broker {}'.format(broker_host))
+            self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
             raise
             raise
         # Starting a py-client in a separate thread
         # Starting a py-client in a separate thread
         thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock))
         thread1 = Thread(target=self.mqtt_client_task, args=(self.client, self.lock))
         thread1.start()
         thread1.start()
         self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
         self.print_details('Connecting py-client to broker {}:{}...'.format(broker_host, broker_port))
         if not MqttPublisher.event_client_connected.wait(timeout=30):
         if not MqttPublisher.event_client_connected.wait(timeout=30):
-            raise ValueError('ENV_TEST_FAILURE: Test script cannot connect to broker: {}'.format(broker_host))
+            raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
         with self.lock:
         with self.lock:
             self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
             self.client.subscribe(self.publish_cfg['subscribe_topic'], qos)
-        self.dut.write(' '.join(str(x) for x in (transport, self.sample_string, self.repeat, MqttPublisher.published, qos, queue)), eol='\n')
+        self.dut.write(f'{transport} {self.sample_string} {self.repeat} {MqttPublisher.published} {qos} {queue}')
         try:
         try:
             # waiting till subscribed to defined topic
             # waiting till subscribed to defined topic
-            self.dut.expect(re.compile(r'MQTT_EVENT_SUBSCRIBED'), timeout=30)
+            self.dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
             for _ in range(MqttPublisher.published):
             for _ in range(MqttPublisher.published):
                 with self.lock:
                 with self.lock:
                     self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
                     self.client.publish(self.publish_cfg['publish_topic'], self.sample_string * self.repeat, qos)
                 self.print_details('Publishing...')
                 self.print_details('Publishing...')
             self.print_details('Checking esp-client received msg published from py-client...')
             self.print_details('Checking esp-client received msg published from py-client...')
-            self.dut.expect(re.compile(r'Correct pattern received exactly x times'), timeout=60)
+            self.dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=60)
             if not MqttPublisher.event_client_got_all.wait(timeout=60):
             if not MqttPublisher.event_client_got_all.wait(timeout=60):
-                raise ValueError('Not all data received from ESP32')
-            print(' - all data received from ESP32')
+                raise ValueError('Not all data received from ESP32: {}'.format(transport))
+            logging.info(' - all data received from ESP32')
         finally:
         finally:
             self.event_stop_client.set()
             self.event_stop_client.set()
             thread1.join()
             thread1.join()
@@ -168,6 +182,8 @@ class TlsServer:
         self.client_cert = client_cert
         self.client_cert = client_cert
         self.refuse_connection = refuse_connection
         self.refuse_connection = refuse_connection
         self.use_alpn = use_alpn
         self.use_alpn = use_alpn
+        self.conn = socket.socket()
+        self.ssl_error = ''
 
 
     def __enter__(self):  # type: (TlsServer) -> TlsServer
     def __enter__(self):  # type: (TlsServer) -> TlsServer
         try:
         try:
@@ -253,12 +269,12 @@ def connection_tests(dut, cases, dut_ip):  # type: (Dut, dict, str) -> None
     server_port = 2222
     server_port = 2222
 
 
     def teardown_connection_suite() -> None:
     def teardown_connection_suite() -> None:
-        dut.write('conn teardown 0 0')
+        dut.write('conn teardown 0 0\n')
 
 
     def start_connection_case(case, desc):  # type: (str, str) -> Any
     def start_connection_case(case, desc):  # type: (str, str) -> Any
         print('Starting {}: {}'.format(case, desc))
         print('Starting {}: {}'.format(case, desc))
         case_id = cases[case]
         case_id = cases[case]
-        dut.write('conn {} {} {}'.format(ip, server_port, case_id))
+        dut.write('conn {} {} {}\n'.format(ip, server_port, case_id))
         dut.expect('Test case:{} started'.format(case_id))
         dut.expect('Test case:{} started'.format(case_id))
         return case_id
         return case_id
 
 
@@ -350,13 +366,7 @@ def test_app_protocol_mqtt_publish_connect(dut: Dut) -> None:
     esp_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30).group(1).decode()
     esp_ip = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30).group(1).decode()
     print('Got IP={}'.format(esp_ip))
     print('Got IP={}'.format(esp_ip))
 
 
-    if not os.getenv('MQTT_SKIP_CONNECT_TEST'):
-        connection_tests(dut,cases,esp_ip)
-
-    #
-    # start publish tests only if enabled in the environment (for weekend tests only)
-    if not os.getenv('MQTT_PUBLISH_TEST'):
-        return
+    connection_tests(dut,cases,esp_ip)
 
 
     # Get publish test configuration
     # Get publish test configuration
     try:
     try:
@@ -375,15 +385,9 @@ def test_app_protocol_mqtt_publish_connect(dut: Dut) -> None:
         publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_host_port_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
         publish_cfg['broker_host_wss'], publish_cfg['broker_port_wss'] = get_host_port_from_dut(dut, 'EXAMPLE_BROKER_WSS_URI')
 
 
     except Exception:
     except Exception:
-        print('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig')
+        logging.error('ENV_TEST_FAILURE: Some mandatory PUBLISH test case not found in sdkconfig')
         raise
         raise
 
 
-    def start_publish_case(transport, qos, repeat, published, queue):  # type: (str, int, int, int, int) -> None
-        print('Starting Publish test: transport:{}, qos:{}, nr_of_msgs:{}, msg_size:{}, enqueue:{}'
-              .format(transport, qos, published, repeat * DEFAULT_MSG_SIZE, queue))
-        with MqttPublisher(dut, transport, qos, repeat, published, queue, publish_cfg):
-            pass
-
     # Initialize message sizes and repeat counts (if defined in the environment)
     # Initialize message sizes and repeat counts (if defined in the environment)
     messages = []
     messages = []
     for i in count(0):
     for i in count(0):
@@ -401,14 +405,17 @@ def test_app_protocol_mqtt_publish_connect(dut: Dut) -> None:
                     ]
                     ]
 
 
     # Iterate over all publish message properties
     # Iterate over all publish message properties
-    for qos in [0, 1, 2]:
-        for transport in ['tcp', 'ssl', 'ws', 'wss']:
-            for q in [0, 1]:
-                if publish_cfg['broker_host_' + transport] is None:
-                    print('Skipping transport: {}...'.format(transport))
-                    continue
+    for transport in ['tcp', 'ssl', 'ws', 'wss']:
+        if publish_cfg['broker_host_' + transport] is None:
+            print('Skipping transport: {}...'.format(transport))
+            continue
+        for enqueue in [0, 1]:
+            for qos in [0, 1, 2]:
                 for msg in messages:
                 for msg in messages:
-                    start_publish_case(transport, qos, msg['len'], msg['repeat'], q)
+                    logging.info(f'Starting Publish test: transport:{transport}, qos:{qos}, nr_of_msgs:{msg["repeat"]},'
+                                 f'msg_size:{msg["len"] * DEFAULT_MSG_SIZE}, enqueue:{enqueue}')
+                    with MqttPublisher(dut, transport, qos, msg['len'], msg['repeat'], enqueue, publish_cfg):
+                        pass
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':