example_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import re
  2. import os
  3. import socket
  4. import BaseHTTPServer
  5. import SimpleHTTPServer
  6. from threading import Thread
  7. import ssl
  8. from tiny_test_fw import DUT
  9. import ttfw_idf
  10. import random
  11. import subprocess
  12. server_cert = "-----BEGIN CERTIFICATE-----\n" \
  13. "MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n"\
  14. "BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\n"\
  15. "aWRnaXRzIFB0eSBMdGQwHhcNMTkwNjA3MDk1OTE2WhcNMjAwNjA2MDk1OTE2WjBF\n"\
  16. "MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\n"\
  17. "ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"\
  18. "CgKCAQEAlzfCyv3mIv7TlLkObxunKfCdrJ/zgdANrsx0RBtpEPhV560hWJ0fEin0\n"\
  19. "nIOMpJSiF9E6QsPdr6Q+eogH4XnOMU9JE+iG743N1dPfGEzJvRlyct/Ck8SswKPC\n"\
  20. "9+VXsnOdZmUw9y/xtANbURA/TspvPzz3Avv382ffffrJGh7ooOmaZSCZFlSYHLZA\n"\
  21. "w/XlRr0sSRbLpFGY0gXjaAV8iHHiPDYLy4kZOepjV9U51xi+IGsL4w75zuMgsHyF\n"\
  22. "3nJeGYHgtGVBrkL0ZKG5udY0wcBjysjubDJC4iSlNiq2HD3fhs7j6CZddV2v845M\n"\
  23. "lVKNxP0kO4Uj4D8r+5USWC8JKfAwxQIDAQABo1AwTjAdBgNVHQ4EFgQU6OE7ssfY\n"\
  24. "IIPTDThiUoofUpsD5NwwHwYDVR0jBBgwFoAU6OE7ssfYIIPTDThiUoofUpsD5Nww\n"\
  25. "DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAXIlHS/FJWfmcinUAxyBd\n"\
  26. "/xd5Lu8ykeru6oaUCci+Vk9lyoMMES7lQ+b/00d5x7AcTawkTil9EWpBTPTOTraA\n"\
  27. "lzJMQhNKmSLk0iIoTtAJtSZgUSpIIozqK6lenxQQDsHbXKU6h+u9H6KZE8YcjsFl\n"\
  28. "6vL7sw9BVotw/VxfgjQ5OSGLgoLrdVT0z5C2qOuwOgz1c7jNiJhtMdwN+cOtnJp2\n"\
  29. "fuBgEYyE3eeuWogvkWoDcIA8r17Ixzkpq2oJsdvZcHZPIZShPKW2SHUsl98KDemu\n"\
  30. "y0pQyExmQUbwKE4vbFb9XuWCcL9XaOHQytyszt2DeD67AipvoBwVU7/LBOvqnsmy\n"\
  31. "hA==\n"\
  32. "-----END CERTIFICATE-----\n"
  33. server_key = "-----BEGIN PRIVATE KEY-----\n"\
  34. "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXN8LK/eYi/tOU\n"\
  35. "uQ5vG6cp8J2sn/OB0A2uzHREG2kQ+FXnrSFYnR8SKfScg4yklKIX0TpCw92vpD56\n"\
  36. "iAfhec4xT0kT6Ibvjc3V098YTMm9GXJy38KTxKzAo8L35Veyc51mZTD3L/G0A1tR\n"\
  37. "ED9Oym8/PPcC+/fzZ999+skaHuig6ZplIJkWVJgctkDD9eVGvSxJFsukUZjSBeNo\n"\
  38. "BXyIceI8NgvLiRk56mNX1TnXGL4gawvjDvnO4yCwfIXecl4ZgeC0ZUGuQvRkobm5\n"\
  39. "1jTBwGPKyO5sMkLiJKU2KrYcPd+GzuPoJl11Xa/zjkyVUo3E/SQ7hSPgPyv7lRJY\n"\
  40. "Lwkp8DDFAgMBAAECggEAfBhAfQE7mUByNbxgAgI5fot9eaqR1Nf+QpJ6X2H3KPwC\n"\
  41. "02sa0HOwieFwYfj6tB1doBoNq7i89mTc+QUlIn4pHgIowHO0OGawomeKz5BEhjCZ\n"\
  42. "4XeLYGSoODary2+kNkf2xY8JTfFEcyvGBpJEwc4S2VyYgRRx+IgnumTSH+N5mIKZ\n"\
  43. "SXWNdZIuHEmkwod+rPRXs6/r+PH0eVW6WfpINEbr4zVAGXJx2zXQwd2cuV1GTJWh\n"\
  44. "cPVOXLu+XJ9im9B370cYN6GqUnR3fui13urYbnWnEf3syvoH/zuZkyrVChauoFf8\n"\
  45. "8EGb74/HhXK7Q2s8NRakx2c7OxQifCbcy03liUMmyQKBgQDFAob5B/66N4Q2cq/N\n"\
  46. "MWPf98kYBYoLaeEOhEJhLQlKk0pIFCTmtpmUbpoEes2kCUbH7RwczpYko8tlKyoB\n"\
  47. "6Fn6RY4zQQ64KZJI6kQVsjkYpcP/ihnOY6rbds+3yyv+4uPX7Eh9sYZwZMggE19M\n"\
  48. "CkFHkwAjiwqhiiSlUxe20sWmowKBgQDEfx4lxuFzA1PBPeZKGVBTxYPQf+DSLCre\n"\
  49. "ZFg3ZmrxbCjRq1O7Lra4FXWD3dmRq7NDk79JofoW50yD8wD7I0B7opdDfXD2idO8\n"\
  50. "0dBnWUKDr2CAXyoLEINce9kJPbx4kFBQRN9PiGF7VkDQxeQ3kfS8CvcErpTKCOdy\n"\
  51. "5wOwBTwJdwKBgDiTFTeGeDv5nVoVbS67tDao7XKchJvqd9q3WGiXikeELJyuTDqE\n"\
  52. "zW22pTwMF+m3UEAxcxVCrhMvhkUzNAkANHaOatuFHzj7lyqhO5QPbh4J3FMR0X9X\n"\
  53. "V8VWRSg+jA/SECP9koOl6zlzd5Tee0tW1pA7QpryXscs6IEhb3ns5R2JAoGAIkzO\n"\
  54. "RmnhEOKTzDex611f2D+yMsMfy5BKK2f4vjLymBH5TiBKDXKqEpgsW0huoi8Gq9Uu\n"\
  55. "nvvXXAgkIyRYF36f0vUe0nkjLuYAQAWgC2pZYgNLJR13iVbol0xHJoXQUHtgiaJ8\n"\
  56. "GLYFzjHQPqFMpSalQe3oELko39uOC1CoJCHFySECgYBeycUnRBikCO2n8DNhY4Eg\n"\
  57. "9Y3oxcssRt6ea5BZwgW2eAYi7/XqKkmxoSoOykUt3MJx9+EkkrL17bxFSpkj1tvL\n"\
  58. "qvxn7egtsKjjgGNAxwXC4MwCvhveyUQQxtQb8AqGrGqo4jEEN0L15cnP38i2x1Uo\n"\
  59. "muhfskWf4MABV0yTUaKcGg==\n"\
  60. "-----END PRIVATE KEY-----\n"
  61. def get_my_ip():
  62. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  63. s1.connect(("8.8.8.8", 80))
  64. my_ip = s1.getsockname()[0]
  65. s1.close()
  66. return my_ip
  67. def start_https_server(ota_image_dir, server_ip, server_port):
  68. os.chdir(ota_image_dir)
  69. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  70. cert_file_handle = open(server_file, "w+")
  71. cert_file_handle.write(server_cert)
  72. cert_file_handle.close()
  73. key_file = os.path.join(ota_image_dir, "server_key.pem")
  74. key_file_handle = open("server_key.pem", "w+")
  75. key_file_handle.write(server_key)
  76. key_file_handle.close()
  77. httpd = BaseHTTPServer.HTTPServer((server_ip, server_port),
  78. SimpleHTTPServer.SimpleHTTPRequestHandler)
  79. httpd.socket = ssl.wrap_socket(httpd.socket,
  80. keyfile=key_file,
  81. certfile=server_file, server_side=True)
  82. httpd.serve_forever()
  83. def start_chunked_server(ota_image_dir, server_port):
  84. os.chdir(ota_image_dir)
  85. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  86. cert_file_handle = open(server_file, "w+")
  87. cert_file_handle.write(server_cert)
  88. key_file = os.path.join(ota_image_dir, "server_key.pem")
  89. key_file_handle = open("server_key.pem", "w+")
  90. key_file_handle.write(server_key)
  91. chunked_server = subprocess.Popen(["openssl", "s_server", "-WWW", "-key", key_file, "-cert", server_file, "-port", str(server_port)])
  92. return chunked_server
  93. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  94. def test_examples_protocol_advanced_https_ota_example(env, extra_data):
  95. """
  96. This is a positive test case, which downloads complete binary file multiple number of times.
  97. Number of iterations can be specified in variable iterations.
  98. steps: |
  99. 1. join AP
  100. 2. Fetch OTA image over HTTPS
  101. 3. Reboot with the new OTA image
  102. """
  103. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  104. # Number of iterations to validate OTA
  105. iterations = 3
  106. # File to be downloaded. This file is generated after compilation
  107. bin_name = "advanced_https_ota.bin"
  108. # check and log bin size
  109. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  110. bin_size = os.path.getsize(binary_file)
  111. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  112. ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  113. # start test
  114. host_ip = get_my_ip()
  115. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8001))
  116. thread1.daemon = True
  117. thread1.start()
  118. dut1.start_app()
  119. for i in range(iterations):
  120. dut1.expect("Loaded app from partition at offset", timeout=30)
  121. try:
  122. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  123. print("Connected to AP with IP: {}".format(ip_address))
  124. except DUT.ExpectTimeout:
  125. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  126. thread1.close()
  127. dut1.expect("Starting Advanced OTA example", timeout=30)
  128. print("writing to device: {}".format("https://" + host_ip + ":8001/" + bin_name))
  129. dut1.write("https://" + host_ip + ":8001/" + bin_name)
  130. dut1.expect("Loaded app from partition at offset", timeout=60)
  131. dut1.expect("Starting Advanced OTA example", timeout=30)
  132. dut1.reset()
  133. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  134. def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
  135. """
  136. Working of OTA if binary file is truncated is validated in this test case.
  137. Application should return with error message in this case.
  138. steps: |
  139. 1. join AP
  140. 2. Generate truncated binary file
  141. 3. Fetch OTA image over HTTPS
  142. 4. Check working of code if bin is truncated
  143. """
  144. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  145. # Original binary file generated after compilation
  146. bin_name = "advanced_https_ota.bin"
  147. # Truncated binary file to be generated from original binary file
  148. truncated_bin_name = "truncated.bin"
  149. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  150. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  151. truncated_bin_size = 64000
  152. # check and log bin size
  153. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  154. f = open(binary_file, "r+")
  155. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  156. fo.write(f.read(truncated_bin_size))
  157. fo.close()
  158. f.close()
  159. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  160. bin_size = os.path.getsize(binary_file)
  161. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  162. ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  163. # start test
  164. host_ip = get_my_ip()
  165. dut1.start_app()
  166. dut1.expect("Loaded app from partition at offset", timeout=30)
  167. try:
  168. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  169. print("Connected to AP with IP: {}".format(ip_address))
  170. except DUT.ExpectTimeout:
  171. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  172. dut1.expect("Starting Advanced OTA example", timeout=30)
  173. print("writing to device: {}".format("https://" + host_ip + ":8001/" + truncated_bin_name))
  174. dut1.write("https://" + host_ip + ":8001/" + truncated_bin_name)
  175. dut1.expect("Image validation failed, image is corrupted", timeout=30)
  176. os.remove(binary_file)
  177. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  178. def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
  179. """
  180. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  181. Application should return with error message in this case.
  182. steps: |
  183. 1. join AP
  184. 2. Generate binary file with truncated headers
  185. 3. Fetch OTA image over HTTPS
  186. 4. Check working of code if headers are not sent completely
  187. """
  188. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  189. # Original binary file generated after compilation
  190. bin_name = "advanced_https_ota.bin"
  191. # Truncated binary file to be generated from original binary file
  192. truncated_bin_name = "truncated_header.bin"
  193. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  194. truncated_bin_size = 180
  195. # check and log bin size
  196. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  197. f = open(binary_file, "r+")
  198. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  199. fo.write(f.read(truncated_bin_size))
  200. fo.close()
  201. f.close()
  202. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  203. bin_size = os.path.getsize(binary_file)
  204. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  205. ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  206. # start test
  207. host_ip = get_my_ip()
  208. dut1.start_app()
  209. dut1.expect("Loaded app from partition at offset", timeout=30)
  210. try:
  211. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  212. print("Connected to AP with IP: {}".format(ip_address))
  213. except DUT.ExpectTimeout:
  214. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  215. dut1.expect("Starting Advanced OTA example", timeout=30)
  216. print("writing to device: {}".format("https://" + host_ip + ":8001/" + truncated_bin_name))
  217. dut1.write("https://" + host_ip + ":8001/" + truncated_bin_name)
  218. dut1.expect("advanced_https_ota_example: esp_https_ota_read_img_desc failed", timeout=30)
  219. os.remove(binary_file)
  220. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  221. def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
  222. """
  223. Working of OTA if random data is added in binary file are validated in this test case.
  224. Magic byte verification should fail in this case.
  225. steps: |
  226. 1. join AP
  227. 2. Generate random binary image
  228. 3. Fetch OTA image over HTTPS
  229. 4. Check working of code for random binary file
  230. """
  231. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  232. # Random binary file to be generated
  233. random_bin_name = "random.bin"
  234. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  235. random_bin_size = 32000
  236. # check and log bin size
  237. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  238. fo = open(binary_file, "w+")
  239. # First byte of binary file is always set to zero. If first byte is generated randomly,
  240. # in some cases it may generate 0xE9 which will result in failure of testcase.
  241. fo.write(str(0))
  242. for i in range(random_bin_size - 1):
  243. fo.write(str(random.randrange(0,255,1)))
  244. fo.close()
  245. bin_size = os.path.getsize(binary_file)
  246. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  247. ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  248. # start test
  249. host_ip = get_my_ip()
  250. dut1.start_app()
  251. dut1.expect("Loaded app from partition at offset", timeout=30)
  252. try:
  253. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  254. print("Connected to AP with IP: {}".format(ip_address))
  255. except DUT.ExpectTimeout:
  256. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  257. dut1.expect("Starting Advanced OTA example", timeout=30)
  258. print("writing to device: {}".format("https://" + host_ip + ":8001/" + random_bin_name))
  259. dut1.write("https://" + host_ip + ":8001/" + random_bin_name)
  260. dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=10)
  261. os.remove(binary_file)
  262. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  263. def test_examples_protocol_advanced_https_ota_example_chunked(env, extra_data):
  264. """
  265. This is a positive test case, which downloads complete binary file multiple number of times.
  266. Number of iterations can be specified in variable iterations.
  267. steps: |
  268. 1. join AP
  269. 2. Fetch OTA image over HTTPS
  270. 3. Reboot with the new OTA image
  271. """
  272. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota", dut_class=ttfw_idf.ESP32DUT)
  273. # File to be downloaded. This file is generated after compilation
  274. bin_name = "advanced_https_ota.bin"
  275. # check and log bin size
  276. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  277. bin_size = os.path.getsize(binary_file)
  278. ttfw_idf.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  279. ttfw_idf.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  280. # start test
  281. host_ip = get_my_ip()
  282. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  283. dut1.start_app()
  284. dut1.expect("Loaded app from partition at offset", timeout=30)
  285. try:
  286. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  287. print("Connected to AP with IP: {}".format(ip_address))
  288. except DUT.ExpectTimeout:
  289. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  290. dut1.expect("Starting Advanced OTA example", timeout=30)
  291. print("writing to device: {}".format("https://" + host_ip + ":8070/" + bin_name))
  292. dut1.write("https://" + host_ip + ":8070/" + bin_name)
  293. dut1.expect("Loaded app from partition at offset", timeout=60)
  294. dut1.expect("Starting Advanced OTA example", timeout=30)
  295. chunked_server.kill()
  296. os.remove(os.path.join(dut1.app.binary_path, "server_cert.pem"))
  297. os.remove(os.path.join(dut1.app.binary_path, "server_key.pem"))
  298. if __name__ == '__main__':
  299. test_examples_protocol_advanced_https_ota_example()
  300. test_examples_protocol_advanced_https_ota_example_chunked()
  301. test_examples_protocol_advanced_https_ota_example_truncated_bin()
  302. test_examples_protocol_advanced_https_ota_example_truncated_header()
  303. test_examples_protocol_advanced_https_ota_example_random()