example_test.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. import http.server
  2. import os
  3. import random
  4. import re
  5. import socket
  6. import ssl
  7. import struct
  8. import subprocess
  9. from threading import Thread
  10. import ttfw_idf
  11. from tiny_test_fw import DUT
  12. server_cert = '-----BEGIN CERTIFICATE-----\n' \
  13. 'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'\
  14. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  15. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  16. 'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'\
  17. 'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'\
  18. 'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'\
  19. 'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'\
  20. 'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'\
  21. 'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'\
  22. 'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'\
  23. 'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'\
  24. '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'\
  25. 'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'\
  26. 'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'\
  27. 'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'\
  28. '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'\
  29. 'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'\
  30. 'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'\
  31. '-----END CERTIFICATE-----\n'
  32. server_key = '-----BEGIN PRIVATE KEY-----\n'\
  33. 'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'\
  34. 'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'\
  35. 'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'\
  36. 'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'\
  37. '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'\
  38. 'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'\
  39. 'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'\
  40. 'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'\
  41. 'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'\
  42. 'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'\
  43. 'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'\
  44. 'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'\
  45. 'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'\
  46. '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'\
  47. 'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'\
  48. 'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'\
  49. 'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'\
  50. 'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'\
  51. 'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'\
  52. '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'\
  53. '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'\
  54. 'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'\
  55. 'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'\
  56. 'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'\
  57. 'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'\
  58. 'vSXnRLaxQhooWm+IuX9SuBQ=\n'\
  59. '-----END PRIVATE KEY-----\n'
  60. def get_my_ip():
  61. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  62. s1.connect(('8.8.8.8', 80))
  63. my_ip = s1.getsockname()[0]
  64. s1.close()
  65. return my_ip
  66. def get_server_status(host_ip, port):
  67. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  68. server_status = sock.connect_ex((host_ip, port))
  69. sock.close()
  70. if server_status == 0:
  71. return True
  72. return False
  73. def create_file(server_file, file_data):
  74. with open(server_file, 'w+') as file:
  75. file.write(file_data)
  76. def get_ca_cert(ota_image_dir):
  77. os.chdir(ota_image_dir)
  78. server_file = os.path.join(ota_image_dir, 'server_cert.pem')
  79. create_file(server_file, server_cert)
  80. key_file = os.path.join(ota_image_dir, 'server_key.pem')
  81. create_file(key_file, server_key)
  82. return server_file, key_file
  83. def https_request_handler():
  84. """
  85. Returns a request handler class that handles broken pipe exception
  86. """
  87. class RequestHandler(http.server.SimpleHTTPRequestHandler):
  88. def finish(self):
  89. try:
  90. if not self.wfile.closed:
  91. self.wfile.flush()
  92. self.wfile.close()
  93. except socket.error:
  94. pass
  95. self.rfile.close()
  96. def handle(self):
  97. try:
  98. http.server.BaseHTTPRequestHandler.handle(self)
  99. except socket.error:
  100. pass
  101. return RequestHandler
  102. def start_https_server(ota_image_dir, server_ip, server_port):
  103. server_file, key_file = get_ca_cert(ota_image_dir)
  104. requestHandler = https_request_handler()
  105. httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
  106. httpd.socket = ssl.wrap_socket(httpd.socket,
  107. keyfile=key_file,
  108. certfile=server_file, server_side=True)
  109. httpd.serve_forever()
  110. def start_chunked_server(ota_image_dir, server_port):
  111. server_file, key_file = get_ca_cert(ota_image_dir)
  112. chunked_server = subprocess.Popen(['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)])
  113. return chunked_server
  114. def redirect_handler_factory(url):
  115. """
  116. Returns a request handler class that redirects to supplied `url`
  117. """
  118. class RedirectHandler(http.server.SimpleHTTPRequestHandler):
  119. def do_GET(self):
  120. print('Sending resp, URL: ' + url)
  121. self.send_response(301)
  122. self.send_header('Location', url)
  123. self.end_headers()
  124. def handle(self):
  125. try:
  126. http.server.BaseHTTPRequestHandler.handle(self)
  127. except socket.error:
  128. pass
  129. return RedirectHandler
  130. def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_port):
  131. os.chdir(ota_image_dir)
  132. server_file, key_file = get_ca_cert(ota_image_dir)
  133. redirectHandler = redirect_handler_factory('https://' + server_ip + ':' + str(redirection_port) + '/advanced_https_ota.bin')
  134. httpd = http.server.HTTPServer((server_ip, server_port), redirectHandler)
  135. httpd.socket = ssl.wrap_socket(httpd.socket,
  136. keyfile=key_file,
  137. certfile=server_file, server_side=True)
  138. httpd.serve_forever()
  139. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  140. def test_examples_protocol_advanced_https_ota_example(env, extra_data):
  141. """
  142. This is a positive test case, which downloads complete binary file multiple number of times.
  143. Number of iterations can be specified in variable iterations.
  144. steps: |
  145. 1. join AP
  146. 2. Fetch OTA image over HTTPS
  147. 3. Reboot with the new OTA image
  148. """
  149. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  150. # Number of iterations to validate OTA
  151. iterations = 3
  152. server_port = 8001
  153. # File to be downloaded. This file is generated after compilation
  154. bin_name = 'advanced_https_ota.bin'
  155. # check and log bin size
  156. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  157. bin_size = os.path.getsize(binary_file)
  158. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  159. # start test
  160. host_ip = get_my_ip()
  161. if (get_server_status(host_ip, server_port) is False):
  162. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  163. thread1.daemon = True
  164. thread1.start()
  165. dut1.start_app()
  166. for i in range(iterations):
  167. dut1.expect('Loaded app from partition at offset', timeout=30)
  168. try:
  169. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  170. print('Connected to AP with IP: {}'.format(ip_address))
  171. except DUT.ExpectTimeout:
  172. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  173. thread1.close()
  174. dut1.expect('Starting Advanced OTA example', timeout=30)
  175. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  176. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  177. dut1.expect('Loaded app from partition at offset', timeout=60)
  178. dut1.expect('Starting Advanced OTA example', timeout=30)
  179. dut1.reset()
  180. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  181. def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
  182. """
  183. Working of OTA if binary file is truncated is validated in this test case.
  184. Application should return with error message in this case.
  185. steps: |
  186. 1. join AP
  187. 2. Generate truncated binary file
  188. 3. Fetch OTA image over HTTPS
  189. 4. Check working of code if bin is truncated
  190. """
  191. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  192. server_port = 8001
  193. # Original binary file generated after compilation
  194. bin_name = 'advanced_https_ota.bin'
  195. # Truncated binary file to be generated from original binary file
  196. truncated_bin_name = 'truncated.bin'
  197. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  198. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  199. truncated_bin_size = 64000
  200. # check and log bin size
  201. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  202. f = open(binary_file, 'rb+')
  203. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
  204. fo.write(f.read(truncated_bin_size))
  205. fo.close()
  206. f.close()
  207. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  208. bin_size = os.path.getsize(binary_file)
  209. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  210. # start test
  211. host_ip = get_my_ip()
  212. if (get_server_status(host_ip, server_port) is False):
  213. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  214. thread1.daemon = True
  215. thread1.start()
  216. dut1.start_app()
  217. dut1.expect('Loaded app from partition at offset', timeout=30)
  218. try:
  219. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  220. print('Connected to AP with IP: {}'.format(ip_address))
  221. except DUT.ExpectTimeout:
  222. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  223. dut1.expect('Starting Advanced OTA example', timeout=30)
  224. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  225. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  226. dut1.expect('Image validation failed, image is corrupted', timeout=30)
  227. os.remove(binary_file)
  228. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  229. def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
  230. """
  231. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  232. Application should return with error message in this case.
  233. steps: |
  234. 1. join AP
  235. 2. Generate binary file with truncated headers
  236. 3. Fetch OTA image over HTTPS
  237. 4. Check working of code if headers are not sent completely
  238. """
  239. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  240. server_port = 8001
  241. # Original binary file generated after compilation
  242. bin_name = 'advanced_https_ota.bin'
  243. # Truncated binary file to be generated from original binary file
  244. truncated_bin_name = 'truncated_header.bin'
  245. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  246. truncated_bin_size = 180
  247. # check and log bin size
  248. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  249. f = open(binary_file, 'rb+')
  250. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), 'wb+')
  251. fo.write(f.read(truncated_bin_size))
  252. fo.close()
  253. f.close()
  254. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  255. bin_size = os.path.getsize(binary_file)
  256. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  257. # start test
  258. host_ip = get_my_ip()
  259. if (get_server_status(host_ip, server_port) is False):
  260. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  261. thread1.daemon = True
  262. thread1.start()
  263. dut1.start_app()
  264. dut1.expect('Loaded app from partition at offset', timeout=30)
  265. try:
  266. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  267. print('Connected to AP with IP: {}'.format(ip_address))
  268. except DUT.ExpectTimeout:
  269. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  270. dut1.expect('Starting Advanced OTA example', timeout=30)
  271. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name))
  272. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name)
  273. dut1.expect('advanced_https_ota_example: esp_https_ota_read_img_desc failed', timeout=30)
  274. os.remove(binary_file)
  275. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  276. def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
  277. """
  278. Working of OTA if random data is added in binary file are validated in this test case.
  279. Magic byte verification should fail in this case.
  280. steps: |
  281. 1. join AP
  282. 2. Generate random binary image
  283. 3. Fetch OTA image over HTTPS
  284. 4. Check working of code for random binary file
  285. """
  286. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  287. server_port = 8001
  288. # Random binary file to be generated
  289. random_bin_name = 'random.bin'
  290. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  291. random_bin_size = 32000
  292. # check and log bin size
  293. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  294. fo = open(binary_file, 'wb+')
  295. # First byte of binary file is always set to zero. If first byte is generated randomly,
  296. # in some cases it may generate 0xE9 which will result in failure of testcase.
  297. fo.write(struct.pack('B', 0))
  298. for i in range(random_bin_size - 1):
  299. fo.write(struct.pack('B', random.randrange(0,255,1)))
  300. fo.close()
  301. bin_size = os.path.getsize(binary_file)
  302. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  303. # start test
  304. host_ip = get_my_ip()
  305. if (get_server_status(host_ip, server_port) is False):
  306. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  307. thread1.daemon = True
  308. thread1.start()
  309. dut1.start_app()
  310. dut1.expect('Loaded app from partition at offset', timeout=30)
  311. try:
  312. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  313. print('Connected to AP with IP: {}'.format(ip_address))
  314. except DUT.ExpectTimeout:
  315. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  316. dut1.expect('Starting Advanced OTA example', timeout=30)
  317. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name))
  318. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name)
  319. dut1.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=10)
  320. os.remove(binary_file)
  321. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  322. def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
  323. """
  324. This is a positive test case, which downloads complete binary file multiple number of times.
  325. Number of iterations can be specified in variable iterations.
  326. steps: |
  327. 1. join AP
  328. 2. Fetch OTA image over HTTPS
  329. 3. Reboot with the new OTA image
  330. """
  331. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  332. # File to be downloaded. This file is generated after compilation
  333. bin_name = 'advanced_https_ota.bin'
  334. # check and log bin size
  335. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  336. bin_size = os.path.getsize(binary_file)
  337. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  338. # start test
  339. host_ip = get_my_ip()
  340. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  341. dut1.start_app()
  342. dut1.expect('Loaded app from partition at offset', timeout=30)
  343. try:
  344. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  345. print('Connected to AP with IP: {}'.format(ip_address))
  346. except DUT.ExpectTimeout:
  347. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  348. dut1.expect('Starting Advanced OTA example', timeout=30)
  349. print('writing to device: {}'.format('https://' + host_ip + ':8070/' + bin_name))
  350. dut1.write('https://' + host_ip + ':8070/' + bin_name)
  351. dut1.expect('Loaded app from partition at offset', timeout=60)
  352. dut1.expect('Starting Advanced OTA example', timeout=30)
  353. chunked_server.kill()
  354. os.remove(os.path.join(dut1.app.binary_path, 'server_cert.pem'))
  355. os.remove(os.path.join(dut1.app.binary_path, 'server_key.pem'))
  356. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  357. def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_data):
  358. """
  359. This is a positive test case, which starts a server and a redirection server.
  360. Redirection server redirects http_request to different port
  361. Number of iterations can be specified in variable iterations.
  362. steps: |
  363. 1. join AP
  364. 2. Fetch OTA image over HTTPS
  365. 3. Reboot with the new OTA image
  366. """
  367. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT)
  368. server_port = 8001
  369. # Port to which the request should be redirecetd
  370. redirection_server_port = 8081
  371. # File to be downloaded. This file is generated after compilation
  372. bin_name = 'advanced_https_ota.bin'
  373. # check and log bin size
  374. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  375. bin_size = os.path.getsize(binary_file)
  376. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  377. # start test
  378. host_ip = get_my_ip()
  379. if (get_server_status(host_ip, server_port) is False):
  380. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  381. thread1.daemon = True
  382. thread1.start()
  383. thread2 = Thread(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port, server_port))
  384. thread2.daemon = True
  385. thread2.start()
  386. dut1.start_app()
  387. dut1.expect('Loaded app from partition at offset', timeout=30)
  388. try:
  389. ip_address = dut1.expect(re.compile(r' sta ip: ([^,]+),'), timeout=30)
  390. print('Connected to AP with IP: {}'.format(ip_address))
  391. except DUT.ExpectTimeout:
  392. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  393. thread1.close()
  394. thread2.close()
  395. dut1.expect('Starting Advanced OTA example', timeout=30)
  396. print('writing to device: {}'.format('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name))
  397. dut1.write('https://' + host_ip + ':' + str(redirection_server_port) + '/' + bin_name)
  398. dut1.expect('Loaded app from partition at offset', timeout=60)
  399. dut1.expect('Starting Advanced OTA example', timeout=30)
  400. dut1.reset()
  401. @ttfw_idf.idf_example_test(env_tag='Example_8Mflash_Ethernet')
  402. def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_data):
  403. """
  404. Working of OTA when anti_rollback is enabled and security version of new image is less than current one.
  405. Application should return with error message in this case.
  406. steps: |
  407. 1. join AP
  408. 2. Generate binary file with lower security version
  409. 3. Fetch OTA image over HTTPS
  410. 4. Check working of anti_rollback feature
  411. """
  412. dut1 = env.get_dut('advanced_https_ota_example', 'examples/system/ota/advanced_https_ota', dut_class=ttfw_idf.ESP32DUT, app_config_name='anti_rollback')
  413. server_port = 8001
  414. # Original binary file generated after compilation
  415. bin_name = 'advanced_https_ota.bin'
  416. # Modified firmware image to lower security version in its header. This is to enable negative test case
  417. anti_rollback_bin_name = 'advanced_https_ota_lower_sec_version.bin'
  418. # check and log bin size
  419. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  420. file_size = os.path.getsize(binary_file)
  421. f = open(binary_file, 'rb+')
  422. fo = open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), 'wb+')
  423. fo.write(f.read(file_size))
  424. # Change security_version to 0 for negative test case
  425. fo.seek(36)
  426. fo.write(b'\x00')
  427. fo.close()
  428. f.close()
  429. binary_file = os.path.join(dut1.app.binary_path, anti_rollback_bin_name)
  430. bin_size = os.path.getsize(binary_file)
  431. ttfw_idf.log_performance('advanced_https_ota_bin_size', '{}KB'.format(bin_size // 1024))
  432. # start test
  433. host_ip = get_my_ip()
  434. if (get_server_status(host_ip, server_port) is False):
  435. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  436. thread1.daemon = True
  437. thread1.start()
  438. dut1.start_app()
  439. # Positive Case
  440. dut1.expect('Loaded app from partition at offset', timeout=30)
  441. try:
  442. ip_address = dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
  443. print('Connected to AP with IP: {}'.format(ip_address))
  444. except DUT.ExpectTimeout:
  445. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  446. dut1.expect('Starting Advanced OTA example', timeout=30)
  447. # Use originally generated image with secure_version=1
  448. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + bin_name))
  449. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + bin_name)
  450. dut1.expect('Loaded app from partition at offset', timeout=60)
  451. dut1.expect(re.compile(r' eth ip: ([^,]+),'), timeout=30)
  452. dut1.expect('App is valid, rollback cancelled successfully', 30)
  453. # Negative Case
  454. dut1.expect('Starting Advanced OTA example', timeout=30)
  455. # Use modified image with secure_version=0
  456. print('writing to device: {}'.format('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name))
  457. dut1.write('https://' + host_ip + ':' + str(server_port) + '/' + anti_rollback_bin_name)
  458. dut1.expect('New firmware security version is less than eFuse programmed, 0 < 1', timeout=30)
  459. os.remove(anti_rollback_bin_name)
  460. if __name__ == '__main__':
  461. test_examples_protocol_advanced_https_ota_example()
  462. test_examples_protocol_advanced_https_ota_example_chunked()
  463. test_examples_protocol_advanced_https_ota_example_redirect_url()
  464. test_examples_protocol_advanced_https_ota_example_truncated_bin()
  465. test_examples_protocol_advanced_https_ota_example_truncated_header()
  466. test_examples_protocol_advanced_https_ota_example_random()
  467. test_examples_protocol_advanced_https_ota_example_anti_rollback()