example_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. import http.server
  2. import multiprocessing
  3. import os
  4. import random
  5. import re
  6. import socket
  7. import ssl
  8. import struct
  9. import subprocess
  10. import ttfw_idf
  11. from tiny_test_fw import DUT
  # PEM-encoded certificate for the test servers in this file; get_ca_cert()
  # writes it out as 'server_cert.pem'.
  server_cert = (
      '-----BEGIN CERTIFICATE-----\n'
      'MIIDWDCCAkACCQCbF4+gVh/MLjANBgkqhkiG9w0BAQsFADBuMQswCQYDVQQGEwJJ\n'
      'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'
      'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'
      'b20wHhcNMjEwNzEyMTIzNjI3WhcNNDEwNzA3MTIzNjI3WjBuMQswCQYDVQQGEwJJ\n'
      'TjELMAkGA1UECAwCTUgxDDAKBgNVBAcMA1BVTjEMMAoGA1UECgwDRVNQMQwwCgYD\n'
      'VQQLDANFU1AxDDAKBgNVBAMMA0VTUDEaMBgGCSqGSIb3DQEJARYLZXNwQGVzcC5j\n'
      'b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDhxF/y7bygndxPwiWL\n'
      'SwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQuc32W\n'
      'ukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2mKRbQ\n'
      'S5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO2fEz\n'
      'YaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnvL6Oz\n'
      '3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdOAoap\n'
      'rFTRAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAItw24y565k3C/zENZlxyzto44ud\n'
      'IYPQXN8Fa2pBlLe1zlSIyuaA/rWQ+i1daS8nPotkCbWZyf5N8DYaTE4B0OfvoUPk\n'
      'B5uGDmbuk6akvlB5BGiYLfQjWHRsK9/4xjtIqN1H58yf3QNROuKsPAeywWS3Fn32\n'
      '3//OpbWaClQePx6udRYMqAitKR+QxL7/BKZQsX+UyShuq8hjphvXvk0BW8ONzuw9\n'
      'RcoORxM0FzySYjeQvm4LhzC/P3ZBhEq0xs55aL2a76SJhq5hJy7T/Xz6NFByvlrN\n'
      'lFJJey33KFrAf5vnV9qcyWFIo7PYy2VsaaEjFeefr7q3sTFSMlJeadexW2Y=\n'
      '-----END CERTIFICATE-----\n'
  )
  # PEM-encoded private key for the test servers in this file; get_ca_cert()
  # writes it out as 'server_key.pem'.
  server_key = (
      '-----BEGIN PRIVATE KEY-----\n'
      'MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDhxF/y7bygndxP\n'
      'wiWLSwS9LY3uBMaJgup0ufNKVhx+FhGQOu44SghuJAaH3KkPUnt6SOM8jC97/yQu\n'
      'c32WukI7eBZoA12kargSnzdv5m5rZZpd+NznSSpoDArOAONKVlzr25A1+aZbix2m\n'
      'KRbQS5w9o1N2BriQuSzd8gL0Y0zEk3VkOWXEL+0yFUT144HnErnD+xnJtHe11yPO\n'
      '2fEzYaGiilh0ddL26PXTugXMZN/8fRVHP50P2OG0SvFpC7vghlLp4VFM1/r3UJnv\n'
      'L6Oz3ALc6dhxZEKQucqlpj8l1UegszQToopemtIj0qXTHw2+uUnkUyWIPjPC+wdO\n'
      'AoaprFTRAgMBAAECggEAE0HCxV/N1Q1h+1OeDDGL5+74yjKSFKyb/vTVcaPCrmaH\n'
      'fPvp0ddOvMZJ4FDMAsiQS6/n4gQ7EKKEnYmwTqj4eUYW8yxGUn3f0YbPHbZT+Mkj\n'
      'z5woi3nMKi/MxCGDQZX4Ow3xUQlITUqibsfWcFHis8c4mTqdh4qj7xJzehD2PVYF\n'
      'gNHZsvVj6MltjBDAVwV1IlGoHjuElm6vuzkfX7phxcA1B4ZqdYY17yCXUnvui46z\n'
      'Xn2kUTOOUCEgfgvGa9E+l4OtdXi5IxjaSraU+dlg2KsE4TpCuN2MEVkeR5Ms3Y7Q\n'
      'jgJl8vlNFJDQpbFukLcYwG7rO5N5dQ6WWfVia/5XgQKBgQD74at/bXAPrh9NxPmz\n'
      'i1oqCHMDoM9sz8xIMZLF9YVu3Jf8ux4xVpRSnNy5RU1gl7ZXbpdgeIQ4v04zy5aw\n'
      '8T4tu9K3XnR3UXOy25AK0q+cnnxZg3kFQm+PhtOCKEFjPHrgo2MUfnj+EDddod7N\n'
      'JQr9q5rEFbqHupFPpWlqCa3QmQKBgQDldWUGokNaEpmgHDMnHxiibXV5LQhzf8Rq\n'
      'gJIQXb7R9EsTSXEvsDyqTBb7PHp2Ko7rZ5YQfyf8OogGGjGElnPoU/a+Jij1gVFv\n'
      'kZ064uXAAISBkwHdcuobqc5EbG3ceyH46F+FBFhqM8KcbxJxx08objmh58+83InN\n'
      'P9Qr25Xw+QKBgEGXMHuMWgQbSZeM1aFFhoMvlBO7yogBTKb4Ecpu9wI5e3Kan3Al\n'
      'pZYltuyf+VhP6XG3IMBEYdoNJyYhu+nzyEdMg8CwXg+8LC7FMis/Ve+o7aS5scgG\n'
      '1to/N9DK/swCsdTRdzmc/ZDbVC+TuVsebFBGYZTyO5KgqLpezqaIQrTxAoGALFCU\n'
      '10glO9MVyl9H3clap5v+MQ3qcOv/EhaMnw6L2N6WVT481tnxjW4ujgzrFcE4YuxZ\n'
      'hgwYu9TOCmeqopGwBvGYWLbj+C4mfSahOAs0FfXDoYazuIIGBpuv03UhbpB1Si4O\n'
      'rJDfRnuCnVWyOTkl54gKJ2OusinhjztBjcrV1XkCgYEA3qNi4uBsPdyz9BZGb/3G\n'
      'rOMSw0CaT4pEMTLZqURmDP/0hxvTk1polP7O/FYwxVuJnBb6mzDa0xpLFPTpIAnJ\n'
      'YXB8xpXU69QVh+EBbemdJWOd+zp5UCfXvb2shAeG3Tn/Dz4cBBMEUutbzP+or0nG\n'
      'vSXnRLaxQhooWm+IuX9SuBQ=\n'
      '-----END PRIVATE KEY-----\n'
  )
  def get_my_ip():
      """Return the local IPv4 address of the interface used to reach 8.8.8.8.

      A datagram socket is connected towards 8.8.8.8:80 and the local end of
      that socket is read back with getsockname().

      Returns:
          The local address as a dotted-quad string.

      Raises:
          OSError: if the socket cannot be created or connected (for example
              when the host has no usable route).
      """
      # The context manager closes the socket even when connect() or
      # getsockname() raises; the previous explicit close() was skipped then.
      with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
          probe.connect(('8.8.8.8', 80))
          return probe.getsockname()[0]
  def get_server_status(host_ip, port):
      """Report whether something is accepting TCP connections on host_ip:port.

      Args:
          host_ip: Address (or host name) to probe.
          port: TCP port number to probe.

      Returns:
          True when connect_ex() reports success (return code 0), False for
          any other return code.
      """
      # connect_ex() returns an error code for most failures but can still
      # raise (e.g. on address resolution errors); the context manager makes
      # sure the probe socket is closed in that case too.
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
          return sock.connect_ex((host_ip, port)) == 0
  def create_file(server_file, file_data):
      """Write the text file_data to the path server_file.

      The file is opened in 'w+' mode, so an existing file at that path is
      truncated and replaced.
      """
      with open(server_file, 'w+') as handle:
          handle.write(file_data)
  def get_ca_cert(ota_image_dir):
      """Write the test certificate and key into ota_image_dir.

      Side effect: the process working directory is changed to ota_image_dir
      (presumably so that the servers started afterwards serve files from
      that directory - confirm against the server helpers).

      Args:
          ota_image_dir: Directory that receives 'server_cert.pem' and
              'server_key.pem'.

      Returns:
          A (certificate_path, key_path) tuple.
      """
      os.chdir(ota_image_dir)
      pem_paths = []
      for pem_name, pem_text in (('server_cert.pem', server_cert),
                                 ('server_key.pem', server_key)):
          pem_path = os.path.join(ota_image_dir, pem_name)
          create_file(pem_path, pem_text)
          pem_paths.append(pem_path)
      return tuple(pem_paths)
  def https_request_handler():
      """
      Build and return a SimpleHTTPRequestHandler subclass whose finish() and
      handle() swallow socket errors (such as a broken pipe) instead of
      propagating them.
      """
      class RequestHandler(http.server.SimpleHTTPRequestHandler):
          def finish(self):
              # Flush/close the response stream, ignoring socket errors from
              # a peer that already went away; the request stream is closed
              # regardless.
              response_stream = self.wfile
              try:
                  if not response_stream.closed:
                      response_stream.flush()
                      response_stream.close()
              except socket.error:
                  pass
              self.rfile.close()

          def handle(self):
              # Same request loop as the base handler, minus socket errors.
              try:
                  super().handle()
              except socket.error:
                  pass

      return RequestHandler
  def start_https_server(ota_image_dir, server_ip, server_port):
      """Serve ota_image_dir over HTTPS on server_ip:server_port, forever.

      The certificate/key pair is written by get_ca_cert(), which also changes
      the working directory to ota_image_dir. This call blocks in
      serve_forever(); the tests run it in a separate process and terminate
      that process when done.

      Args:
          ota_image_dir: Directory holding the OTA images to serve.
          server_ip: Address to bind the server to.
          server_port: TCP port to bind the server to.
      """
      server_file, key_file = get_ca_cert(ota_image_dir)
      request_handler = https_request_handler()
      httpd = http.server.HTTPServer((server_ip, server_port), request_handler)

      # ssl.wrap_socket() is deprecated since Python 3.7 and removed in 3.12;
      # an explicit server-side SSLContext is the supported replacement.
      ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
      ssl_context.load_cert_chain(certfile=server_file, keyfile=key_file)
      httpd.socket = ssl_context.wrap_socket(httpd.socket, server_side=True)
      httpd.serve_forever()
  def start_chunked_server(ota_image_dir, server_port):
      """Launch `openssl s_server -WWW` on server_port and return its Popen handle.

      get_ca_cert() writes the key/certificate passed to openssl and changes
      the working directory to ota_image_dir. The caller is responsible for
      stopping the returned process.
      """
      cert_path, key_path = get_ca_cert(ota_image_dir)
      command = [
          'openssl', 's_server', '-WWW',
          '-key', key_path,
          '-cert', cert_path,
          '-port', str(server_port),
      ]
      return subprocess.Popen(command)
  @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  def test_examples_protocol_native_ota_example(env, extra_data):
      """
      This is a positive test case, which downloads complete binary file multiple number of times.
      Number of iterations can be specified in variable iterations.
      steps: |
        1. join AP
        2. Fetch OTA image over HTTPS
        3. Reboot with the new OTA image
      """
      dut1 = env.get_dut('native_ota_example', 'examples/system/ota/native_ota_example', dut_class=ttfw_idf.ESP32DUT)
      server_port = 8002
      # No. of times working of application to be validated
      iterations = 3
      # File to be downloaded. This file is generated after compilation
      bin_name = 'native_ota.bin'
      # check and log bin size
      binary_file = os.path.join(dut1.app.binary_path, bin_name)
      bin_size = os.path.getsize(binary_file)
      ttfw_idf.log_performance('native_ota_bin_size', '{}KB'.format(bin_size // 1024))
      # start test
      host_ip = get_my_ip()
      # Stays None when a server is already listening on the port, so the
      # cleanup below never touches an unbound name.
      server_process = None
      if not get_server_status(host_ip, server_port):
          server_process = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
          server_process.daemon = True
          server_process.start()
      ota_url = 'https://' + host_ip + ':' + str(server_port) + '/' + bin_name
      try:
          dut1.start_app()
          for _ in range(iterations):
              dut1.expect('Loaded app from partition at offset', timeout=30)
              try:
                  ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
                  print('Connected to AP with IP: {}'.format(ip_address))
              except DUT.ExpectTimeout:
                  raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
              dut1.expect('Starting OTA example', timeout=30)

              print('writing to device: {}'.format(ota_url))
              dut1.write(ota_url)
              dut1.expect('Loaded app from partition at offset', timeout=60)
              dut1.expect('Starting OTA example', timeout=30)
              # Reboot so the next iteration starts from a fresh boot log.
              dut1.reset()
      finally:
          # Stop the HTTPS server on success and on every failure path (the
          # old terminate() placed after `raise` could never execute).
          if server_process is not None:
              server_process.terminate()
  @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  def test_examples_protocol_native_ota_example_truncated_bin(env, extra_data):
      """
      Working of OTA if binary file is truncated is validated in this test case.
      Application should return with error message in this case.
      steps: |
        1. join AP
        2. Generate truncated binary file
        3. Fetch OTA image over HTTPS
        4. Check working of code if bin is truncated
      """
      dut1 = env.get_dut('native_ota_example', 'examples/system/ota/native_ota_example', dut_class=ttfw_idf.ESP32DUT)
      server_port = 8002
      # Original binary file generated after compilation
      bin_name = 'native_ota.bin'
      # Truncated binary file to be generated from original binary file
      truncated_bin_name = 'truncated.bin'
      # Size of truncated file to be generated. This value can range from 288 bytes (Image header size) to size of original binary file
      # truncated_bin_size is set to 64000 to reduce the time consumed by the test case
      truncated_bin_size = 64000
      # Generate the truncated image; the context managers close both files
      # even if the read or write fails.
      source_file = os.path.join(dut1.app.binary_path, bin_name)
      binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
      with open(source_file, 'rb') as src, open(binary_file, 'wb') as dst:
          dst.write(src.read(truncated_bin_size))
      # Stays None when a server is already listening on the port, so the
      # cleanup below never touches an unbound name.
      server_process = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance('native_ota_bin_size', '{}KB'.format(bin_size // 1024))
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              server_process = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              server_process.daemon = True
              server_process.start()
          dut1.start_app()
          dut1.expect('Loaded app from partition at offset', timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=60)
              print('Connected to AP with IP: {}'.format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting OTA example', timeout=30)

          ota_url = 'https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name
          print('writing to device: {}'.format(ota_url))
          dut1.write(ota_url)
          dut1.expect('native_ota_example: Image validation failed, image is corrupted', timeout=20)
      finally:
          # Runs on success and on every failure path: stop the server (the
          # old terminate() after `raise` was unreachable) and delete the
          # generated image so it does not linger in the build directory.
          if server_process is not None:
              server_process.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  def test_examples_protocol_native_ota_example_truncated_header(env, extra_data):
      """
      Working of OTA if headers of binary file are truncated is validated in this test case.
      Application should return with error message in this case.
      steps: |
        1. join AP
        2. Generate binary file with truncated headers
        3. Fetch OTA image over HTTPS
        4. Check working of code if headers are not sent completely
      """
      dut1 = env.get_dut('native_ota_example', 'examples/system/ota/native_ota_example', dut_class=ttfw_idf.ESP32DUT)
      server_port = 8002
      # Original binary file generated after compilation
      bin_name = 'native_ota.bin'
      # Truncated binary file to be generated from original binary file
      truncated_bin_name = 'truncated_header.bin'
      # Size of truncated file to be generated. This value should be less than 288 bytes (Image header size)
      truncated_bin_size = 180
      # Generate the truncated image; the context managers close both files
      # even if the read or write fails.
      source_file = os.path.join(dut1.app.binary_path, bin_name)
      binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
      with open(source_file, 'rb') as src, open(binary_file, 'wb') as dst:
          dst.write(src.read(truncated_bin_size))
      # Stays None when a server is already listening on the port, so the
      # cleanup below never touches an unbound name.
      server_process = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance('native_ota_bin_size', '{}KB'.format(bin_size // 1024))
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              server_process = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              server_process.daemon = True
              server_process.start()
          dut1.start_app()
          dut1.expect('Loaded app from partition at offset', timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=60)
              print('Connected to AP with IP: {}'.format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting OTA example', timeout=30)

          ota_url = 'https://' + host_ip + ':' + str(server_port) + '/' + truncated_bin_name
          print('writing to device: {}'.format(ota_url))
          dut1.write(ota_url)
          dut1.expect('native_ota_example: received package is not fit len', timeout=20)
      finally:
          # Runs on success and on every failure path: stop the server (the
          # old terminate() after `raise` was unreachable) and delete the
          # generated image so it does not linger in the build directory.
          if server_process is not None:
              server_process.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  def test_examples_protocol_native_ota_example_random(env, extra_data):
      """
      Working of OTA if random data is added in binary file are validated in this test case.
      Magic byte verification should fail in this case.
      steps: |
        1. join AP
        2. Generate random binary image
        3. Fetch OTA image over HTTPS
        4. Check working of code for random binary file
      """
      dut1 = env.get_dut('native_ota_example', 'examples/system/ota/native_ota_example', dut_class=ttfw_idf.ESP32DUT)
      server_port = 8002
      # Random binary file to be generated
      random_bin_name = 'random.bin'
      # Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case
      random_bin_size = 32000
      binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
      with open(binary_file, 'wb') as fo:
          # First byte of binary file is always set to zero. If first byte is generated randomly,
          # in some cases it may generate 0xE9 which will result in failure of testcase.
          fo.write(b'\x00')
          # Remaining bytes cover the full 0..255 range (the previous
          # randrange(0, 255, 1) could never produce 0xFF) and are written
          # with a single call instead of one write per byte.
          fo.write(bytes(random.randrange(256) for _ in range(random_bin_size - 1)))
      # Stays None when a server is already listening on the port, so the
      # cleanup below never touches an unbound name.
      server_process = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance('native_ota_bin_size', '{}KB'.format(bin_size // 1024))
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              server_process = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              server_process.daemon = True
              server_process.start()
          dut1.start_app()
          dut1.expect('Loaded app from partition at offset', timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=60)
              print('Connected to AP with IP: {}'.format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting OTA example', timeout=30)

          ota_url = 'https://' + host_ip + ':' + str(server_port) + '/' + random_bin_name
          print('writing to device: {}'.format(ota_url))
          dut1.write(ota_url)
          dut1.expect('esp_ota_ops: OTA image has invalid magic byte', timeout=20)
      finally:
          # Runs on success and on every failure path: stop the server (the
          # old terminate() after `raise` was unreachable) and delete the
          # generated image so it does not linger in the build directory.
          if server_process is not None:
              server_process.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag='EXAMPLE_ETH_OTA')
  def test_examples_protocol_native_ota_example_chunked(env, extra_data):
      """
      This is a positive test case, which downloads complete binary file multiple number of times.
      Number of iterations can be specified in variable iterations.
      steps: |
        1. join AP
        2. Fetch OTA image over HTTPS
        3. Reboot with the new OTA image
      """
      dut1 = env.get_dut('native_ota_example', 'examples/system/ota/native_ota_example', dut_class=ttfw_idf.ESP32DUT)
      # Port the openssl s_server instance listens on
      server_port = 8070
      # File to be downloaded. This file is generated after compilation
      bin_name = 'native_ota.bin'
      # check and log bin size
      binary_file = os.path.join(dut1.app.binary_path, bin_name)
      bin_size = os.path.getsize(binary_file)
      ttfw_idf.log_performance('native_ota_bin_size', '{}KB'.format(bin_size // 1024))
      # start test
      host_ip = get_my_ip()
      chunked_server = None
      try:
          chunked_server = start_chunked_server(dut1.app.binary_path, server_port)
          dut1.start_app()
          dut1.expect('Loaded app from partition at offset', timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r' (sta|eth) ip: ([^,]+),'), timeout=30)
              print('Connected to AP with IP: {}'.format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting OTA example', timeout=30)

          ota_url = 'https://' + host_ip + ':' + str(server_port) + '/' + bin_name
          print('writing to device: {}'.format(ota_url))
          dut1.write(ota_url)
          dut1.expect('Loaded app from partition at offset', timeout=60)
          dut1.expect('Starting OTA example', timeout=30)
      finally:
          # Runs on success and on every failure path, so neither the openssl
          # process nor the generated PEM files are left behind.
          if chunked_server is not None:
              chunked_server.kill()
              # Collect the exit status of the killed child process.
              chunked_server.wait()
          for pem_name in ('server_cert.pem', 'server_key.pem'):
              pem_path = os.path.join(dut1.app.binary_path, pem_name)
              # The files may be missing if server start-up failed early.
              if os.path.exists(pem_path):
                  os.remove(pem_path)
  if __name__ == '__main__':
      # Run every test case once, in this fixed order, when executed as a script.
      for run_test_case in (
              test_examples_protocol_native_ota_example,
              test_examples_protocol_native_ota_example_chunked,
              test_examples_protocol_native_ota_example_truncated_bin,
              test_examples_protocol_native_ota_example_truncated_header,
              test_examples_protocol_native_ota_example_random,
      ):
          run_test_case()