example_test.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import re
  2. import os
  3. import struct
  4. import socket
  5. import http.server
  6. from threading import Thread
  7. import ssl
  8. from tiny_test_fw import DUT
  9. import ttfw_idf
  10. import random
  11. import subprocess
  12. server_cert = "-----BEGIN CERTIFICATE-----\n" \
  13. "MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n"\
  14. "BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\n"\
  15. "aWRnaXRzIFB0eSBMdGQwHhcNMTkwNjA3MDk1OTE2WhcNMjAwNjA2MDk1OTE2WjBF\n"\
  16. "MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\n"\
  17. "ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"\
  18. "CgKCAQEAlzfCyv3mIv7TlLkObxunKfCdrJ/zgdANrsx0RBtpEPhV560hWJ0fEin0\n"\
  19. "nIOMpJSiF9E6QsPdr6Q+eogH4XnOMU9JE+iG743N1dPfGEzJvRlyct/Ck8SswKPC\n"\
  20. "9+VXsnOdZmUw9y/xtANbURA/TspvPzz3Avv382ffffrJGh7ooOmaZSCZFlSYHLZA\n"\
  21. "w/XlRr0sSRbLpFGY0gXjaAV8iHHiPDYLy4kZOepjV9U51xi+IGsL4w75zuMgsHyF\n"\
  22. "3nJeGYHgtGVBrkL0ZKG5udY0wcBjysjubDJC4iSlNiq2HD3fhs7j6CZddV2v845M\n"\
  23. "lVKNxP0kO4Uj4D8r+5USWC8JKfAwxQIDAQABo1AwTjAdBgNVHQ4EFgQU6OE7ssfY\n"\
  24. "IIPTDThiUoofUpsD5NwwHwYDVR0jBBgwFoAU6OE7ssfYIIPTDThiUoofUpsD5Nww\n"\
  25. "DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAXIlHS/FJWfmcinUAxyBd\n"\
  26. "/xd5Lu8ykeru6oaUCci+Vk9lyoMMES7lQ+b/00d5x7AcTawkTil9EWpBTPTOTraA\n"\
  27. "lzJMQhNKmSLk0iIoTtAJtSZgUSpIIozqK6lenxQQDsHbXKU6h+u9H6KZE8YcjsFl\n"\
  28. "6vL7sw9BVotw/VxfgjQ5OSGLgoLrdVT0z5C2qOuwOgz1c7jNiJhtMdwN+cOtnJp2\n"\
  29. "fuBgEYyE3eeuWogvkWoDcIA8r17Ixzkpq2oJsdvZcHZPIZShPKW2SHUsl98KDemu\n"\
  30. "y0pQyExmQUbwKE4vbFb9XuWCcL9XaOHQytyszt2DeD67AipvoBwVU7/LBOvqnsmy\n"\
  31. "hA==\n"\
  32. "-----END CERTIFICATE-----\n"
  33. server_key = "-----BEGIN PRIVATE KEY-----\n"\
  34. "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXN8LK/eYi/tOU\n"\
  35. "uQ5vG6cp8J2sn/OB0A2uzHREG2kQ+FXnrSFYnR8SKfScg4yklKIX0TpCw92vpD56\n"\
  36. "iAfhec4xT0kT6Ibvjc3V098YTMm9GXJy38KTxKzAo8L35Veyc51mZTD3L/G0A1tR\n"\
  37. "ED9Oym8/PPcC+/fzZ999+skaHuig6ZplIJkWVJgctkDD9eVGvSxJFsukUZjSBeNo\n"\
  38. "BXyIceI8NgvLiRk56mNX1TnXGL4gawvjDvnO4yCwfIXecl4ZgeC0ZUGuQvRkobm5\n"\
  39. "1jTBwGPKyO5sMkLiJKU2KrYcPd+GzuPoJl11Xa/zjkyVUo3E/SQ7hSPgPyv7lRJY\n"\
  40. "Lwkp8DDFAgMBAAECggEAfBhAfQE7mUByNbxgAgI5fot9eaqR1Nf+QpJ6X2H3KPwC\n"\
  41. "02sa0HOwieFwYfj6tB1doBoNq7i89mTc+QUlIn4pHgIowHO0OGawomeKz5BEhjCZ\n"\
  42. "4XeLYGSoODary2+kNkf2xY8JTfFEcyvGBpJEwc4S2VyYgRRx+IgnumTSH+N5mIKZ\n"\
  43. "SXWNdZIuHEmkwod+rPRXs6/r+PH0eVW6WfpINEbr4zVAGXJx2zXQwd2cuV1GTJWh\n"\
  44. "cPVOXLu+XJ9im9B370cYN6GqUnR3fui13urYbnWnEf3syvoH/zuZkyrVChauoFf8\n"\
  45. "8EGb74/HhXK7Q2s8NRakx2c7OxQifCbcy03liUMmyQKBgQDFAob5B/66N4Q2cq/N\n"\
  46. "MWPf98kYBYoLaeEOhEJhLQlKk0pIFCTmtpmUbpoEes2kCUbH7RwczpYko8tlKyoB\n"\
  47. "6Fn6RY4zQQ64KZJI6kQVsjkYpcP/ihnOY6rbds+3yyv+4uPX7Eh9sYZwZMggE19M\n"\
  48. "CkFHkwAjiwqhiiSlUxe20sWmowKBgQDEfx4lxuFzA1PBPeZKGVBTxYPQf+DSLCre\n"\
  49. "ZFg3ZmrxbCjRq1O7Lra4FXWD3dmRq7NDk79JofoW50yD8wD7I0B7opdDfXD2idO8\n"\
  50. "0dBnWUKDr2CAXyoLEINce9kJPbx4kFBQRN9PiGF7VkDQxeQ3kfS8CvcErpTKCOdy\n"\
  51. "5wOwBTwJdwKBgDiTFTeGeDv5nVoVbS67tDao7XKchJvqd9q3WGiXikeELJyuTDqE\n"\
  52. "zW22pTwMF+m3UEAxcxVCrhMvhkUzNAkANHaOatuFHzj7lyqhO5QPbh4J3FMR0X9X\n"\
  53. "V8VWRSg+jA/SECP9koOl6zlzd5Tee0tW1pA7QpryXscs6IEhb3ns5R2JAoGAIkzO\n"\
  54. "RmnhEOKTzDex611f2D+yMsMfy5BKK2f4vjLymBH5TiBKDXKqEpgsW0huoi8Gq9Uu\n"\
  55. "nvvXXAgkIyRYF36f0vUe0nkjLuYAQAWgC2pZYgNLJR13iVbol0xHJoXQUHtgiaJ8\n"\
  56. "GLYFzjHQPqFMpSalQe3oELko39uOC1CoJCHFySECgYBeycUnRBikCO2n8DNhY4Eg\n"\
  57. "9Y3oxcssRt6ea5BZwgW2eAYi7/XqKkmxoSoOykUt3MJx9+EkkrL17bxFSpkj1tvL\n"\
  58. "qvxn7egtsKjjgGNAxwXC4MwCvhveyUQQxtQb8AqGrGqo4jEEN0L15cnP38i2x1Uo\n"\
  59. "muhfskWf4MABV0yTUaKcGg==\n"\
  60. "-----END PRIVATE KEY-----\n"
  61. def get_my_ip():
  62. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  63. s1.connect(("8.8.8.8", 80))
  64. my_ip = s1.getsockname()[0]
  65. s1.close()
  66. return my_ip
  67. def get_server_status(host_ip, port):
  68. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  69. server_status = sock.connect_ex((host_ip, port))
  70. sock.close()
  71. if server_status == 0:
  72. return True
  73. return False
  74. def create_file(server_file, file_data):
  75. with open(server_file, "w+") as file:
  76. file.write(file_data)
  77. def get_ca_cert(ota_image_dir):
  78. os.chdir(ota_image_dir)
  79. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  80. create_file(server_file, server_cert)
  81. key_file = os.path.join(ota_image_dir, "server_key.pem")
  82. create_file(key_file, server_key)
  83. return server_file, key_file
  84. def https_request_handler():
  85. """
  86. Returns a request handler class that handles broken pipe exception
  87. """
  88. class RequestHandler(http.server.SimpleHTTPRequestHandler):
  89. def finish(self):
  90. try:
  91. if not self.wfile.closed:
  92. self.wfile.flush()
  93. self.wfile.close()
  94. except socket.error:
  95. pass
  96. self.rfile.close()
  97. def handle(self):
  98. try:
  99. http.server.BaseHTTPRequestHandler.handle(self)
  100. except socket.error:
  101. pass
  102. return RequestHandler
  103. def start_https_server(ota_image_dir, server_ip, server_port):
  104. server_file, key_file = get_ca_cert(ota_image_dir)
  105. requestHandler = https_request_handler()
  106. httpd = http.server.HTTPServer((server_ip, server_port), requestHandler)
  107. httpd.socket = ssl.wrap_socket(httpd.socket,
  108. keyfile=key_file,
  109. certfile=server_file, server_side=True)
  110. httpd.serve_forever()
  111. def start_chunked_server(ota_image_dir, server_port):
  112. server_file, key_file = get_ca_cert(ota_image_dir)
  113. chunked_server = subprocess.Popen(["openssl", "s_server", "-WWW", "-key", key_file, "-cert", server_file, "-port", str(server_port)])
  114. return chunked_server
  115. def redirect_handler_factory(url):
  116. """
  117. Returns a request handler class that redirects to supplied `url`
  118. """
  119. class RedirectHandler(http.server.SimpleHTTPRequestHandler):
  120. def do_GET(self):
  121. print("Sending resp, URL: " + url)
  122. self.send_response(301)
  123. self.send_header('Location', url)
  124. self.end_headers()
  125. def handle(self):
  126. try:
  127. http.server.BaseHTTPRequestHandler.handle(self)
  128. except socket.error:
  129. pass
  130. return RedirectHandler
  131. def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_port):
  132. os.chdir(ota_image_dir)
  133. server_file, key_file = get_ca_cert(ota_image_dir)
  134. redirectHandler = redirect_handler_factory("https://" + server_ip + ":" + str(redirection_port) + "/advanced_https_ota.bin")
  135. httpd = http.server.HTTPServer((server_ip, server_port), redirectHandler)
  136. httpd.socket = ssl.wrap_socket(httpd.socket,
  137. keyfile=key_file,
  138. certfile=server_file, server_side=True)
  139. httpd.serve_forever()
  140. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  141. def test_examples_protocol_advanced_https_ota_example(env, extra_data):
  142. """
  143. This is a positive test case, which downloads complete binary file multiple number of times.
  144. Number of iterations can be specified in variable iterations.
  145. steps: |
  146. 1. join AP
  147. 2. Fetch OTA image over HTTPS
  148. 3. Reboot with the new OTA image
  149. """
  150. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  151. # Number of iterations to validate OTA
  152. iterations = 3
  153. server_port = 8001
  154. # File to be downloaded. This file is generated after compilation
  155. bin_name = "advanced_https_ota.bin"
  156. # check and log bin size
  157. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  158. bin_size = os.path.getsize(binary_file)
  159. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  160. # start test
  161. host_ip = get_my_ip()
  162. if (get_server_status(host_ip, server_port) is False):
  163. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  164. thread1.daemon = True
  165. thread1.start()
  166. dut1.start_app()
  167. for i in range(iterations):
  168. dut1.expect("Loaded app from partition at offset", timeout=30)
  169. try:
  170. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  171. print("Connected to AP with IP: {}".format(ip_address))
  172. except DUT.ExpectTimeout:
  173. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  174. thread1.close()
  175. dut1.expect("Starting Advanced OTA example", timeout=30)
  176. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + bin_name))
  177. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + bin_name)
  178. dut1.expect("Loaded app from partition at offset", timeout=60)
  179. dut1.expect("Starting Advanced OTA example", timeout=30)
  180. dut1.reset()
  181. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  182. def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
  183. """
  184. Working of OTA if binary file is truncated is validated in this test case.
  185. Application should return with error message in this case.
  186. steps: |
  187. 1. join AP
  188. 2. Generate truncated binary file
  189. 3. Fetch OTA image over HTTPS
  190. 4. Check working of code if bin is truncated
  191. """
  192. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  193. server_port = 8001
  194. # Original binary file generated after compilation
  195. bin_name = "advanced_https_ota.bin"
  196. # Truncated binary file to be generated from original binary file
  197. truncated_bin_name = "truncated.bin"
  198. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  199. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  200. truncated_bin_size = 64000
  201. # check and log bin size
  202. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  203. f = open(binary_file, "rb+")
  204. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
  205. fo.write(f.read(truncated_bin_size))
  206. fo.close()
  207. f.close()
  208. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  209. bin_size = os.path.getsize(binary_file)
  210. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  211. # start test
  212. host_ip = get_my_ip()
  213. if (get_server_status(host_ip, server_port) is False):
  214. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  215. thread1.daemon = True
  216. thread1.start()
  217. dut1.start_app()
  218. dut1.expect("Loaded app from partition at offset", timeout=30)
  219. try:
  220. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  221. print("Connected to AP with IP: {}".format(ip_address))
  222. except DUT.ExpectTimeout:
  223. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  224. dut1.expect("Starting Advanced OTA example", timeout=30)
  225. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  226. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  227. dut1.expect("Image validation failed, image is corrupted", timeout=30)
  228. os.remove(binary_file)
  229. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  230. def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
  231. """
  232. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  233. Application should return with error message in this case.
  234. steps: |
  235. 1. join AP
  236. 2. Generate binary file with truncated headers
  237. 3. Fetch OTA image over HTTPS
  238. 4. Check working of code if headers are not sent completely
  239. """
  240. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  241. server_port = 8001
  242. # Original binary file generated after compilation
  243. bin_name = "advanced_https_ota.bin"
  244. # Truncated binary file to be generated from original binary file
  245. truncated_bin_name = "truncated_header.bin"
  246. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  247. truncated_bin_size = 180
  248. # check and log bin size
  249. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  250. f = open(binary_file, "rb+")
  251. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "wb+")
  252. fo.write(f.read(truncated_bin_size))
  253. fo.close()
  254. f.close()
  255. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  256. bin_size = os.path.getsize(binary_file)
  257. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  258. # start test
  259. host_ip = get_my_ip()
  260. if (get_server_status(host_ip, server_port) is False):
  261. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  262. thread1.daemon = True
  263. thread1.start()
  264. dut1.start_app()
  265. dut1.expect("Loaded app from partition at offset", timeout=30)
  266. try:
  267. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  268. print("Connected to AP with IP: {}".format(ip_address))
  269. except DUT.ExpectTimeout:
  270. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  271. dut1.expect("Starting Advanced OTA example", timeout=30)
  272. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  273. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  274. dut1.expect("advanced_https_ota_example: esp_https_ota_read_img_desc failed", timeout=30)
  275. os.remove(binary_file)
  276. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  277. def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
  278. """
  279. Working of OTA if random data is added in binary file are validated in this test case.
  280. Magic byte verification should fail in this case.
  281. steps: |
  282. 1. join AP
  283. 2. Generate random binary image
  284. 3. Fetch OTA image over HTTPS
  285. 4. Check working of code for random binary file
  286. """
  287. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  288. server_port = 8001
  289. # Random binary file to be generated
  290. random_bin_name = "random.bin"
  291. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  292. random_bin_size = 32000
  293. # check and log bin size
  294. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  295. fo = open(binary_file, "wb+")
  296. # First byte of binary file is always set to zero. If first byte is generated randomly,
  297. # in some cases it may generate 0xE9 which will result in failure of testcase.
  298. fo.write(struct.pack("B", 0))
  299. for i in range(random_bin_size - 1):
  300. fo.write(struct.pack("B", random.randrange(0,255,1)))
  301. fo.close()
  302. bin_size = os.path.getsize(binary_file)
  303. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  304. # start test
  305. host_ip = get_my_ip()
  306. if (get_server_status(host_ip, server_port) is False):
  307. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  308. thread1.daemon = True
  309. thread1.start()
  310. dut1.start_app()
  311. dut1.expect("Loaded app from partition at offset", timeout=30)
  312. try:
  313. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  314. print("Connected to AP with IP: {}".format(ip_address))
  315. except DUT.ExpectTimeout:
  316. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  317. dut1.expect("Starting Advanced OTA example", timeout=30)
  318. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name))
  319. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name)
  320. dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=10)
  321. os.remove(binary_file)
  322. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  323. def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
  324. """
  325. This is a positive test case, which downloads complete binary file multiple number of times.
  326. Number of iterations can be specified in variable iterations.
  327. steps: |
  328. 1. join AP
  329. 2. Fetch OTA image over HTTPS
  330. 3. Reboot with the new OTA image
  331. """
  332. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  333. # File to be downloaded. This file is generated after compilation
  334. bin_name = "advanced_https_ota.bin"
  335. # check and log bin size
  336. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  337. bin_size = os.path.getsize(binary_file)
  338. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  339. # start test
  340. host_ip = get_my_ip()
  341. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  342. dut1.start_app()
  343. dut1.expect("Loaded app from partition at offset", timeout=30)
  344. try:
  345. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  346. print("Connected to AP with IP: {}".format(ip_address))
  347. except DUT.ExpectTimeout:
  348. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  349. dut1.expect("Starting Advanced OTA example", timeout=30)
  350. print("writing to device: {}".format("https://" + host_ip + ":8070/" + bin_name))
  351. dut1.write("https://" + host_ip + ":8070/" + bin_name)
  352. dut1.expect("Loaded app from partition at offset", timeout=60)
  353. dut1.expect("Starting Advanced OTA example", timeout=30)
  354. chunked_server.kill()
  355. os.remove(os.path.join(dut1.app.binary_path, "server_cert.pem"))
  356. os.remove(os.path.join(dut1.app.binary_path, "server_key.pem"))
  357. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  358. def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_data):
  359. """
  360. This is a positive test case, which starts a server and a redirection server.
  361. Redirection server redirects http_request to different port
  362. Number of iterations can be specified in variable iterations.
  363. steps: |
  364. 1. join AP
  365. 2. Fetch OTA image over HTTPS
  366. 3. Reboot with the new OTA image
  367. """
  368. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  369. server_port = 8001
  370. # Port to which the request should be redirecetd
  371. redirection_server_port = 8081
  372. # File to be downloaded. This file is generated after compilation
  373. bin_name = "advanced_https_ota.bin"
  374. # check and log bin size
  375. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  376. bin_size = os.path.getsize(binary_file)
  377. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  378. # start test
  379. host_ip = get_my_ip()
  380. if (get_server_status(host_ip, server_port) is False):
  381. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  382. thread1.daemon = True
  383. thread1.start()
  384. thread2 = Thread(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port, server_port))
  385. thread2.daemon = True
  386. thread2.start()
  387. dut1.start_app()
  388. dut1.expect("Loaded app from partition at offset", timeout=30)
  389. try:
  390. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  391. print("Connected to AP with IP: {}".format(ip_address))
  392. except DUT.ExpectTimeout:
  393. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  394. thread1.close()
  395. thread2.close()
  396. dut1.expect("Starting Advanced OTA example", timeout=30)
  397. print("writing to device: {}".format("https://" + host_ip + ":" + str(redirection_server_port) + "/" + bin_name))
  398. dut1.write("https://" + host_ip + ":" + str(redirection_server_port) + "/" + bin_name)
  399. dut1.expect("Loaded app from partition at offset", timeout=60)
  400. dut1.expect("Starting Advanced OTA example", timeout=30)
  401. dut1.reset()
  402. @ttfw_idf.idf_example_test(env_tag="Example_8Mflash_Ethernet")
  403. def test_examples_protocol_advanced_https_ota_example_anti_rollback(env, extra_data):
  404. """
  405. Working of OTA when anti_rollback is enabled and security version of new image is less than current one.
  406. Application should return with error message in this case.
  407. steps: |
  408. 1. join AP
  409. 2. Generate binary file with lower security version
  410. 3. Fetch OTA image over HTTPS
  411. 4. Check working of anti_rollback feature
  412. """
  413. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT, app_config_name='anti_rollback')
  414. server_port = 8001
  415. # Original binary file generated after compilation
  416. bin_name = "advanced_https_ota.bin"
  417. # Modified firmware image to lower security version in its header. This is to enable negative test case
  418. anti_rollback_bin_name = "advanced_https_ota_lower_sec_version.bin"
  419. # check and log bin size
  420. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  421. file_size = os.path.getsize(binary_file)
  422. f = open(binary_file, "rb+")
  423. fo = open(os.path.join(dut1.app.binary_path, anti_rollback_bin_name), "wb+")
  424. fo.write(f.read(file_size))
  425. # Change security_version to 0 for negative test case
  426. fo.seek(36)
  427. fo.write(b'\x00')
  428. fo.close()
  429. f.close()
  430. binary_file = os.path.join(dut1.app.binary_path, anti_rollback_bin_name)
  431. bin_size = os.path.getsize(binary_file)
  432. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  433. # start test
  434. host_ip = get_my_ip()
  435. if (get_server_status(host_ip, server_port) is False):
  436. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  437. thread1.daemon = True
  438. thread1.start()
  439. dut1.start_app()
  440. # Positive Case
  441. dut1.expect("Loaded app from partition at offset", timeout=30)
  442. try:
  443. ip_address = dut1.expect(re.compile(r" eth ip: ([^,]+),"), timeout=30)
  444. print("Connected to AP with IP: {}".format(ip_address))
  445. except DUT.ExpectTimeout:
  446. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  447. dut1.expect("Starting Advanced OTA example", timeout=30)
  448. # Use originally generated image with secure_version=1
  449. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + bin_name))
  450. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + bin_name)
  451. dut1.expect("Loaded app from partition at offset", timeout=60)
  452. dut1.expect(re.compile(r" eth ip: ([^,]+),"), timeout=30)
  453. dut1.expect("App is valid, rollback cancelled successfully", 30)
  454. # Negative Case
  455. dut1.expect("Starting Advanced OTA example", timeout=30)
  456. # Use modified image with secure_version=0
  457. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + anti_rollback_bin_name))
  458. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + anti_rollback_bin_name)
  459. dut1.expect("New firmware security version is less than eFuse programmed, 0 < 1", timeout=30)
  460. os.remove(anti_rollback_bin_name)
  461. if __name__ == '__main__':
  462. test_examples_protocol_advanced_https_ota_example()
  463. test_examples_protocol_advanced_https_ota_example_chunked()
  464. test_examples_protocol_advanced_https_ota_example_redirect_url()
  465. test_examples_protocol_advanced_https_ota_example_truncated_bin()
  466. test_examples_protocol_advanced_https_ota_example_truncated_header()
  467. test_examples_protocol_advanced_https_ota_example_random()
  468. test_examples_protocol_advanced_https_ota_example_anti_rollback()