example_test.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725
  1. import http.server
  2. import multiprocessing
  3. import os
  4. import random
  5. import re
  6. import socket
  7. import ssl
  8. import struct
  9. import subprocess
  10. import ttfw_idf
  11. from RangeHTTPServer import RangeRequestHandler
  12. from tiny_test_fw import DUT, Utility
  13. server_cert = '-----BEGIN CERTIFICATE-----\n' \
  14. 'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
  15. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  16. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  17. 'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'\
  18. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  19. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  20. 'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'\
  21. 'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'\
  22. 'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'\
  23. 'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'\
  24. 'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'\
  25. '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'\
  26. 'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'\
  27. 'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'\
  28. 'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'\
  29. '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'\
  30. 'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'\
  31. 'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'\
  32. '-----END CERTIFICATE-----\n'
  33. server_key = '-----BEGIN PRIVATE KEY-----\n'\
  34. 'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'\
  35. 'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'\
  36. 'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'\
  37. 'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'\
  38. '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'\
  39. 'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'\
  40. 'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'\
  41. 'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'\
  42. 'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'\
  43. 'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'\
  44. 'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'\
  45. 'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'\
  46. 'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'\
  47. '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'\
  48. 'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'\
  49. 'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'\
  50. 'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'\
  51. 'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'\
  52. 'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'\
  53. '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'\
  54. '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'\
  55. 'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'\
  56. 'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'\
  57. 'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'\
  58. 'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'\
  59. 'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
  60. '-----END PRIVATE KEY-----\n'
  61. def get_my_ip():
  62. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  63. s1.connect(('8.8.8.8', 80))
  64. my_ip = s1.getsockname()[0]
  65. s1.close()
  66. return my_ip
  67. def get_server_status(host_ip, port):
  68. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  69. server_status = sock.connect_ex((host_ip, port))
  70. sock.close()
  71. if server_status == 0:
  72. return True
  73. return False
  74. def create_file(server_file, file_data):
  75. with open(server_file, 'w+') as file:
  76. file.write(file_data)
  77. def get_ca_cert(ota_image_dir):
  78. os.chdir(ota_image_dir)
  79. server_file = os.path.join(ota_image_dir, 'server_cert.pem')
  80. create_file(server_file, server_cert)
  81. key_file = os.path.join(ota_image_dir, 'server_key.pem')
  82. create_file(key_file, server_key)
  83. return server_file, key_file
  84. def https_request_handler():
  85. """
  86. Returns a request handler class that handles broken pipe exception
  87. """
  88. class RequestHandler(RangeRequestHandler):
  89. def finish(self):
  90. try:
  91. if not self.wfile.closed:
  92. self.wfile.flush()
  93. self.wfile.close()
  94. except socket.error:
  95. pass
  96. self.rfile.close()
  97. def handle(self):
  98. try:
  99. RangeRequestHandler.handle(self)
  100. except socket.error:
  101. pass
  102. return RequestHandler
  103. def start_https_server(ota_image_dir, server_ip, server_port):
  104. server_file, key_file = get_ca_cert(ota_image_dir)
  105. requestHandler = https_request_handler()
  106. httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
  107. httpd.socket = ssl.wrap_socket(httpd.socket,
  108. keyfile=key_file,
  109. certfile=server_file, server_side=True)
  110. httpd.serve_forever()
  111. def start_chunked_server(ota_image_dir, server_port):
  112. server_file, key_file = get_ca_cert(ota_image_dir)
  113. chunked_server = subprocess.Popen(['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
  114. return chunked_server
  115. def redirect_handler_factory(url):
  116. """
  117. Returns a request handler class that redirects to supplied `url`
  118. """
  119. class RedirectHandler(http.server.SimpleHTTPRequestHandler):
  120. def do_GET(self):
  121. print('Sending resp, URL: ' + url)
  122. self.send_response(301)
  123. self.send_header('Location', url)
  124. self.end_headers()
  125. def handle(self):
  126. try:
  127. http.server.BaseHTTPRequestHandler.handle(self)
  128. except socket.error:
  129. pass
  130. return RedirectHandler
  131. def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_port):
  132. os.chdir(ota_image_dir)
  133. server_file, key_file = get_ca_cert(ota_image_dir)
  134. redirectHandler = redirect_handler_factory('https://' + server_ip + ':' + str(redirection_port) + '/advanced_https_ota.bin')
  135. httpd = http.server.HTTPServer((server_ip, server_port), redirectHandler)
  136. httpd.socket = ssl.wrap_socket(httpd.socket,
  137. keyfile=key_file,
  138. certfile=server_file, server_side=True)
  139. httpd.serve_forever()
  140. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  141. def test_examples_protocol_advanced_https_ota_example(env, extra_data):
  142. """
  143. This is a positive test case, which downloads complete binary file multiple number of times.
  144. Number of iterations can be specified in variable iterations.
  145. steps: |
  146. 1. join AP
  147. 2. Fetch OTA image over HTTPS
  148. 3. Reboot with the new OTA image
  149. """
  150. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  151. # Number of iterations to validate OTA
  152. iterations = 3
  153. server_port = 8001
  154. # File to be downloaded. This file is generated after compilation
  155. bin_name = 'advanced_https_ota.bin'
  156. # check and log bin size
  157. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  158. bin_size = os.path.getsize(binary_file)
  159. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  160. # start test
  161. host_ip = get_my_ip()
  162. if (get_server_status(host_ip, server_port) is False):
  163. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  164. thread1.daemon = True
  165. thread1.start()
  166. dut1.start_app()
  167. for i in range(iterations):
  168. dut1.expect('Loaded app from partition at offset', timeout=30)
  169. try:
  170. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  171. print('Connected to AP with IP: {}'.format(ip_address))
  172. except DUT.ExpectTimeout:
  173. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  174. thread1.terminate()
  175. dut1.expect('Starting Advanced OTA example', timeout=30)
  176. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  177. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  178. dut1.expect('Loaded app from partition at offset', timeout=60)
  179. dut1.expect('Starting Advanced OTA example', timeout=30)
  180. dut1.reset()
  181. thread1.terminate()
  182. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  183. def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
  184. """
  185. Working of OTA if binary file is truncated is validated in this test case.
  186. Application should return with error message in this case.
  187. steps: |
  188. 1. join AP
  189. 2. Generate truncated binary file
  190. 3. Fetch OTA image over HTTPS
  191. 4. Check working of code if bin is truncated
  192. """
  193. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  194. server_port = 8001
  195. # Original binary file generated after compilation
  196. bin_name = 'advanced_https_ota.bin'
  197. # Truncated binary file to be generated from original binary file
  198. truncated_bin_name = 'truncated.bin'
  199. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  200. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  201. truncated_bin_size = 64000
  202. # check and log bin size
  203. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  204. f = open(binary_file, 'rb+')
  205. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
  206. fo.write(f.read(truncated_bin_size))
  207. fo.close()
  208. f.close()
  209. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  210. bin_size = os.path.getsize(binary_file)
  211. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  212. # start test
  213. host_ip = get_my_ip()
  214. if (get_server_status(host_ip, server_port) is False):
  215. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  216. thread1.daemon = True
  217. thread1.start()
  218. dut1.start_app()
  219. dut1.expect('Loaded app from partition at offset', timeout=30)
  220. try:
  221. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  222. print('Connected to AP with IP: {}'.format(ip_address))
  223. except DUT.ExpectTimeout:
  224. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  225. thread1.terminate()
  226. dut1.expect('Starting Advanced OTA example', timeout=30)
  227. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  228. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  229. dut1.expect('Image validation failed, image is corrupted', timeout=30)
  230. os.remove(binary_file)
  231. thread1.terminate()
  232. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  233. def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
  234. """
  235. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  236. Application should return with error message in this case.
  237. steps: |
  238. 1. join AP
  239. 2. Generate binary file with truncated headers
  240. 3. Fetch OTA image over HTTPS
  241. 4. Check working of code if headers are not sent completely
  242. """
  243. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  244. server_port = 8001
  245. # Original binary file generated after compilation
  246. bin_name = 'advanced_https_ota.bin'
  247. # Truncated binary file to be generated from original binary file
  248. truncated_bin_name = 'truncated_header.bin'
  249. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  250. truncated_bin_size = 180
  251. # check and log bin size
  252. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  253. f = open(binary_file, 'rb+')
  254. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
  255. fo.write(f.read(truncated_bin_size))
  256. fo.close()
  257. f.close()
  258. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  259. bin_size = os.path.getsize(binary_file)
  260. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  261. # start test
  262. host_ip = get_my_ip()
  263. if (get_server_status(host_ip, server_port) is False):
  264. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  265. thread1.daemon = True
  266. thread1.start()
  267. dut1.start_app()
  268. dut1.expect('Loaded app from partition at offset', timeout=30)
  269. try:
  270. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  271. print('Connected to AP with IP: {}'.format(ip_address))
  272. except DUT.ExpectTimeout:
  273. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  274. thread1.terminate()
  275. dut1.expect('Starting Advanced OTA example', timeout=30)
  276. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  277. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  278. dut1.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
  279. os.remove(binary_file)
  280. thread1.terminate()
  281. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  282. def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
  283. """
  284. Working of OTA if random data is added in binary file are validated in this test case.
  285. Magic byte verification should fail in this case.
  286. steps: |
  287. 1. join AP
  288. 2. Generate random binary image
  289. 3. Fetch OTA image over HTTPS
  290. 4. Check working of code for random binary file
  291. """
  292. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  293. server_port = 8001
  294. # Random binary file to be generated
  295. random_bin_name = 'random.bin'
  296. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  297. random_bin_size = 32000
  298. # check and log bin size
  299. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  300. fo = open(binary_file, 'wb+')
  301. # First byte of binary file is always set to zero. If first byte is generated randomly,
  302. # in some cases it may generate 0xE9 which will result in failure of testcase.
  303. fo.write(struct.pack('B', 0))
  304. for i in range(random_bin_size - 1):
  305. fo.write(struct.pack('B', random.randrange(0,255,1)))
  306. fo.close()
  307. bin_size = os.path.getsize(binary_file)
  308. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  309. # start test
  310. host_ip = get_my_ip()
  311. if (get_server_status(host_ip, server_port) is False):
  312. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  313. thread1.daemon = True
  314. thread1.start()
  315. dut1.start_app()
  316. dut1.expect('Loaded app from partition at offset', timeout=30)
  317. try:
  318. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  319. print('Connected to AP with IP: {}'.format(ip_address))
  320. except DUT.ExpectTimeout:
  321. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  322. thread1.terminate()
  323. dut1.expect('Starting Advanced OTA example', timeout=30)
  324. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
  325. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
  326. dut1.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=10)
  327. os.remove(binary_file)
  328. thread1.terminate()
  329. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  330. def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
  331. """
  332. This is a positive test case, which downloads complete binary file multiple number of times.
  333. Number of iterations can be specified in variable iterations.
  334. steps: |
  335. 1. join AP
  336. 2. Fetch OTA image over HTTPS
  337. 3. Reboot with the new OTA image
  338. """
  339. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  340. # File to be downloaded. This file is generated after compilation
  341. bin_name = 'advanced_https_ota.bin'
  342. # check and log bin size
  343. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  344. bin_size = os.path.getsize(binary_file)
  345. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  346. # start test
  347. host_ip = get_my_ip()
  348. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  349. dut1.start_app()
  350. dut1.expect('Loaded app from partition at offset', timeout=30)
  351. try:
  352. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  353. print('Connected to AP with IP: {}'.format(ip_address))
  354. except DUT.ExpectTimeout:
  355. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  356. dut1.expect('Starting Advanced OTA example', timeout=30)
  357. print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
  358. dut1.write('https://' + host_ip + ':8070/' + bin_name)
  359. dut1.expect('Loaded app from partition at offset', timeout=60)
  360. dut1.expect('Starting Advanced OTA example', timeout=30)
  361. chunked_server.kill()
  362. os.remove(os.path.join(dut1.app.binary_path, 'server_cert.pem'))
  363. os.remove(os.path.join(dut1.app.binary_path, 'server_key.pem'))
  364. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  365. def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_data):
  366. """
  367. This is a positive test case, which starts a server and a redirection server.
  368. Redirection server redirects http_request to different port
  369. Number of iterations can be specified in variable iterations.
  370. steps: |
  371. 1. join AP
  372. 2. Fetch OTA image over HTTPS
  373. 3. Reboot with the new OTA image
  374. """
  375. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  376. server_port = 8001
  377. # Port to which the request should be redirecetd
  378. redirection_server_port = 8081
  379. # File to be downloaded. This file is generated after compilation
  380. bin_name = 'advanced_https_ota.bin'
  381. # check and log bin size
  382. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  383. bin_size = os.path.getsize(binary_file)
  384. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  385. # start test
  386. host_ip = get_my_ip()
  387. if (get_server_status(host_ip, server_port) is False):
  388. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  389. thread1.daemon = True
  390. thread1.start()
  391. thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port, server_port))
  392. thread2.daemon = True
  393. thread2.start()
  394. dut1.start_app()
  395. dut1.expect('Loaded app from partition at offset', timeout=30)
  396. try:
  397. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  398. print('Connected to AP with IP: {}'.format(ip_address))
  399. except DUT.ExpectTimeout:
  400. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  401. thread1.terminate()
  402. thread2.terminate()
  403. dut1.expect('Starting Advanced OTA example', timeout=30)
  404. print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
  405. dut1.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
  406. dut1.expect('Loaded app from partition at offset', timeout=60)
  407. dut1.expect('Starting Advanced OTA example', timeout=30)
  408. dut1.reset()
  409. thread1.terminate()
  410. thread2.terminate()
  411. @ttfw_idf.idf_example_test(env_tag='Example_8Mflash_Ethernet')
  412. def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_data):
  413. """
  414. Working of OTA when anti_rollback is enabled and security version of new image is less than current one.
  415. Application should return with error message in this case.
  416. steps: |
  417. 1. join AP
  418. 2. Generate binary file with lower security version
  419. 3. Fetch OTA image over HTTPS
  420. 4. Check working of anti_rollback feature
  421. """
  422. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='anti_rollback')
  423. Utility.console_log('Erasing the flash on the chip')
  424. # erase the flash
  425. dut1.erase_flash()
  426. server_port = 8001
  427. # Original binary file generated after compilation
  428. bin_name = 'advanced_https_ota.bin'
  429. # Modified firmware image to lower security version in its header. This is to enable negative test case
  430. anti_rollback_bin_name = 'advanced_https_ota_lower_sec_version.bin'
  431. # check and log bin size
  432. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  433. file_size = os.path.getsize(binary_file)
  434. f = open(binary_file, 'rb+')
  435. fo = open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), 'wb+')
  436. fo.write(f.read(file_size))
  437. # Change security_version to 0 for negative test case
  438. fo.seek(36)
  439. fo.write(b'\x00')
  440. fo.close()
  441. f.close()
  442. binary_file = os.path.join(dut1.app.binary_path, anti_rollback_bin_name)
  443. bin_size = os.path.getsize(binary_file)
  444. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  445. # start test
  446. host_ip = get_my_ip()
  447. if (get_server_status(host_ip, server_port) is False):
  448. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  449. thread1.daemon = True
  450. thread1.start()
  451. dut1.start_app()
  452. # Positive Case
  453. dut1.expect('Loaded app from partition at offset', timeout=30)
  454. try:
  455. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  456. print('Connected to AP with IP: {}'.format(ip_address))
  457. except DUT.ExpectTimeout:
  458. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  459. thread1.terminate()
  460. dut1.expect('Starting Advanced OTA example', timeout=30)
  461. # Use originally generated image with secure_version=1
  462. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  463. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  464. dut1.expect('Loaded app from partition at offset', timeout=60)
  465. dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  466. dut1.expect('App is valid, rollback cancelled successfully', 30)
  467. # Negative Case
  468. dut1.expect('Starting Advanced OTA example', timeout=30)
  469. # Use modified image with secure_version=0
  470. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
  471. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
  472. dut1.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
  473. os.remove(binary_file)
  474. thread1.terminate()
  475. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  476. def test_examples_protocol_advanced_https_ota_example_partial_request(env, extra_data):
  477. """
  478. This is a positive test case, to test OTA workflow with Range HTTP header.
  479. steps: |
  480. 1. join AP
  481. 2. Fetch OTA image over HTTPS
  482. 3. Reboot with the new OTA image
  483. """
  484. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='partial_download')
  485. server_port = 8001
  486. # Size of partial HTTP request
  487. request_size = 16384
  488. # File to be downloaded. This file is generated after compilation
  489. bin_name = 'advanced_https_ota.bin'
  490. # check and log bin size
  491. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  492. bin_size = os.path.getsize(binary_file)
  493. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  494. http_requests = int((bin_size / request_size) - 1)
  495. # start test
  496. host_ip = get_my_ip()
  497. if (get_server_status(host_ip, server_port) is False):
  498. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  499. thread1.daemon = True
  500. thread1.start()
  501. dut1.start_app()
  502. dut1.expect('Loaded app from partition at offset', timeout=30)
  503. try:
  504. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  505. print('Connected to AP with IP: {}'.format(ip_address))
  506. except DUT.ExpectTimeout:
  507. Utility.console_log('ENV_TEST_FAILURE: Cannot connect to AP')
  508. raise
  509. thread1.terminate()
  510. dut1.expect('Starting Advanced OTA example', timeout=30)
  511. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  512. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  513. for _ in range(http_requests):
  514. dut1.expect('Connection closed', timeout=60)
  515. dut1.expect('Loaded app from partition at offset', timeout=60)
  516. dut1.expect('Starting Advanced OTA example', timeout=30)
  517. dut1.reset()
  518. thread1.terminate()
  519. @ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA', nightly_run=True)
  520. def test_examples_protocol_advanced_https_ota_example_nimble_gatts(env, extra_data):
  521. """
  522. Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using NimBLE Host stack.
  523. steps: |
  524. 1. join AP
  525. 2. Run BLE advertise and then GATT server.
  526. 3. Fetch OTA image over HTTPS
  527. 4. Reboot with the new OTA image
  528. """
  529. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='nimble')
  530. server_port = 8001
  531. # File to be downloaded. This file is generated after compilation
  532. bin_name = 'advanced_https_ota.bin'
  533. # check and log bin size
  534. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  535. bin_size = os.path.getsize(binary_file)
  536. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  537. # start test
  538. host_ip = get_my_ip()
  539. if (get_server_status(host_ip, server_port) is False):
  540. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  541. thread1.daemon = True
  542. thread1.start()
  543. dut1.start_app()
  544. dut1.expect('Loaded app from partition at offset', timeout=30)
  545. try:
  546. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  547. print('Connected to AP with IP: {}'.format(ip_address))
  548. except DUT.ExpectTimeout:
  549. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  550. thread1.terminate()
  551. dut1.expect('Starting Advanced OTA example', timeout=30)
  552. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  553. dut1.expect('GAP procedure initiated: advertise', timeout=30)
  554. print('Started GAP advertising.')
  555. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  556. dut1.expect('Loaded app from partition at offset', timeout=60)
  557. dut1.expect('Starting Advanced OTA example', timeout=30)
  558. dut1.reset()
  559. thread1.terminate()
  560. @ttfw_idf.idf_example_test(env_tag='Example_WIFI_OTA', nightly_run=True)
  561. def test_examples_protocol_advanced_https_ota_example_bluedroid_gatts(env, extra_data):
  562. """
  563. Run an OTA image update while a BLE GATT Server is running in background. This GATT server will be using Bluedroid Host stack.
  564. steps: |
  565. 1. join AP
  566. 2. Run BLE advertise and then GATT server.
  567. 3. Fetch OTA image over HTTPS
  568. 4. Reboot with the new OTA image
  569. """
  570. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='bluedroid')
  571. server_port = 8001
  572. # File to be downloaded. This file is generated after compilation
  573. bin_name = 'advanced_https_ota.bin'
  574. # check and log bin size
  575. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  576. bin_size = os.path.getsize(binary_file)
  577. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  578. # start test
  579. host_ip = get_my_ip()
  580. if (get_server_status(host_ip, server_port) is False):
  581. thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  582. thread1.daemon = True
  583. thread1.start()
  584. dut1.start_app()
  585. dut1.expect('Loaded app from partition at offset', timeout=30)
  586. try:
  587. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  588. print('Connected to AP with IP: {}'.format(ip_address))
  589. except DUT.ExpectTimeout:
  590. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  591. thread1.terminate()
  592. dut1.expect('Starting Advanced OTA example', timeout=30)
  593. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  594. dut1.expect('Started advertising.', timeout=30)
  595. print('Started GAP advertising.')
  596. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  597. dut1.expect('Loaded app from partition at offset', timeout=60)
  598. dut1.expect('Starting Advanced OTA example', timeout=30)
  599. dut1.reset()
  600. thread1.terminate()
  601. @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  602. def test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin(env, extra_data):
  603. """
  604. This is a test case for esp_http_client_read with binary size multiple of 289 bytes
  605. steps: |
  606. 1. join AP
  607. 2. Fetch OTA image over HTTPS
  608. 3. Reboot with the new OTA image
  609. """
  610. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  611. # Original binary file generated after compilation
  612. bin_name = 'advanced_https_ota.bin'
  613. # Binary file aligned to DEFAULT_OTA_BUF_SIZE(289 bytes) boundary
  614. aligned_bin_name = 'aligned.bin'
  615. # check and log bin size
  616. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  617. # Original binary size
  618. bin_size = os.path.getsize(binary_file)
  619. # Dummy data required to align binary size to 289 bytes boundary
  620. dummy_data_size = 289 - (bin_size % 289)
  621. f = open(binary_file, 'rb+')
  622. fo = open(os.path.join(dut1.app.binary_path, aligned_bin_name), 'wb+')
  623. fo.write(f.read(bin_size))
  624. for _ in range(dummy_data_size):
  625. fo.write(struct.pack('B', random.randrange(0,255,1)))
  626. fo.close()
  627. f.close()
  628. # start test
  629. host_ip = get_my_ip()
  630. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  631. dut1.start_app()
  632. dut1.expect('Loaded app from partition at offset', timeout=30)
  633. try:
  634. ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
  635. print('Connected to AP with IP: {}'.format(ip_address))
  636. except DUT.ExpectTimeout:
  637. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  638. dut1.expect('Starting Advanced OTA example', timeout=30)
  639. print('writing to device: {}'.format('https://' + host_ip + ':8070/' + aligned_bin_name))
  640. dut1.write('https://' + host_ip + ':8070/' + aligned_bin_name)
  641. dut1.expect('Loaded app from partition at offset', timeout=60)
  642. dut1.expect('Starting Advanced OTA example', timeout=30)
  643. chunked_server.kill()
  644. os.remove(os.path.join(dut1.app.binary_path, 'server_cert.pem'))
  645. os.remove(os.path.join(dut1.app.binary_path, 'server_key.pem'))
  646. os.remove(aligned_bin_name)
  647. if __name__ == '__main__':
  648. test_examples_protocol_advanced_https_ota_example()
  649. test_examples_protocol_advanced_https_ota_example_chunked()
  650. test_examples_protocol_advanced_https_ota_example_redirect_url()
  651. test_examples_protocol_advanced_https_ota_example_truncated_bin()
  652. test_examples_protocol_advanced_https_ota_example_truncated_header()
  653. test_examples_protocol_advanced_https_ota_example_random()
  654. test_examples_protocol_advanced_https_ota_example_anti_rollback()
  655. test_examples_protocol_advanced_https_ota_example_partial_request()
  656. test_examples_protocol_advanced_https_ota_example_nimble_gatts()
  657. test_examples_protocol_advanced_https_ota_example_bluedroid_gatts()
  658. test_examples_protocol_advanced_https_ota_example_openssl_aligned_bin()