example_test.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. import multiprocessing
  2. import os
  3. import re
  4. import socket
  5. import ssl
  6. import struct
  7. from tiny_test_fw import DUT
  8. import ttfw_idf
  9. import random
  10. import subprocess
  11. try:
  12. import BaseHTTPServer
  13. from SimpleHTTPServer import SimpleHTTPRequestHandler
  14. except ImportError:
  15. import http.server as BaseHTTPServer
  16. from http.server import SimpleHTTPRequestHandler
  # Directory that holds the TLS material for the local test servers. It is
  # resolved from this file's location so the paths stay valid after os.chdir().
  _TEST_CERTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_certs')
  # PEM certificate handed to the HTTPS / openssl test servers.
  server_file = os.path.join(_TEST_CERTS_DIR, 'server_cert.pem')
  # PEM private key handed to the HTTPS / openssl test servers.
  key_file = os.path.join(_TEST_CERTS_DIR, 'server_key.pem')
  def get_my_ip():
      """
      Return the local IPv4 address the host uses to reach 8.8.8.8.

      A datagram socket is connected to 8.8.8.8:80 and the local address it
      ends up bound to is read back with getsockname().

      :return: local IP address as a string
      :raises socket.error: if the connect or the address lookup fails
      """
      s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      try:
          s1.connect(("8.8.8.8", 80))
          return s1.getsockname()[0]
      finally:
          # Release the descriptor even when connect() fails.
          s1.close()
  def get_server_status(host_ip, port):
      """
      Tell whether a TCP connection to (host_ip, port) can be established.

      :param host_ip: address to probe
      :param port: TCP port to probe
      :return: True if connect_ex() reported success (0), False otherwise
      """
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      try:
          return sock.connect_ex((host_ip, port)) == 0
      finally:
          # Always release the probe socket, including when connect_ex() raises.
          sock.close()
  def https_request_handler():
      """
      Returns a request handler class that handles broken pipe exception
      """
      class RequestHandler(SimpleHTTPRequestHandler):
          def finish(self):
              # Flush and close the response stream on a best-effort basis:
              # a socket error here is deliberately ignored.
              try:
                  if not self.wfile.closed:
                      self.wfile.flush()
                      self.wfile.close()
              except socket.error:
                  pass
              self.rfile.close()

          def handle(self):
              # Socket errors raised while serving a request are swallowed so
              # they do not propagate out of the handler.
              try:
                  BaseHTTPServer.BaseHTTPRequestHandler.handle(self)
              except socket.error:
                  pass

      return RequestHandler
  def start_https_server(ota_image_dir, server_ip, server_port):
      """
      Serve the contents of `ota_image_dir` over HTTPS until the process is stopped.

      The call blocks in serve_forever(); the tests in this file run it as the
      target of a multiprocessing.Process.

      :param ota_image_dir: directory to change into before serving
      :param server_ip: address to bind the server to
      :param server_port: TCP port to bind the server to
      """
      # SimpleHTTPRequestHandler serves files relative to the working directory.
      os.chdir(ota_image_dir)
      requestHandler = https_request_handler()
      httpd = BaseHTTPServer.HTTPServer((server_ip, server_port), requestHandler)
      try:
          if hasattr(ssl, 'create_default_context'):
              # ssl.wrap_socket() was removed in Python 3.12; wrap through an
              # SSLContext wherever the interpreter provides one.
              tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
              tls_context.load_cert_chain(certfile=server_file, keyfile=key_file)
              httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)
          else:
              # Legacy interpreters without SSLContext helpers.
              httpd.socket = ssl.wrap_socket(httpd.socket,
                                             keyfile=key_file,
                                             certfile=server_file, server_side=True)
          httpd.serve_forever()
      finally:
          # Release the listening socket if setup or serving fails.
          httpd.server_close()
  def start_chunked_server(ota_image_dir, server_port):
      """
      Launch `openssl s_server -WWW` with `ota_image_dir` as its working directory.

      The child is started with cwd=ota_image_dir instead of calling os.chdir(),
      so the working directory of the calling test process is left untouched.

      :param ota_image_dir: working directory for the openssl process
      :param server_port: port passed to openssl via `-port`
      :return: the subprocess.Popen handle; the caller is responsible for stopping it
      """
      command = ['openssl', 's_server', '-WWW', '-key', key_file, '-cert', server_file, '-port', str(server_port)]
      chunked_server = subprocess.Popen(command, cwd=ota_image_dir)
      return chunked_server
  def redirect_handler_factory(url):
      """
      Returns a request handler class that redirects to supplied `url`
      """
      class RedirectHandler(SimpleHTTPRequestHandler):
          def do_GET(self):
              # Every GET is answered with a 301 pointing at `url`; no body is sent.
              print("Sending resp, URL: " + url)
              self.send_response(301)
              self.send_header('Location', url)
              self.end_headers()

          def handle(self):
              # Socket errors raised while serving a request are swallowed so
              # they do not propagate out of the handler.
              try:
                  BaseHTTPServer.BaseHTTPRequestHandler.handle(self)
              except socket.error:
                  pass

      return RedirectHandler
  def start_redirect_server(ota_image_dir, server_ip, server_port, redirection_port):
      """
      Run an HTTPS server that answers every GET with a 301 redirect.

      The redirect target is
      https://<server_ip>:<redirection_port>/advanced_https_ota.bin.
      The call blocks in serve_forever(); the tests in this file run it as the
      target of a multiprocessing.Process.

      :param ota_image_dir: directory to change into before serving
      :param server_ip: address to bind to; also used as host of the redirect URL
      :param server_port: TCP port this redirect server listens on
      :param redirection_port: port placed in the redirect URL
      """
      os.chdir(ota_image_dir)
      redirectHandler = redirect_handler_factory('https://' + server_ip + ':' + str(redirection_port) + '/advanced_https_ota.bin')
      httpd = BaseHTTPServer.HTTPServer((server_ip, server_port),
                                        redirectHandler)
      try:
          if hasattr(ssl, 'create_default_context'):
              # ssl.wrap_socket() was removed in Python 3.12; wrap through an
              # SSLContext wherever the interpreter provides one.
              tls_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
              tls_context.load_cert_chain(certfile=server_file, keyfile=key_file)
              httpd.socket = tls_context.wrap_socket(httpd.socket, server_side=True)
          else:
              # Legacy interpreters without SSLContext helpers.
              httpd.socket = ssl.wrap_socket(httpd.socket,
                                             keyfile=key_file,
                                             certfile=server_file, server_side=True)
          httpd.serve_forever()
      finally:
          # Release the listening socket if setup or serving fails.
          httpd.server_close()
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example(env, extra_data):
      """
      This is a positive test case, which downloads complete binary file multiple number of times.
      Number of iterations can be specified in variable iterations.
      steps: |
        1. join AP
        2. Fetch OTA image over HTTPS
        3. Reboot with the new OTA image
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      # Number of iterations to validate OTA
      iterations = 3
      server_port = 8001
      # File to be downloaded. This file is generated after compilation
      bin_name = "advanced_https_ota.bin"
      # check and log bin size
      binary_file = os.path.join(dut1.app.binary_path, bin_name)
      bin_size = os.path.getsize(binary_file)
      ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
      ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
      # start test
      host_ip = get_my_ip()
      # Stays None when something is already listening on the port, in which
      # case no server process is spawned and there is nothing to terminate.
      thread1 = None
      if not get_server_status(host_ip, server_port):
          thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
          thread1.daemon = True
          thread1.start()
      ota_url = "https://" + host_ip + ":" + str(server_port) + "/" + bin_name
      try:
          dut1.start_app()
          for _ in range(iterations):
              dut1.expect("Loaded app from partition at offset", timeout=30)
              try:
                  ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
                  print("Connected to AP with IP: {}".format(ip_address))
              except DUT.ExpectTimeout:
                  raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
              dut1.expect('Starting Advanced OTA example', timeout=30)
              print("writing to device: {}".format(ota_url))
              dut1.write(ota_url)
              dut1.expect("Loaded app from partition at offset", timeout=60)
              dut1.expect("Starting Advanced OTA example", timeout=30)
              dut1.reset()
      finally:
          # Stop the HTTPS server on success and on every failure path.
          if thread1 is not None:
              thread1.terminate()
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
      """
      Working of OTA if binary file is truncated is validated in this test case.
      Application should return with error message in this case.
      steps: |
        1. join AP
        2. Generate truncated binary file
        3. Fetch OTA image over HTTPS
        4. Check working of code if bin is truncated
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      server_port = 8001
      # Original binary file generated after compilation
      bin_name = "advanced_https_ota.bin"
      # Truncated binary file to be generated from original binary file
      truncated_bin_name = "truncated.bin"
      # Size of truncated file to be generated. This value can range from 288 bytes (Image header size) to size of original binary file
      # truncated_bin_size is set to 64000 to reduce the time consumed by the test case
      truncated_bin_size = 64000
      # Copy the first truncated_bin_size bytes of the original image.
      source_file = os.path.join(dut1.app.binary_path, bin_name)
      binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
      with open(source_file, "rb") as f, open(binary_file, "wb") as fo:
          fo.write(f.read(truncated_bin_size))
      # Stays None when something is already listening on the port, in which
      # case no server process is spawned and there is nothing to terminate.
      thread1 = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
          ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              thread1.daemon = True
              thread1.start()
          dut1.start_app()
          dut1.expect("Loaded app from partition at offset", timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting Advanced OTA example', timeout=30)
          ota_url = "https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name
          print("writing to device: {}".format(ota_url))
          dut1.write(ota_url)
          dut1.expect("Image validation failed, image is corrupted", timeout=30)
      finally:
          # Stop the server and delete the generated file on every exit path.
          if thread1 is not None:
              thread1.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
      """
      Working of OTA if headers of binary file are truncated is validated in this test case.
      Application should return with error message in this case.
      steps: |
        1. join AP
        2. Generate binary file with truncated headers
        3. Fetch OTA image over HTTPS
        4. Check working of code if headers are not sent completely
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      server_port = 8001
      # Original binary file generated after compilation
      bin_name = "advanced_https_ota.bin"
      # Truncated binary file to be generated from original binary file
      truncated_bin_name = "truncated_header.bin"
      # Size of truncated file to be generated. This value should be less than 288 bytes (Image header size)
      truncated_bin_size = 180
      # Copy the first truncated_bin_size bytes of the original image.
      source_file = os.path.join(dut1.app.binary_path, bin_name)
      binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
      with open(source_file, "rb") as f, open(binary_file, "wb") as fo:
          fo.write(f.read(truncated_bin_size))
      # Stays None when something is already listening on the port, in which
      # case no server process is spawned and there is nothing to terminate.
      thread1 = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
          ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              thread1.daemon = True
              thread1.start()
          dut1.start_app()
          dut1.expect("Loaded app from partition at offset", timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting Advanced OTA example', timeout=30)
          ota_url = "https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name
          print("writing to device: {}".format(ota_url))
          dut1.write(ota_url)
          dut1.expect("advanced_https_ota_example: esp_https_ota_read_img_desc failed", timeout=30)
      finally:
          # Stop the server and delete the generated file on every exit path.
          if thread1 is not None:
              thread1.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
      """
      Working of OTA if random data is added in binary file are validated in this test case.
      Magic byte verification should fail in this case.
      steps: |
        1. join AP
        2. Generate random binary image
        3. Fetch OTA image over HTTPS
        4. Check working of code for random binary file
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      server_port = 8001
      # Random binary file to be generated
      random_bin_name = "random.bin"
      # Size of random binary file. 32000 is chosen, to reduce the time required to run the test-case
      random_bin_size = 32000
      binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
      # Random payload for every byte after the first, covering the full 0..255 range.
      random_payload = bytearray(random.randrange(256) for _ in range(random_bin_size - 1))
      with open(binary_file, "wb") as fo:
          # First byte of binary file is always set to zero. If first byte is generated randomly,
          # in some cases it may generate 0xE9 which will result in failure of testcase.
          fo.write(struct.pack("B", 0))
          fo.write(random_payload)
      # Stays None when something is already listening on the port, in which
      # case no server process is spawned and there is nothing to terminate.
      thread1 = None
      try:
          # check and log bin size
          bin_size = os.path.getsize(binary_file)
          ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
          ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
          # start test
          host_ip = get_my_ip()
          if not get_server_status(host_ip, server_port):
              thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              thread1.daemon = True
              thread1.start()
          dut1.start_app()
          dut1.expect("Loaded app from partition at offset", timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting Advanced OTA example', timeout=30)
          ota_url = "https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name
          print("writing to device: {}".format(ota_url))
          dut1.write(ota_url)
          dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=10)
      finally:
          # Stop the server and delete the generated file on every exit path.
          if thread1 is not None:
              thread1.terminate()
          os.remove(binary_file)
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
      """
      This is a positive test case, which downloads complete binary file once
      from an `openssl s_server` instance started by start_chunked_server().
      steps: |
        1. join AP
        2. Fetch OTA image over HTTPS
        3. Reboot with the new OTA image
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      # Port the openssl server listens on
      server_port = 8070
      # File to be downloaded. This file is generated after compilation
      bin_name = "advanced_https_ota.bin"
      # check and log bin size
      binary_file = os.path.join(dut1.app.binary_path, bin_name)
      bin_size = os.path.getsize(binary_file)
      ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
      ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
      # start test
      host_ip = get_my_ip()
      chunked_server = start_chunked_server(dut1.app.binary_path, server_port)
      try:
          dut1.start_app()
          dut1.expect("Loaded app from partition at offset", timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect("Starting Advanced OTA example", timeout=30)
          ota_url = "https://" + host_ip + ":" + str(server_port) + "/" + bin_name
          print("writing to device: {}".format(ota_url))
          dut1.write(ota_url)
          dut1.expect("Loaded app from partition at offset", timeout=60)
          dut1.expect("Starting Advanced OTA example", timeout=30)
      finally:
          # Stop the openssl process on every exit path and reap it.
          chunked_server.kill()
          chunked_server.wait()
  @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  def test_examples_protocol_advanced_https_ota_example_redirect_url(env, extra_data):
      """
      This is a positive test case, which starts a server and a redirection server.
      Redirection server redirects http_request to different port
      steps: |
        1. join AP
        2. Fetch OTA image over HTTPS
        3. Reboot with the new OTA image
      """
      dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
      # Port of the server that hosts the binary (target of the redirect)
      server_port = 8001
      # Port the redirection server listens on; the DUT is pointed at this port
      redirection_server_port = 8081
      # File to be downloaded. This file is generated after compilation
      bin_name = "advanced_https_ota.bin"
      # check and log bin size
      binary_file = os.path.join(dut1.app.binary_path, bin_name)
      bin_size = os.path.getsize(binary_file)
      ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
      ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024, dut1.TARGET)
      # start test
      host_ip = get_my_ip()
      # Server processes spawned by this test; only these are terminated at the end.
      spawned = []
      try:
          if not get_server_status(host_ip, server_port):
              thread1 = multiprocessing.Process(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
              thread1.daemon = True
              thread1.start()
              spawned.append(thread1)
          thread2 = multiprocessing.Process(target=start_redirect_server, args=(dut1.app.binary_path, host_ip, redirection_server_port, server_port))
          thread2.daemon = True
          thread2.start()
          spawned.append(thread2)
          dut1.start_app()
          dut1.expect("Loaded app from partition at offset", timeout=30)
          try:
              ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
              print("Connected to AP with IP: {}".format(ip_address))
          except DUT.ExpectTimeout:
              raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
          dut1.expect('Starting Advanced OTA example', timeout=30)
          ota_url = "https://" + host_ip + ":" + str(redirection_server_port) + "/" + bin_name
          print("writing to device: {}".format(ota_url))
          dut1.write(ota_url)
          dut1.expect("Loaded app from partition at offset", timeout=60)
          dut1.expect("Starting Advanced OTA example", timeout=30)
          dut1.reset()
      finally:
          # Stop every server started above, on success and on failure.
          for server_process in spawned:
              server_process.terminate()
  if __name__ == '__main__':
      # Run every test case in sequence when the file is executed as a script.
      # The calls pass no arguments; presumably the idf_example_test decorator
      # supplies env/extra_data - confirm against ttfw_idf.
      test_examples_protocol_advanced_https_ota_example()
      test_examples_protocol_advanced_https_ota_example_chunked()
      test_examples_protocol_advanced_https_ota_example_redirect_url()
      test_examples_protocol_advanced_https_ota_example_truncated_bin()
      test_examples_protocol_advanced_https_ota_example_truncated_header()
      test_examples_protocol_advanced_https_ota_example_random()