pytest_native_ota.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Unlicense OR CC0-1.0
  3. import http.server
  4. import multiprocessing
  5. import os
  6. import random
  7. import socket
  8. import ssl
  9. import struct
  10. import subprocess
  11. from typing import Callable, Tuple
  12. import pexpect
  13. import pytest
  14. from common_test_methods import get_host_ip4_by_dest_ip
  15. from pytest_embedded import Dut
  16. server_cert = '-----BEGIN CERTIFICATE-----\n' \
  17. 'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
  18. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  19. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  20. 'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'\
  21. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  22. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  23. 'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'\
  24. 'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'\
  25. 'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'\
  26. 'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'\
  27. 'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'\
  28. '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'\
  29. 'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'\
  30. 'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'\
  31. 'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'\
  32. '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'\
  33. 'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'\
  34. 'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'\
  35. '-----END CERTIFICATE-----\n'
  36. server_key = '-----BEGIN PRIVATE KEY-----\n'\
  37. 'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'\
  38. 'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'\
  39. 'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'\
  40. 'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'\
  41. '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'\
  42. 'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'\
  43. 'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'\
  44. 'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'\
  45. 'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'\
  46. 'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'\
  47. 'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'\
  48. 'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'\
  49. 'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'\
  50. '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'\
  51. 'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'\
  52. 'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'\
  53. 'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'\
  54. 'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'\
  55. 'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'\
  56. '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'\
  57. '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'\
  58. 'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'\
  59. 'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'\
  60. 'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'\
  61. 'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'\
  62. 'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
  63. '-----END PRIVATE KEY-----\n'
  64. def create_file(server_file: str, file_data: str) -> None:
  65. with open(server_file, 'w+') as file:
  66. file.write(file_data)
  67. def get_ca_cert(ota_image_dir: str) -> Tuple[str, str]:
  68. os.chdir(ota_image_dir)
  69. server_file = os.path.join(ota_image_dir, 'server_cert.pem')
  70. create_file(server_file, server_cert)
  71. key_file = os.path.join(ota_image_dir, 'server_key.pem')
  72. create_file(key_file, server_key)
  73. return server_file, key_file
  74. def https_request_handler() -> Callable[...,http.server.BaseHTTPRequestHandler]:
  75. """
  76. Returns a request handler class that handles broken pipe exception
  77. """
  78. class RequestHandler(http.server.SimpleHTTPRequestHandler):
  79. def finish(self) -> None:
  80. try:
  81. if not self.wfile.closed:
  82. self.wfile.flush()
  83. self.wfile.close()
  84. except socket.error:
  85. pass
  86. self.rfile.close()
  87. def handle(self) -> None:
  88. try:
  89. http.server.BaseHTTPRequestHandler.handle(self)
  90. except socket.error:
  91. pass
  92. return RequestHandler
  93. def start_https_server(ota_image_dir: str, server_ip: str, server_port: int) -> None:
  94. server_file, key_file = get_ca_cert(ota_image_dir)
  95. requestHandler = https_request_handler()
  96. httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
  97. httpd.socket = ssl.wrap_socket(httpd.socket,
  98. keyfile=key_file,
  99. certfile=server_file, server_side=True)
  100. httpd.serve_forever()
  101. def start_chunked_server(ota_image_dir: str, server_port: int) -> subprocess.Popen:
  102. server_file, key_file = get_ca_cert(ota_image_dir)
  103. chunked_server = subprocess.Popen(['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
  104. return chunked_server
  105. @pytest.mark.supported_targets
  106. @pytest.mark.ethernet_ota
  107. def test_examples_protocol_native_ota_example(dut: Dut) -> None:
  108. """
  109. This is a positive test case, which downloads complete binary file multiple number of times.
  110. Number of iterations can be specified in variable iterations.
  111. steps: |
  112. 1. join AP/Ethernet
  113. 2. Fetch OTA image over HTTPS
  114. 3. Reboot with the new OTA image
  115. """
  116. server_port = 8002
  117. # No. of times working of application to be validated
  118. iterations = 3
  119. # File to be downloaded. This file is generated after compilation
  120. bin_name = 'native_ota.bin'
  121. # Start server
  122. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  123. thread1.daemon = True
  124. thread1.start()
  125. try:
  126. # start test
  127. for _ in range(iterations):
  128. dut.expect('Loaded app from partition at offset', timeout=30)
  129. try:
  130. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  131. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  132. except pexpect.exceptions.TIMEOUT:
  133. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  134. host_ip = get_host_ip4_by_dest_ip(ip_address)
  135. dut.expect('Starting OTA example task', timeout=30)
  136. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  137. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  138. dut.expect('Prepare to restart system!', timeout=60)
  139. finally:
  140. thread1.terminate()
  141. @pytest.mark.supported_targets
  142. @pytest.mark.ethernet_ota
  143. def test_examples_protocol_native_ota_example_truncated_bin(dut: Dut) -> None:
  144. """
  145. Working of OTA if binary file is truncated is validated in this test case.
  146. Application should return with error message in this case.
  147. steps: |
  148. 1. join AP/Ethernet
  149. 2. Generate truncated binary file
  150. 3. Fetch OTA image over HTTPS
  151. 4. Check working of code if bin is truncated
  152. """
  153. server_port = 8002
  154. # Original binary file generated after compilation
  155. bin_name = 'native_ota.bin'
  156. # Truncated binary file to be generated from original binary file
  157. truncated_bin_name = 'truncated.bin'
  158. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  159. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  160. truncated_bin_size = 64000
  161. # check and log bin size
  162. binary_file = os.path.join(dut.app.binary_path, bin_name)
  163. with open(binary_file, 'rb+') as fr:
  164. bin_data = fr.read(truncated_bin_size)
  165. binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
  166. with open(binary_file, 'wb+') as fo:
  167. fo.write(bin_data)
  168. # Start server
  169. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  170. thread1.daemon = True
  171. thread1.start()
  172. try:
  173. # start test
  174. dut.expect('Loaded app from partition at offset', timeout=30)
  175. try:
  176. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  177. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  178. except pexpect.exceptions.TIMEOUT:
  179. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  180. host_ip = get_host_ip4_by_dest_ip(ip_address)
  181. dut.expect('Starting OTA example task', timeout=30)
  182. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  183. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  184. dut.expect('native_ota_example: Image validation failed, image is corrupted', timeout=20)
  185. os.remove(binary_file)
  186. finally:
  187. thread1.terminate()
  188. @pytest.mark.supported_targets
  189. @pytest.mark.ethernet_ota
  190. def test_examples_protocol_native_ota_example_truncated_header(dut: Dut) -> None:
  191. """
  192. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  193. Application should return with error message in this case.
  194. steps: |
  195. 1. join AP/Ethernet
  196. 2. Generate binary file with truncated headers
  197. 3. Fetch OTA image over HTTPS
  198. 4. Check working of code if headers are not sent completely
  199. """
  200. server_port = 8002
  201. # Original binary file generated after compilation
  202. bin_name = 'native_ota.bin'
  203. # Truncated binary file to be generated from original binary file
  204. truncated_bin_name = 'truncated_header.bin'
  205. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  206. truncated_bin_size = 180
  207. # check and log bin size
  208. binary_file = os.path.join(dut.app.binary_path, bin_name)
  209. with open(binary_file, 'rb+') as fr:
  210. bin_data = fr.read(truncated_bin_size)
  211. binary_file = os.path.join(dut.app.binary_path, truncated_bin_name)
  212. with open(binary_file, 'wb+') as fo:
  213. fo.write(bin_data)
  214. # Start server
  215. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  216. thread1.daemon = True
  217. thread1.start()
  218. try:
  219. # start test
  220. dut.expect('Loaded app from partition at offset', timeout=30)
  221. try:
  222. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  223. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  224. except pexpect.exceptions.TIMEOUT:
  225. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  226. host_ip = get_host_ip4_by_dest_ip(ip_address)
  227. dut.expect('Starting OTA example task', timeout=30)
  228. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  229. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  230. dut.expect('native_ota_example: received package is not fit len', timeout=20)
  231. os.remove(binary_file)
  232. finally:
  233. thread1.terminate()
  234. @pytest.mark.supported_targets
  235. @pytest.mark.ethernet_ota
  236. def test_examples_protocol_native_ota_example_random(dut: Dut) -> None:
  237. """
  238. Working of OTA if random data is added in binary file are validated in this test case.
  239. Magic byte verification should fail in this case.
  240. steps: |
  241. 1. join AP/Ethernet
  242. 2. Generate random binary image
  243. 3. Fetch OTA image over HTTPS
  244. 4. Check working of code for random binary file
  245. """
  246. server_port = 8002
  247. # Random binary file to be generated
  248. random_bin_name = 'random.bin'
  249. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  250. random_bin_size = 32000
  251. # check and log bin size
  252. binary_file = os.path.join(dut.app.binary_path, random_bin_name)
  253. fo = open(binary_file, 'wb+')
  254. # First byte of binary file is always set to zero. If first byte is generated randomly,
  255. # in some cases it may generate 0xE9 which will result in failure of testcase.
  256. with open(binary_file, 'wb+') as fo:
  257. fo.write(struct.pack('B', 0))
  258. for _ in range(random_bin_size - 1):
  259. fo.write(struct.pack('B', random.randrange(0,255,1)))
  260. # Start server
  261. thread1 = multiprocessing.Process(target=start_https_server, args=(dut.app.binary_path, '0.0.0.0', server_port))
  262. thread1.daemon = True
  263. thread1.start()
  264. try:
  265. # start test
  266. dut.expect('Loaded app from partition at offset', timeout=30)
  267. try:
  268. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  269. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  270. except pexpect.exceptions.TIMEOUT:
  271. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  272. host_ip = get_host_ip4_by_dest_ip(ip_address)
  273. dut.expect('Starting OTA example task', timeout=30)
  274. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
  275. dut.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
  276. dut.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=20)
  277. os.remove(binary_file)
  278. finally:
  279. thread1.terminate()
  280. @pytest.mark.supported_targets
  281. @pytest.mark.ethernet_ota
  282. def test_examples_protocol_native_ota_example_chunked(dut: Dut) -> None:
  283. """
  284. This is a positive test case, which downloads complete binary file multiple number of times.
  285. Number of iterations can be specified in variable iterations.
  286. steps: |
  287. 1. join AP/Ethernet
  288. 2. Fetch OTA image over HTTPS
  289. 3. Reboot with the new OTA image
  290. """
  291. # File to be downloaded. This file is generated after compilation
  292. bin_name = 'native_ota.bin'
  293. # Start server
  294. chunked_server = start_chunked_server(dut.app.binary_path, 8070)
  295. try:
  296. # start test
  297. dut.expect('Loaded app from partition at offset', timeout=30)
  298. try:
  299. ip_address = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]', timeout=30)[1].decode()
  300. print('Connected to AP/Ethernet with IP: {}'.format(ip_address))
  301. except pexpect.exceptions.TIMEOUT:
  302. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP/Ethernet')
  303. host_ip = get_host_ip4_by_dest_ip(ip_address)
  304. dut.expect('Starting OTA example task', timeout=30)
  305. print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
  306. dut.write('https://' + host_ip + ':8070/' + bin_name)
  307. dut.expect('Prepare to restart system!', timeout=60)
  308. # after reboot
  309. dut.expect('Loaded app from partition at offset', timeout=30)
  310. dut.expect('OTA example app_main start', timeout=10)
  311. os.remove(os.path.join(dut.app.binary_path, 'server_cert.pem'))
  312. os.remove(os.path.join(dut.app.binary_path, 'server_key.pem'))
  313. finally:
  314. chunked_server.kill()