pytest_native_ota.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Unlicense OR CC0-1.0
  3. import http.server
  4. import multiprocessing
  5. import os
  6. import random
  7. import socket
  8. import ssl
  9. import struct
  10. import subprocess
  11. from typing import Callable, Tuple
  12. import pexpect
  13. import pytest
  14. from common_test_methods import get_host_ip4_by_dest_ip
  15. from pytest_embedded import Dut
  16. server_cert = '-----BEGIN CERTIFICATE-----\n' \
  17. 'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
  18. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  19. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  20. 'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'\
  21. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  22. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  23. 'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'\
  24. 'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'\
  25. 'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'\
  26. 'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'\
  27. 'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'\
  28. '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'\
  29. 'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'\
  30. 'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'\
  31. 'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'\
  32. '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'\
  33. 'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'\
  34. 'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'\
  35. '-----END CERTIFICATE-----\n'
  36. server_key = '-----BEGIN PRIVATE KEY-----\n'\
  37. 'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'\
  38. 'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'\
  39. 'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'\
  40. 'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'\
  41. '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'\
  42. 'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'\
  43. 'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'\
  44. 'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'\
  45. 'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'\
  46. 'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'\
  47. 'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'\
  48. 'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'\
  49. 'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'\
  50. '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'\
  51. 'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'\
  52. 'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'\
  53. 'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'\
  54. 'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'\
  55. 'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'\
  56. '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'\
  57. '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'\
  58. 'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'\
  59. 'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'\
  60. 'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'\
  61. 'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'\
  62. 'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
  63. '-----END PRIVATE KEY-----\n'
  64. def create_file(server_file: str, file_data: str) -> None:
  65. with open(server_file, 'w+') as file:
  66. file.write(file_data)
  67. def get_ca_cert(ota_image_dir: str) -> Tuple[str, str]:
  68. os.chdir(ota_image_dir)
  69. server_file = os.path.join(ota_image_dir, 'server_cert.pem')
  70. create_file(server_file, server_cert)
  71. key_file = os.path.join(ota_image_dir, 'server_key.pem')
  72. create_file(key_file, server_key)
  73. return server_file, key_file
  74. def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
  75. """
  76. Returns a request handler class that handles broken pipe exception
  77. """
  78. class RequestHandler(http.server.SimpleHTTPRequestHandler):
  79. def finish(self) -> None:
  80. try:
  81. if not self.wfile.closed:
  82. self.wfile.flush()
  83. self.wfile.close()
  84. except socket.error:
  85. pass
  86. self.rfile.close()
  87. def handle(self) -> None:
  88. try:
  89. http.server.BaseHTTPRequestHandler.handle(self)
  90. except socket.error:
  91. pass
  92. return RequestHandler
  93. def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> None:
  94. server_file, key_file = get_ca_cert(ota_image_dir)
  95. requestHandler = https_request_handler()
  96. httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
  97. httpd.socket = ssl.wrap_socket(httpd.socket,
  98. keyfile=key_file,
  99. certfile=server_file, server_side=True)
  100. httpd.serve_forever()
  101. def start_chunked_server(ota_image_dir: str, server_port: int) -> subprocess.Popen:
  102. server_file, key_file = get_ca_cert(ota_image_dir)
  103. chunked_server = subprocess.Popen(['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
  104. return chunked_server
  105. @pytest.mark.esp32
  106. @pytest.mark.esp32c3
  107. @pytest.mark.esp32s2
  108. @pytest.mark.esp32s3
  109. @pytest.mark.esp32c2
  110. @pytest.mark.ethernet_ota
  111. def test_examples_protocol_native_ota_example(dut: Dut) -> None:
  112. """
  113. This is a positive test case, which downloads complete binary file multiple number of times.
  114. Number of iterations can be specified in variable iterations.
  115. steps: |
  116. 1. join AP/Ethernet
  117. 2. Fetch OTA image over HTTPS
  118. 3. Reboot with the new OTA image
  119. """
  120. server_port = 8002
  121. # No. of times working of application to be validated
  122. iterations = 3
  123. # File to be downloaded. This file is generated after compilation
  124. bin_name = 'native_ota.bin'
  125. # Start server
  126. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  127. thread1.daemon = True
  128. thread1.start()
  129. try:
  130. # start test
  131. for _ in range(iterations):
  132. dut.expect('Loaded app from partition at offset', timeout=30)
  133. try:
  134. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  135. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  136. except pexpect.exceptions.TIMEOUT:
  137. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  138. host_ip = get_host_ip4_by_dest_ip(ip_address)
  139. dut.expect('Starting OTA example task', timeout=30)
  140. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  141. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  142. dut.expect('Prepare to restart system!', timeout=60)
  143. finally:
  144. thread1.terminate()
  145. @pytest.mark.esp32
  146. @pytest.mark.esp32c3
  147. @pytest.mark.esp32s2
  148. @pytest.mark.esp32s3
  149. @pytest.mark.esp32c2
  150. @pytest.mark.ethernet_ota
  151. def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None:
  152. """
  153. Working of OTA if binary file is truncated is validated in this test case.
  154. Application should return with error message in this case.
  155. steps: |
  156. 1. join AP/Ethernet
  157. 2. Generate truncated binary file
  158. 3. Fetch OTA image over HTTPS
  159. 4. Check working of code if bin is truncated
  160. """
  161. server_port = 8002
  162. # Original binary file generated after compilation
  163. bin_name = 'native_ota.bin'
  164. # Truncated binary file to be generated from original binary file
  165. truncated_bin_name = 'truncated.bin'
  166. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  167. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  168. truncated_bin_size = 64000
  169. # check and log bin size
  170. binary_file = os.path.join(dut.app.binary_path, bin_name)
  171. with open(binary_file, 'rb+') as fr:
  172. bin_data = fr.read(truncated_bin_size)
  173. binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
  174. with open(binary_file, 'wb+') as fo:
  175. fo.write(bin_data)
  176. # Start server
  177. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  178. thread1.daemon = True
  179. thread1.start()
  180. try:
  181. # start test
  182. dut.expect('Loaded app from partition at offset', timeout=30)
  183. try:
  184. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  185. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  186. except pexpect.exceptions.TIMEOUT:
  187. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  188. host_ip = get_host_ip4_by_dest_ip(ip_address)
  189. dut.expect('Starting OTA example task', timeout=30)
  190. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  191. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  192. dut.expect('native_ota_example: Image validation failed, image is corrupted', timeout=20)
  193. os.remove(binary_file)
  194. finally:
  195. thread1.terminate()
  196. @pytest.mark.esp32
  197. @pytest.mark.esp32c3
  198. @pytest.mark.esp32s2
  199. @pytest.mark.esp32s3
  200. @pytest.mark.esp32c2
  201. @pytest.mark.ethernet_ota
  202. def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None:
  203. """
  204. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  205. Application should return with error message in this case.
  206. steps: |
  207. 1. join AP/Ethernet
  208. 2. Generate binary file with truncated headers
  209. 3. Fetch OTA image over HTTPS
  210. 4. Check working of code if headers are not sent completely
  211. """
  212. server_port = 8002
  213. # Original binary file generated after compilation
  214. bin_name = 'native_ota.bin'
  215. # Truncated binary file to be generated from original binary file
  216. truncated_bin_name = 'truncated_header.bin'
  217. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  218. truncated_bin_size = 180
  219. # check and log bin size
  220. binary_file = os.path.join(dut.app.binary_path, bin_name)
  221. with open(binary_file, 'rb+') as fr:
  222. bin_data = fr.read(truncated_bin_size)
  223. binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
  224. with open(binary_file, 'wb+') as fo:
  225. fo.write(bin_data)
  226. # Start server
  227. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  228. thread1.daemon = True
  229. thread1.start()
  230. try:
  231. # start test
  232. dut.expect('Loaded app from partition at offset', timeout=30)
  233. try:
  234. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  235. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  236. except pexpect.exceptions.TIMEOUT:
  237. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  238. host_ip = get_host_ip4_by_dest_ip(ip_address)
  239. dut.expect('Starting OTA example task', timeout=30)
  240. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  241. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  242. dut.expect('native_ota_example: received package is not fit len', timeout=20)
  243. os.remove(binary_file)
  244. finally:
  245. thread1.terminate()
  246. @pytest.mark.esp32
  247. @pytest.mark.esp32c3
  248. @pytest.mark.esp32s2
  249. @pytest.mark.esp32s3
  250. @pytest.mark.esp32c2
  251. @pytest.mark.ethernet_ota
  252. def test_examples_protocol_native_ota_example_random(dut: Dut) -> None:
  253. """
  254. Working of OTA if random data is added in binary file are validated in this test case.
  255. Magic byte verification should fail in this case.
  256. steps: |
  257. 1. join AP/Ethernet
  258. 2. Generate random binary image
  259. 3. Fetch OTA image over HTTPS
  260. 4. Check working of code for random binary file
  261. """
  262. server_port = 8002
  263. # Random binary file to be generated
  264. random_bin_name = 'random.bin'
  265. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  266. random_bin_size = 32000
  267. # check and log bin size
  268. binary_file = os.path.join(dut.app.binary_path, random_bin_name)
  269. fo = open(binary_file, 'wb+')
  270. # First byte of binary file is always set to zero. If first byte is generated randomly,
  271. # in some cases it may generate 0xE9 which will result in failure of testcase.
  272. with open(binary_file, 'wb+') as fo:
  273. fo.write(struct.pack('B', 0))
  274. for _ in range(random_bin_size - 1):
  275. fo.write(struct.pack('B', random.randrange(0,255,1)))
  276. # Start server
  277. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  278. thread1.daemon = True
  279. thread1.start()
  280. try:
  281. # start test
  282. dut.expect('Loaded app from partition at offset', timeout=30)
  283. try:
  284. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  285. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  286. except pexpect.exceptions.TIMEOUT:
  287. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  288. host_ip = get_host_ip4_by_dest_ip(ip_address)
  289. dut.expect('Starting OTA example task', timeout=30)
  290. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
  291. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
  292. dut.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=20)
  293. os.remove(binary_file)
  294. finally:
  295. thread1.terminate()
  296. @pytest.mark.esp32
  297. @pytest.mark.esp32c3
  298. @pytest.mark.esp32s2
  299. @pytest.mark.esp32s3
  300. @pytest.mark.esp32c2
  301. @pytest.mark.ethernet_ota
  302. def test_examples_protocol_native_ota_example_chunked(dut: Dut) -> None:
  303. """
  304. This is a positive test case, which downloads complete binary file multiple number of times.
  305. Number of iterations can be specified in variable iterations.
  306. steps: |
  307. 1. join AP/Ethernet
  308. 2. Fetch OTA image over HTTPS
  309. 3. Reboot with the new OTA image
  310. """
  311. # File to be downloaded. This file is generated after compilation
  312. bin_name = 'native_ota.bin'
  313. # Start server
  314. chunked_server = start_chunked_server(dut.app.binary_path, 8070)
  315. try:
  316. # start test
  317. dut.expect('Loaded app from partition at offset', timeout=30)
  318. try:
  319. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  320. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  321. except pexpect.exceptions.TIMEOUT:
  322. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  323. host_ip = get_host_ip4_by_dest_ip(ip_address)
  324. dut.expect('Starting OTA example task', timeout=30)
  325. print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
  326. dut.write('https://' + host_ip + ':8070/' + bin_name)
  327. dut.expect('Prepare to restart system!', timeout=60)
  328. # after reboot
  329. dut.expect('Loaded app from partition at offset', timeout=30)
  330. dut.expect('OTA example app_main start', timeout=10)
  331. os.remove(os.path.join(dut.app.binary_path, 'server_cert.pem'))
  332. os.remove(os.path.join(dut.app.binary_path, 'server_key.pem'))
  333. finally:
  334. chunked_server.kill()