Kaynağa Gözat

Fix OTA example test to fix CI failures

Using `try finally` block to terminate thread
Harshit Malpani 3 yıl önce
ebeveyn
işleme
f9c25c65e1

+ 341 - 334
examples/system/ota/advanced_https_ota/pytest_advanced_ota.py

@@ -28,15 +28,6 @@ def get_my_ip() -> str:
     return my_ip
     return my_ip
 
 
 
 
-def get_server_status(host_ip: str, port: int) -> bool:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_status = sock.connect_ex((host_ip, port))
-    sock.close()
-    if server_status == 0:
-        return True
-    return False
-
-
 def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
 def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
     """
     """
     Returns a request handler class that handles broken pipe exception
     Returns a request handler class that handles broken pipe exception
@@ -123,29 +114,30 @@ def test_examples_protocol_advanced_https_ota_example(dut: Dut) -> None:
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    # Number of iterations to validate OTA
-    iterations = 3
-    server_port = 8001
-    bin_name = 'advanced_https_ota.bin'
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        # Number of iterations to validate OTA
+        iterations = 3
+        server_port = 8001
+        bin_name = 'advanced_https_ota.bin'
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    for i in range(iterations):
-        dut.expect('Loaded app from partition at offset', timeout=30)
-        try:
-            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-            print('Connected to AP with IP: {}'.format(ip_address))
-        except pexpect.exceptions.TIMEOUT:
-            thread1.terminate()
-            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-        dut.expect('Starting Advanced OTA example', timeout=30)
-
-        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    thread1.terminate()
+        for i in range(iterations):
+            dut.expect('Loaded app from partition at offset', timeout=30)
+            try:
+                ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+                print('Connected to AP with IP: {}'.format(ip_address))
+            except pexpect.exceptions.TIMEOUT:
+                thread1.terminate()
+                raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+            dut.expect('Starting Advanced OTA example', timeout=30)
+
+            print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+            dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -163,43 +155,44 @@ def test_examples_protocol_advanced_https_ota_example_truncated_bin(dut: Dut) ->
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code if bin is truncated
       4. Check working of code if bin is truncated
     """
     """
-    server_port = 8001
-    # Original binary file generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # Truncated binary file to be generated from original binary file
-    truncated_bin_name = 'truncated.bin'
-    # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
-    # truncated_bin_size is set to 64000 to reduce consumed by the test case
-    truncated_bin_size = 64000
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    with open(binary_file, 'rb+') as f:
-        with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
-            fo.write(f.read(truncated_bin_size))
-
-    binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # Original binary file generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # Truncated binary file to be generated from original binary file
+        truncated_bin_name = 'truncated.bin'
+        # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
+        # truncated_bin_size is set to 64000 to reduce consumed by the test case
+        truncated_bin_size = 64000
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        with open(binary_file, 'rb+') as f:
+            with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
+                fo.write(f.read(truncated_bin_size))
+
+        binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
-    dut.expect('Image validation failed, image is corrupted', timeout=30)
-    try:
-        os.remove(binary_file)
-    except OSError:
-        pass
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
+        dut.expect('Image validation failed, image is corrupted', timeout=30)
+        try:
+            os.remove(binary_file)
+        except OSError:
+            pass
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -217,43 +210,44 @@ def test_examples_protocol_advanced_https_ota_example_truncated_header(dut: Dut)
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code if headers are not sent completely
       4. Check working of code if headers are not sent completely
     """
     """
-    server_port = 8001
-    # Original binary file generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # Truncated binary file to be generated from original binary file
-    truncated_bin_name = 'truncated_header.bin'
-    # Size of truncated file to be generated. This value should be less than 288 bytes (Image header size)
-    truncated_bin_size = 180
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    with open(binary_file, 'rb+') as f:
-        with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
-            fo.write(f.read(truncated_bin_size))
-
-    binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # Original binary file generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # Truncated binary file to be generated from original binary file
+        truncated_bin_name = 'truncated_header.bin'
+        # Size of truncated file to be generated. This value should be less than 288 bytes (Image header size)
+        truncated_bin_size = 180
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        with open(binary_file, 'rb+') as f:
+            with open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+') as fo:
+                fo.write(f.read(truncated_bin_size))
+
+        binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
-    dut.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
-    try:
-        os.remove(binary_file)
-    except OSError:
-        pass
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
+        dut.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
+        try:
+            os.remove(binary_file)
+        except OSError:
+            pass
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -271,43 +265,44 @@ def test_examples_protocol_advanced_https_ota_example_random(dut: Dut) -> None:
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code for random binary file
       4. Check working of code for random binary file
     """
     """
-    server_port = 8001
-    # Random binary file to be generated
-    random_bin_name = 'random.bin'
-    # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
-    random_bin_size = 32000
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, random_bin_name)
-    with open(binary_file, 'wb+') as fo:
-        # First byte of binary file is always set to zero. If first byte is generated randomly,
-        # in some cases it may generate 0xE9 which will result in failure of testcase.
-        fo.write(struct.pack('B', 0))
-        for i in range(random_bin_size - 1):
-            fo.write(struct.pack('B', random.randrange(0,255,1)))
+    try:
+        server_port = 8001
+        # Random binary file to be generated
+        random_bin_name = 'random.bin'
+        # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
+        random_bin_size = 32000
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, random_bin_name)
+        with open(binary_file, 'wb+') as fo:
+            # First byte of binary file is always set to zero. If first byte is generated randomly,
+            # in some cases it may generate 0xE9 which will result in failure of testcase.
+            fo.write(struct.pack('B', 0))
+            for i in range(random_bin_size - 1):
+                fo.write(struct.pack('B', random.randrange(0,255,1)))
 
 
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
-    dut.expect(r'esp_https_ota: Incorrect app descriptor magic', timeout=10)
-    try:
-        os.remove(binary_file)
-    except OSError:
-        pass
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
+        dut.expect(r'esp_https_ota: Incorrect app descriptor magic', timeout=10)
+        try:
+            os.remove(binary_file)
+        except OSError:
+            pass
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -325,45 +320,46 @@ def test_examples_protocol_advanced_https_ota_example_invalid_chip_id(dut: Dut)
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code for random binary file
       4. Check working of code for random binary file
     """
     """
-    server_port = 8001
-    bin_name = 'advanced_https_ota.bin'
-    # Random binary file to be generated
-    random_bin_name = 'random.bin'
-    random_binary_file = os.path.join(dut.app.binary_path, random_bin_name)
-    # Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case
-    random_bin_size = 2000
-
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    with open(binary_file, 'rb+') as f:
-        data = list(f.read(random_bin_size))
-    # Changing Chip id
-    data[13] = 0xfe
-    with open(random_binary_file, 'wb+') as fo:
-        fo.write(bytearray(data))
-
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        bin_name = 'advanced_https_ota.bin'
+        # Random binary file to be generated
+        random_bin_name = 'random.bin'
+        random_binary_file = os.path.join(dut.app.binary_path, random_bin_name)
+        # Size of random binary file. 2000 is choosen, to reduce the time required to run the test-case
+        random_bin_size = 2000
+
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        with open(binary_file, 'rb+') as f:
+            data = list(f.read(random_bin_size))
+        # Changing Chip id
+        data[13] = 0xfe
+        with open(random_binary_file, 'wb+') as fo:
+            fo.write(bytearray(data))
+
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
-    dut.expect(r'esp_https_ota: Mismatch chip id, expected 0, found \d', timeout=10)
-    try:
-        os.remove(random_binary_file)
-    except OSError:
-        pass
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
+        dut.expect(r'esp_https_ota: Mismatch chip id, expected 0, found \d', timeout=10)
+        try:
+            os.remove(random_binary_file)
+        except OSError:
+            pass
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -385,19 +381,22 @@ def test_examples_protocol_advanced_https_ota_example_chunked(dut: Dut) -> None:
     # start test
     # start test
     host_ip = get_my_ip()
     host_ip = get_my_ip()
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
-    dut.expect('Loaded app from partition at offset', timeout=30)
     try:
     try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
-    dut.write('https://' + host_ip + ':8070/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    chunked_server.kill()
+        dut.expect('Starting Advanced OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
+        dut.write('https://' + host_ip + ':8070/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+    finally:
+        chunked_server.kill()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -415,42 +414,43 @@ def test_examples_protocol_advanced_https_ota_example_redirect_url(dut: Dut) ->
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    server_port = 8001
-    # Port to which the request should be redirected
-    redirection_server_port = 8081
-    redirection_server_port1 = 8082
-    # File to be downloaded. This file is generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # Port to which the request should be redirected
+        redirection_server_port = 8081
+        redirection_server_port1 = 8082
+        # File to be downloaded. This file is generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port, redirection_server_port1))
-    thread2.daemon = True
-    thread2.start()
-    thread3 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port1, server_port))
-    thread3.daemon = True
-    thread3.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port, redirection_server_port1))
+        thread2.daemon = True
+        thread2.start()
+        thread3 = multiprocessing.Process(target=start_redirect_server, args=(dut.app.binary_path, host_ip, redirection_server_port1, server_port))
+        thread3.daemon = True
+        thread3.start()
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            thread2.terminate()
+            thread3.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
+        dut.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
         thread2.terminate()
         thread2.terminate()
         thread3.terminate()
         thread3.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
-    dut.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    thread1.terminate()
-    thread2.terminate()
-    thread3.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -470,57 +470,58 @@ def test_examples_protocol_advanced_https_ota_example_anti_rollback(dut: Dut) ->
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of anti_rollback feature
       4. Check working of anti_rollback feature
     """
     """
-    dut.serial.erase_flash()
-    dut.serial.flash()
-    server_port = 8001
-    # Original binary file generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # Modified firmware image to lower security version in its header. This is to enable negative test case
-    anti_rollback_bin_name = 'advanced_https_ota_lower_sec_version.bin'
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    file_size = os.path.getsize(binary_file)
-    with open(binary_file, 'rb+') as f:
-        with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
-            fo.write(f.read(file_size))
-            # Change security_version to 0 for negative test case
-            fo.seek(36)
-            fo.write(b'\x00')
-    binary_file = os.path.join(dut.app.binary_path, anti_rollback_bin_name)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        dut.serial.erase_flash()
+        dut.serial.flash()
+        server_port = 8001
+        # Original binary file generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # Modified firmware image to lower security version in its header. This is to enable negative test case
+        anti_rollback_bin_name = 'advanced_https_ota_lower_sec_version.bin'
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        file_size = os.path.getsize(binary_file)
+        with open(binary_file, 'rb+') as f:
+            with open(os.path.join(dut.app.binary_path, anti_rollback_bin_name), 'wb+') as fo:
+                fo.write(f.read(file_size))
+                # Change security_version to 0 for negative test case
+                fo.seek(36)
+                fo.write(b'\x00')
+        binary_file = os.path.join(dut.app.binary_path, anti_rollback_bin_name)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    # Positive Case
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        # Positive Case
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Advanced OTA example', timeout=30)
+
+        # Use originally generated image with secure_version=1
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+        dut.expect(r'App is valid, rollback cancelled successfully', timeout=30)
+
+        # Negative Case
+        dut.expect('Starting Advanced OTA example', timeout=30)
+        # Use modified image with secure_version=0
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
+        dut.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
+        try:
+            os.remove(binary_file)
+        except OSError:
+            pass
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
-
-    # Use originally generated image with secure_version=1
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-    dut.expect(r'App is valid, rollback cancelled successfully', timeout=30)
-
-    # Negative Case
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    # Use modified image with secure_version=0
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
-    dut.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
-    try:
-        os.remove(binary_file)
-    except OSError:
-        pass
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -537,37 +538,38 @@ def test_examples_protocol_advanced_https_ota_example_partial_request(dut: Dut)
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    server_port = 8001
-    # Size of partial HTTP request
-    request_size = 16384
-    # File to be downloaded. This file is generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    bin_size = os.path.getsize(binary_file)
-    http_requests = int((bin_size / request_size) - 1)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # Size of partial HTTP request
+        request_size = 16384
+        # File to be downloaded. This file is generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        bin_size = os.path.getsize(binary_file)
+        http_requests = int((bin_size / request_size) - 1)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        print('ENV_TEST_FAILURE: Cannot connect to AP')
-        thread1.terminate()
-        raise
-    dut.expect('Starting Advanced OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            print('ENV_TEST_FAILURE: Cannot connect to AP')
+            thread1.terminate()
+            raise
+        dut.expect('Starting Advanced OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    for _ in range(http_requests):
-        dut.expect('Connection closed', timeout=60)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+        for _ in range(http_requests):
+            dut.expect('Connection closed', timeout=60)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -586,31 +588,32 @@ def test_examples_protocol_advanced_https_ota_example_nimble_gatts(dut: Dut) ->
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Reboot with the new OTA image
       4. Reboot with the new OTA image
     """
     """
-    server_port = 8001
-    # File to be downloaded. This file is generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # File to be downloaded. This file is generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
 
 
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-    print('Started GAP advertising.')
+        dut.expect('Starting Advanced OTA example', timeout=30)
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+        print('Started GAP advertising.')
 
 
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    thread1.terminate()
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -629,32 +632,33 @@ def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(dut: Dut)
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Reboot with the new OTA image
       4. Reboot with the new OTA image
     """
     """
-    server_port = 8001
-    # File to be downloaded. This file is generated after compilation
-    bin_name = 'advanced_https_ota.bin'
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # File to be downloaded. This file is generated after compilation
+        bin_name = 'advanced_https_ota.bin'
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
 
 
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-    dut.expect('Started advertising.', timeout=30)
-    print('Started GAP advertising.')
+        dut.expect('Starting Advanced OTA example', timeout=30)
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+        dut.expect('Started advertising.', timeout=30)
+        print('Started GAP advertising.')
 
 
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    thread1.terminate()
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -688,20 +692,23 @@ def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(dut: D
     # start test
     # start test
     host_ip = get_my_ip()
     host_ip = get_my_ip()
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Advanced OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8070/' + aligned_bin_name))
-    dut.write('https://' + host_ip + ':8070/' + aligned_bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting Advanced OTA example', timeout=30)
-    chunked_server.kill()
     try:
     try:
-        os.remove(aligned_bin_name)
-    except OSError:
-        pass
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+
+        dut.expect('Starting Advanced OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8070/' + aligned_bin_name))
+        dut.write('https://' + host_ip + ':8070/' + aligned_bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting Advanced OTA example', timeout=30)
+        try:
+            os.remove(aligned_bin_name)
+        except OSError:
+            pass
+    finally:
+        chunked_server.kill()

+ 134 - 137
examples/system/ota/native_ota_example/pytest_native_ota.py

@@ -74,15 +74,6 @@ def get_my_ip() -> str:
     return my_ip
     return my_ip
 
 
 
 
-def get_server_status(host_ip: str, port: int) -> bool:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_status = sock.connect_ex((host_ip, port))
-    sock.close()
-    if server_status == 0:
-        return True
-    return False
-
-
 def create_file(server_file: str, file_data: str) -> None:
 def create_file(server_file: str, file_data: str) -> None:
     with open(server_file, 'w+') as file:
     with open(server_file, 'w+') as file:
         file.write(file_data)
         file.write(file_data)
@@ -149,30 +140,31 @@ def test_examples_protocol_native_ota_example(dut: Dut) -> None:
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    server_port = 8002
-    # No. of times working of application to be validated
-    iterations = 3
-    # File to be downloaded. This file is generated after compilation
-    bin_name = 'native_ota.bin'
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8002
+        # No. of times working of application to be validated
+        iterations = 3
+        # File to be downloaded. This file is generated after compilation
+        bin_name = 'native_ota.bin'
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    for i in range(iterations):
-        dut.expect('Loaded app from partition at offset', timeout=60)
-        try:
-            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-            print('Connected to AP with IP: {}'.format(ip_address))
-        except pexpect.exceptions.TIMEOUT:
-            thread1.terminate()
-            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-        dut.expect('Starting OTA example', timeout=30)
-
-        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
-        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
-    thread1.terminate()
+        for i in range(iterations):
+            dut.expect('Loaded app from partition at offset', timeout=60)
+            try:
+                ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+                print('Connected to AP with IP: {}'.format(ip_address))
+            except pexpect.exceptions.TIMEOUT:
+                thread1.terminate()
+                raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+            dut.expect('Starting OTA example', timeout=30)
+
+            print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
+            dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.supported_targets
 @pytest.mark.supported_targets
@@ -187,42 +179,43 @@ def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None:
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code if bin is truncated
       4. Check working of code if bin is truncated
     """
     """
-    server_port = 8002
-    # Original binary file generated after compilation
-    bin_name = 'native_ota.bin'
-    # Truncated binary file to be generated from original binary file
-    truncated_bin_name = 'truncated.bin'
-    # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
-    # truncated_bin_size is set to 64000 to reduce consumed by the test case
-    truncated_bin_size = 64000
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    f = open(binary_file, 'rb+')
-    fo = open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+')
-    fo.write(f.read(truncated_bin_size))
-    fo.close()
-    f.close()
-    binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8002
+        # Original binary file generated after compilation
+        bin_name = 'native_ota.bin'
+        # Truncated binary file to be generated from original binary file
+        truncated_bin_name = 'truncated.bin'
+        # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
+        # truncated_bin_size is set to 64000 to reduce consumed by the test case
+        truncated_bin_size = 64000
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        f = open(binary_file, 'rb+')
+        fo = open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+')
+        fo.write(f.read(truncated_bin_size))
+        fo.close()
+        f.close()
+        binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
-    dut.expect('native_ota_example: Image validation failed, image is corrupted', timeout=20)
-    os.remove(binary_file)
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
+        dut.expect('native_ota_example: Image validation failed, image is corrupted', timeout=20)
+        os.remove(binary_file)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.supported_targets
 @pytest.mark.supported_targets
@@ -237,41 +230,42 @@ def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code if headers are not sent completely
       4. Check working of code if headers are not sent completely
     """
     """
-    server_port = 8002
-    # Original binary file generated after compilation
-    bin_name = 'native_ota.bin'
-    # Truncated binary file to be generated from original binary file
-    truncated_bin_name = 'truncated_header.bin'
-    # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
-    truncated_bin_size = 180
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, bin_name)
-    f = open(binary_file, 'rb+')
-    fo = open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+')
-    fo.write(f.read(truncated_bin_size))
-    fo.close()
-    f.close()
-    binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8002
+        # Original binary file generated after compilation
+        bin_name = 'native_ota.bin'
+        # Truncated binary file to be generated from original binary file
+        truncated_bin_name = 'truncated_header.bin'
+        # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
+        truncated_bin_size = 180
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, bin_name)
+        f = open(binary_file, 'rb+')
+        fo = open(os.path.join(dut.app.binary_path, truncated_bin_name), 'wb+')
+        fo.write(f.read(truncated_bin_size))
+        fo.close()
+        f.close()
+        binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
-    dut.expect('native_ota_example: received package is not fit len', timeout=20)
-    os.remove(binary_file)
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
+        dut.expect('native_ota_example: received package is not fit len', timeout=20)
+        os.remove(binary_file)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.supported_targets
 @pytest.mark.supported_targets
@@ -286,40 +280,41 @@ def test_examples_protocol_native_ota_example_random(dut: Dut) -> None:
       3. Fetch OTA image over HTTPS
       3. Fetch OTA image over HTTPS
       4. Check working of code for random binary file
       4. Check working of code for random binary file
     """
     """
-    server_port = 8002
-    # Random binary file to be generated
-    random_bin_name = 'random.bin'
-    # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
-    random_bin_size = 32000
-    # check and log bin size
-    binary_file = os.path.join(dut.app.binary_path, random_bin_name)
-    fo = open(binary_file, 'wb+')
-    # First byte of binary file is always set to zero. If first byte is generated randomly,
-    # in some cases it may generate 0xE9 which will result in failure of testcase.
-    fo.write(struct.pack('B', 0))
-    for i in range(random_bin_size - 1):
-        fo.write(struct.pack('B', random.randrange(0,255,1)))
-    fo.close()
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8002
+        # Random binary file to be generated
+        random_bin_name = 'random.bin'
+        # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
+        random_bin_size = 32000
+        # check and log bin size
+        binary_file = os.path.join(dut.app.binary_path, random_bin_name)
+        fo = open(binary_file, 'wb+')
+        # First byte of binary file is always set to zero. If first byte is generated randomly,
+        # in some cases it may generate 0xE9 which will result in failure of testcase.
+        fo.write(struct.pack('B', 0))
+        for i in range(random_bin_size - 1):
+            fo.write(struct.pack('B', random.randrange(0,255,1)))
+        fo.close()
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
 
 
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
-    dut.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=20)
-    os.remove(binary_file)
-    thread1.terminate()
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
+        dut.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=20)
+        os.remove(binary_file)
+    finally:
+        thread1.terminate()
 
 
 
 
 @pytest.mark.supported_targets
 @pytest.mark.supported_targets
@@ -338,18 +333,20 @@ def test_examples_protocol_native_ota_example_chunked(dut: Dut) -> None:
     # start test
     # start test
     host_ip = get_my_ip()
     host_ip = get_my_ip()
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
     chunked_server = start_chunked_server(dut.app.binary_path, 8070)
-    dut.expect('Loaded app from partition at offset', timeout=30)
     try:
     try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-
-    dut.expect('Starting OTA example', timeout=30)
-    print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
-    dut.write('https://' + host_ip + ':8070/' + bin_name)
-    dut.expect('Loaded app from partition at offset', timeout=60)
-    dut.expect('Starting OTA example', timeout=30)
-    chunked_server.kill()
-    os.remove(os.path.join(dut.app.binary_path, 'server_cert.pem'))
-    os.remove(os.path.join(dut.app.binary_path, 'server_key.pem'))
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+
+        dut.expect('Starting OTA example', timeout=30)
+        print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
+        dut.write('https://' + host_ip + ':8070/' + bin_name)
+        dut.expect('Loaded app from partition at offset', timeout=60)
+        dut.expect('Starting OTA example', timeout=30)
+        os.remove(os.path.join(dut.app.binary_path, 'server_cert.pem'))
+        os.remove(os.path.join(dut.app.binary_path, 'server_key.pem'))
+    finally:
+        chunked_server.kill()

+ 19 - 25
examples/system/ota/pre_encrypted_ota/pytest_pre_encrypted_ota.py

@@ -26,13 +26,6 @@ def get_my_ip() -> str:
     return my_ip
     return my_ip
 
 
 
 
-def get_server_status(host_ip: str, port: int) -> bool:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_status = sock.connect_ex((host_ip, port))
-    sock.close()
-    return server_status == 0
-
-
 def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
 def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
     """
     """
     Returns a request handler class that handles broken pipe exception
     Returns a request handler class that handles broken pipe exception
@@ -73,26 +66,27 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) ->
 @pytest.mark.esp32s3
 @pytest.mark.esp32s3
 @pytest.mark.ethernet_ota
 @pytest.mark.ethernet_ota
 def test_examples_protocol_pre_encrypted_ota_example(dut: Dut) -> None:
 def test_examples_protocol_pre_encrypted_ota_example(dut: Dut) -> None:
-    server_port = 8001
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, server_port) is False):
+    try:
+        server_port = 8001
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, server_port))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
 
 
-    dut.expect('Loaded app from partition at offset', timeout=30)
-    try:
-        ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset', timeout=30)
+        try:
+            ip_address = dut.expect(r' (sta|eth) ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting Pre Encrypted OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name))
+        dut.write('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name)
+        dut.expect('Magic Verified', timeout=30)
+        dut.expect('Reading RSA private key', timeout=30)
+        dut.expect('upgrade successful. Rebooting', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting Pre Encrypted OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name))
-    dut.write('https://' + host_ip + ':' + str(server_port) + '/' + enc_bin_name)
-    dut.expect('Magic Verified', timeout=30)
-    dut.expect('Reading RSA private key', timeout=30)
-    dut.expect('upgrade successful. Rebooting', timeout=30)
-    thread1.terminate()

+ 132 - 135
examples/system/ota/simple_ota_example/pytest_simple_ota.py

@@ -72,15 +72,6 @@ def get_my_ip() -> str:
     return my_ip
     return my_ip
 
 
 
 
-def get_server_status(host_ip: str, server_port: int) -> bool:
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    server_status = sock.connect_ex((host_ip, server_port))
-    sock.close()
-    if server_status == 0:
-        return True
-    return False
-
-
 def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: str = None, key_file: str = None) -> None:
 def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, server_file: str = None, key_file: str = None) -> None:
     os.chdir(ota_image_dir)
     os.chdir(ota_image_dir)
 
 
@@ -136,29 +127,30 @@ def test_examples_protocol_simple_ota_example(dut: Dut) -> None:
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    sha256_bootloader, sha256_app = calc_all_sha256(dut)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        sha256_bootloader, sha256_app = calc_all_sha256(dut)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
-    check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
-    check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
-    try:
-        ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
+        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
+        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
+        try:
+            ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -174,26 +166,27 @@ def test_examples_protocol_simple_ota_example_ethernet_with_spiram_config(dut: D
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
-    try:
-        ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x10000', timeout=30)
+        try:
+            ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Loaded app from partition at offset 0x110000', timeout=60)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -210,31 +203,32 @@ def test_examples_protocol_simple_ota_example_with_flash_encryption(dut: Dut) ->
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    # Erase flash
-    dut.serial.erase_flash()
-    dut.serial.flash()
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        # Erase flash
+        dut.serial.erase_flash()
+        dut.serial.flash()
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
-    dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
-    try:
-        ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
+        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
+        try:
+            ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
+        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
-    dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32c3
 @pytest.mark.esp32c3
@@ -249,31 +243,32 @@ def test_examples_protocol_simple_ota_example_with_flash_encryption_wifi(dut: Du
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    # start test
-    # Erase flash
-    dut.serial.erase_flash()
-    dut.serial.flash()
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        # start test
+        # Erase flash
+        dut.serial.erase_flash()
+        dut.serial.flash()
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
-    dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
-    try:
-        ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
+        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
+        try:
+            ip_address = dut.expect(r' sta ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
+        dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Loaded app from partition at offset 0x120000', timeout=60)
-    dut.expect('Flash encryption mode is DEVELOPMENT', timeout=10)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -289,33 +284,34 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    sha256_bootloader, sha256_app = calc_all_sha256(dut)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        sha256_bootloader, sha256_app = calc_all_sha256(dut)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
-    check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
-    check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
-    try:
-        ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
+        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
+        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
+        try:
+            ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
+
+        dut.expect('Verifying image signature...', timeout=60)
+
+        dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
-
-    dut.expect('Verifying image signature...', timeout=60)
-
-    dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 @pytest.mark.esp32
 @pytest.mark.esp32
@@ -331,36 +327,37 @@ def test_examples_protocol_simple_ota_example_with_verify_app_signature_on_updat
       2. Fetch OTA image over HTTPS
       2. Fetch OTA image over HTTPS
       3. Reboot with the new OTA image
       3. Reboot with the new OTA image
     """
     """
-    sha256_bootloader, sha256_app = calc_all_sha256(dut)
-    # start test
-    host_ip = get_my_ip()
-    if (get_server_status(host_ip, 8000) is False):
+    try:
+        sha256_bootloader, sha256_app = calc_all_sha256(dut)
+        # start test
+        host_ip = get_my_ip()
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, host_ip, 8000))
         thread1.daemon = True
         thread1.daemon = True
         thread1.start()
         thread1.start()
-    dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
-    check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
-    check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
-    try:
-        ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
-        print('Connected to AP with IP: {}'.format(ip_address))
-    except pexpect.exceptions.TIMEOUT:
+        dut.expect('Loaded app from partition at offset 0x20000', timeout=30)
+        check_sha256(sha256_bootloader, str(dut.expect(r'SHA-256 for bootloader:\s+([a-f0-9]){64}')[0]))
+        check_sha256(sha256_app, str(dut.expect(r'SHA-256 for current firmware:\s+([a-f0-9]){64}')[0]))
+        try:
+            ip_address = dut.expect(r' eth ip: ([^,]+),', timeout=30)
+            print('Connected to AP with IP: {}'.format(ip_address))
+        except pexpect.exceptions.TIMEOUT:
+            thread1.terminate()
+            raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
+        dut.expect('Starting OTA example', timeout=30)
+
+        print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
+        dut.write('https://' + host_ip + ':8000/simple_ota.bin')
+        dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
+
+        dut.expect('Verifying image signature...', timeout=60)
+        dut.expect('#0 app key digest == #0 trusted key digest', timeout=10)
+        dut.expect('Verifying with RSA-PSS...', timeout=10)
+        dut.expect('Signature verified successfully!', timeout=10)
+
+        dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
+        dut.expect('Starting OTA example', timeout=30)
+    finally:
         thread1.terminate()
         thread1.terminate()
-        raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
-    dut.expect('Starting OTA example', timeout=30)
-
-    print('writing to device: {}'.format('https://' + host_ip + ':8000/simple_ota.bin'))
-    dut.write('https://' + host_ip + ':8000/simple_ota.bin')
-    dut.expect('Writing to partition subtype 16 at offset 0x120000', timeout=20)
-
-    dut.expect('Verifying image signature...', timeout=60)
-    dut.expect('#0 app key digest == #0 trusted key digest', timeout=10)
-    dut.expect('Verifying with RSA-PSS...', timeout=10)
-    dut.expect('Signature verified successfully!', timeout=10)
-
-    dut.expect('Loaded app from partition at offset 0x120000', timeout=20)
-    dut.expect('Starting OTA example', timeout=30)
-    thread1.terminate()
 
 
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':