example_test.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. import BaseHTTPServer
  6. import SimpleHTTPServer
  7. from threading import Thread
  8. import ssl
  9. try:
  10. import IDF
  11. except ImportError:
  12. # this is a test case written with tiny-test-fw.
  13. # to run test cases outside tiny-test-fw,
  14. # we need to set environment variable `TEST_FW_PATH`,
  15. # then get and insert `TEST_FW_PATH` to sys path before import FW module
  16. test_fw_path = os.getenv("TEST_FW_PATH")
  17. if test_fw_path and test_fw_path not in sys.path:
  18. sys.path.insert(0, test_fw_path)
  19. import IDF
  20. import DUT
  21. import random
  22. server_cert = "-----BEGIN CERTIFICATE-----\n" \
  23. "MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n"\
  24. "BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\n"\
  25. "aWRnaXRzIFB0eSBMdGQwHhcNMTkwNjA3MDk1OTE2WhcNMjAwNjA2MDk1OTE2WjBF\n"\
  26. "MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\n"\
  27. "ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"\
  28. "CgKCAQEAlzfCyv3mIv7TlLkObxunKfCdrJ/zgdANrsx0RBtpEPhV560hWJ0fEin0\n"\
  29. "nIOMpJSiF9E6QsPdr6Q+eogH4XnOMU9JE+iG743N1dPfGEzJvRlyct/Ck8SswKPC\n"\
  30. "9+VXsnOdZmUw9y/xtANbURA/TspvPzz3Avv382ffffrJGh7ooOmaZSCZFlSYHLZA\n"\
  31. "w/XlRr0sSRbLpFGY0gXjaAV8iHHiPDYLy4kZOepjV9U51xi+IGsL4w75zuMgsHyF\n"\
  32. "3nJeGYHgtGVBrkL0ZKG5udY0wcBjysjubDJC4iSlNiq2HD3fhs7j6CZddV2v845M\n"\
  33. "lVKNxP0kO4Uj4D8r+5USWC8JKfAwxQIDAQABo1AwTjAdBgNVHQ4EFgQU6OE7ssfY\n"\
  34. "IIPTDThiUoofUpsD5NwwHwYDVR0jBBgwFoAU6OE7ssfYIIPTDThiUoofUpsD5Nww\n"\
  35. "DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAXIlHS/FJWfmcinUAxyBd\n"\
  36. "/xd5Lu8ykeru6oaUCci+Vk9lyoMMES7lQ+b/00d5x7AcTawkTil9EWpBTPTOTraA\n"\
  37. "lzJMQhNKmSLk0iIoTtAJtSZgUSpIIozqK6lenxQQDsHbXKU6h+u9H6KZE8YcjsFl\n"\
  38. "6vL7sw9BVotw/VxfgjQ5OSGLgoLrdVT0z5C2qOuwOgz1c7jNiJhtMdwN+cOtnJp2\n"\
  39. "fuBgEYyE3eeuWogvkWoDcIA8r17Ixzkpq2oJsdvZcHZPIZShPKW2SHUsl98KDemu\n"\
  40. "y0pQyExmQUbwKE4vbFb9XuWCcL9XaOHQytyszt2DeD67AipvoBwVU7/LBOvqnsmy\n"\
  41. "hA==\n"\
  42. "-----END CERTIFICATE-----\n"
  43. server_key = "-----BEGIN PRIVATE KEY-----\n"\
  44. "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXN8LK/eYi/tOU\n"\
  45. "uQ5vG6cp8J2sn/OB0A2uzHREG2kQ+FXnrSFYnR8SKfScg4yklKIX0TpCw92vpD56\n"\
  46. "iAfhec4xT0kT6Ibvjc3V098YTMm9GXJy38KTxKzAo8L35Veyc51mZTD3L/G0A1tR\n"\
  47. "ED9Oym8/PPcC+/fzZ999+skaHuig6ZplIJkWVJgctkDD9eVGvSxJFsukUZjSBeNo\n"\
  48. "BXyIceI8NgvLiRk56mNX1TnXGL4gawvjDvnO4yCwfIXecl4ZgeC0ZUGuQvRkobm5\n"\
  49. "1jTBwGPKyO5sMkLiJKU2KrYcPd+GzuPoJl11Xa/zjkyVUo3E/SQ7hSPgPyv7lRJY\n"\
  50. "Lwkp8DDFAgMBAAECggEAfBhAfQE7mUByNbxgAgI5fot9eaqR1Nf+QpJ6X2H3KPwC\n"\
  51. "02sa0HOwieFwYfj6tB1doBoNq7i89mTc+QUlIn4pHgIowHO0OGawomeKz5BEhjCZ\n"\
  52. "4XeLYGSoODary2+kNkf2xY8JTfFEcyvGBpJEwc4S2VyYgRRx+IgnumTSH+N5mIKZ\n"\
  53. "SXWNdZIuHEmkwod+rPRXs6/r+PH0eVW6WfpINEbr4zVAGXJx2zXQwd2cuV1GTJWh\n"\
  54. "cPVOXLu+XJ9im9B370cYN6GqUnR3fui13urYbnWnEf3syvoH/zuZkyrVChauoFf8\n"\
  55. "8EGb74/HhXK7Q2s8NRakx2c7OxQifCbcy03liUMmyQKBgQDFAob5B/66N4Q2cq/N\n"\
  56. "MWPf98kYBYoLaeEOhEJhLQlKk0pIFCTmtpmUbpoEes2kCUbH7RwczpYko8tlKyoB\n"\
  57. "6Fn6RY4zQQ64KZJI6kQVsjkYpcP/ihnOY6rbds+3yyv+4uPX7Eh9sYZwZMggE19M\n"\
  58. "CkFHkwAjiwqhiiSlUxe20sWmowKBgQDEfx4lxuFzA1PBPeZKGVBTxYPQf+DSLCre\n"\
  59. "ZFg3ZmrxbCjRq1O7Lra4FXWD3dmRq7NDk79JofoW50yD8wD7I0B7opdDfXD2idO8\n"\
  60. "0dBnWUKDr2CAXyoLEINce9kJPbx4kFBQRN9PiGF7VkDQxeQ3kfS8CvcErpTKCOdy\n"\
  61. "5wOwBTwJdwKBgDiTFTeGeDv5nVoVbS67tDao7XKchJvqd9q3WGiXikeELJyuTDqE\n"\
  62. "zW22pTwMF+m3UEAxcxVCrhMvhkUzNAkANHaOatuFHzj7lyqhO5QPbh4J3FMR0X9X\n"\
  63. "V8VWRSg+jA/SECP9koOl6zlzd5Tee0tW1pA7QpryXscs6IEhb3ns5R2JAoGAIkzO\n"\
  64. "RmnhEOKTzDex611f2D+yMsMfy5BKK2f4vjLymBH5TiBKDXKqEpgsW0huoi8Gq9Uu\n"\
  65. "nvvXXAgkIyRYF36f0vUe0nkjLuYAQAWgC2pZYgNLJR13iVbol0xHJoXQUHtgiaJ8\n"\
  66. "GLYFzjHQPqFMpSalQe3oELko39uOC1CoJCHFySECgYBeycUnRBikCO2n8DNhY4Eg\n"\
  67. "9Y3oxcssRt6ea5BZwgW2eAYi7/XqKkmxoSoOykUt3MJx9+EkkrL17bxFSpkj1tvL\n"\
  68. "qvxn7egtsKjjgGNAxwXC4MwCvhveyUQQxtQb8AqGrGqo4jEEN0L15cnP38i2x1Uo\n"\
  69. "muhfskWf4MABV0yTUaKcGg==\n"\
  70. "-----END PRIVATE KEY-----\n"
  71. def get_my_ip():
  72. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  73. s1.connect(("8.8.8.8", 80))
  74. my_ip = s1.getsockname()[0]
  75. s1.close()
  76. return my_ip
  77. def start_https_server(ota_image_dir, server_ip, server_port):
  78. # parser = argparse.ArgumentParser()
  79. # parser.add_argument('-p', '--port', dest='port', type= int,
  80. # help= "Server Port", default= 8000)
  81. # args = parser.parse_args()
  82. os.chdir(ota_image_dir)
  83. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  84. cert_file_handle = open(server_file, "w+")
  85. cert_file_handle.write(server_cert)
  86. cert_file_handle.close()
  87. key_file = os.path.join(ota_image_dir, "server_key.pem")
  88. key_file_handle = open("server_key.pem", "w+")
  89. key_file_handle.write(server_key)
  90. key_file_handle.close()
  91. httpd = BaseHTTPServer.HTTPServer((server_ip, server_port),
  92. SimpleHTTPServer.SimpleHTTPRequestHandler)
  93. httpd.socket = ssl.wrap_socket(httpd.socket,
  94. keyfile=key_file,
  95. certfile=server_file, server_side=True)
  96. httpd.serve_forever()
  97. @IDF.idf_example_test(env_tag="Example_WIFI")
  98. def test_examples_protocol_advanced_https_ota_example(env, extra_data):
  99. """
  100. This is a positive test case, which downloads complete binary file multiple number of times.
  101. Number of iterations can be specified in variable iterations.
  102. steps: |
  103. 1. join AP
  104. 2. Fetch OTA image over HTTPS
  105. 3. Reboot with the new OTA image
  106. """
  107. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota")
  108. # Number of iterations to validate OTA
  109. iterations = 3
  110. # File to be downloaded. This file is generated after compilation
  111. bin_name = "advanced_https_ota.bin"
  112. # check and log bin size
  113. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  114. bin_size = os.path.getsize(binary_file)
  115. IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  116. IDF.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  117. # start test
  118. host_ip = get_my_ip()
  119. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8001))
  120. thread1.daemon = True
  121. thread1.start()
  122. dut1.start_app()
  123. for i in range(iterations):
  124. dut1.expect("Loaded app from partition at offset", timeout=30)
  125. try:
  126. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  127. print("Connected to AP with IP: {}".format(ip_address))
  128. except DUT.ExpectTimeout:
  129. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  130. thread1.close()
  131. dut1.expect("Connected to WiFi network! Attempting to connect to server...", timeout=30)
  132. print("writing to device: {}".format("https://" + host_ip + ":8001/" + bin_name))
  133. dut1.write("https://" + host_ip + ":8001/" + bin_name)
  134. dut1.expect("Loaded app from partition at offset", timeout=60)
  135. dut1.expect("Connected to WiFi network! Attempting to connect to server...", timeout=30)
  136. dut1.reset()
  137. @IDF.idf_example_test(env_tag="Example_WIFI")
  138. def test_examples_protocol_advanced_https_ota_example_truncated_bin(env, extra_data):
  139. """
  140. Working of OTA if binary file is truncated is validated in this test case.
  141. Application should return with error message in this case.
  142. steps: |
  143. 1. join AP
  144. 2. Generate truncated binary file
  145. 3. Fetch OTA image over HTTPS
  146. 4. Check working of code if bin is truncated
  147. """
  148. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota")
  149. # Original binary file generated after compilation
  150. bin_name = "advanced_https_ota.bin"
  151. # Truncated binary file to be generated from original binary file
  152. truncated_bin_name = "truncated.bin"
  153. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  154. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  155. truncated_bin_size = 64000
  156. # check and log bin size
  157. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  158. f = open(binary_file, "r+")
  159. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  160. fo.write(f.read(truncated_bin_size))
  161. fo.close()
  162. f.close()
  163. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  164. bin_size = os.path.getsize(binary_file)
  165. IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  166. IDF.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  167. # start test
  168. host_ip = get_my_ip()
  169. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8002))
  170. thread1.daemon = True
  171. thread1.start()
  172. dut1.start_app()
  173. dut1.expect("Loaded app from partition at offset", timeout=30)
  174. try:
  175. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  176. print("Connected to AP with IP: {}".format(ip_address))
  177. except DUT.ExpectTimeout:
  178. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  179. dut1.expect("Connected to WiFi network! Attempting to connect to server...", timeout=30)
  180. print("writing to device: {}".format("https://" + host_ip + ":8002/" + truncated_bin_name))
  181. dut1.write("https://" + host_ip + ":8002/" + truncated_bin_name)
  182. dut1.expect("Image validation failed, image is corrupted", timeout=30)
  183. @IDF.idf_example_test(env_tag="Example_WIFI")
  184. def test_examples_protocol_advanced_https_ota_example_truncated_header(env, extra_data):
  185. """
  186. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  187. Application should return with error message in this case.
  188. steps: |
  189. 1. join AP
  190. 2. Generate binary file with truncated headers
  191. 3. Fetch OTA image over HTTPS
  192. 4. Check working of code if headers are not sent completely
  193. """
  194. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota")
  195. # Original binary file generated after compilation
  196. bin_name = "advanced_https_ota.bin"
  197. # Truncated binary file to be generated from original binary file
  198. truncated_bin_name = "truncated_header.bin"
  199. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  200. truncated_bin_size = 180
  201. # check and log bin size
  202. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  203. f = open(binary_file, "r+")
  204. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  205. fo.write(f.read(truncated_bin_size))
  206. fo.close()
  207. f.close()
  208. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  209. bin_size = os.path.getsize(binary_file)
  210. IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  211. IDF.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  212. # start test
  213. host_ip = get_my_ip()
  214. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8003))
  215. thread1.daemon = True
  216. thread1.start()
  217. dut1.start_app()
  218. dut1.expect("Loaded app from partition at offset", timeout=30)
  219. try:
  220. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  221. print("Connected to AP with IP: {}".format(ip_address))
  222. except DUT.ExpectTimeout:
  223. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  224. dut1.expect("Connected to WiFi network! Attempting to connect to server...", timeout=30)
  225. print("writing to device: {}".format("https://" + host_ip + ":8003/" + truncated_bin_name))
  226. dut1.write("https://" + host_ip + ":8003/" + truncated_bin_name)
  227. dut1.expect("advanced_https_ota_example: esp_https_ota_read_img_desc failed", timeout=30)
  228. @IDF.idf_example_test(env_tag="Example_WIFI")
  229. def test_examples_protocol_advanced_https_ota_example_random(env, extra_data):
  230. """
  231. Working of OTA if random data is added in binary file are validated in this test case.
  232. Magic byte verification should fail in this case.
  233. steps: |
  234. 1. join AP
  235. 2. Generate random binary image
  236. 3. Fetch OTA image over HTTPS
  237. 4. Check working of code for random binary file
  238. """
  239. dut1 = env.get_dut("advanced_https_ota_example", "examples/system/ota/advanced_https_ota")
  240. # Random binary file to be generated
  241. random_bin_name = "random.bin"
  242. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  243. random_bin_size = 32000
  244. # check and log bin size
  245. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  246. fo = open(binary_file, "w+")
  247. for i in range(random_bin_size):
  248. fo.write(str(random.randrange(0,255,1)))
  249. fo.close()
  250. bin_size = os.path.getsize(binary_file)
  251. IDF.log_performance("advanced_https_ota_bin_size", "{}KB".format(bin_size // 1024))
  252. IDF.check_performance("advanced_https_ota_bin_size", bin_size // 1024)
  253. # start test
  254. host_ip = get_my_ip()
  255. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, 8004))
  256. thread1.daemon = True
  257. thread1.start()
  258. dut1.start_app()
  259. dut1.expect("Loaded app from partition at offset", timeout=30)
  260. try:
  261. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  262. print("Connected to AP with IP: {}".format(ip_address))
  263. except DUT.ExpectTimeout:
  264. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  265. dut1.expect("Connected to WiFi network! Attempting to connect to server...", timeout=30)
  266. print("writing to device: {}".format("https://" + host_ip + ":8004/" + random_bin_name))
  267. dut1.write("https://" + host_ip + ":8004/" + random_bin_name)
  268. dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=10)
  269. if __name__ == '__main__':
  270. test_examples_protocol_advanced_https_ota_example()
  271. test_examples_protocol_advanced_https_ota_example_truncated_bin()
  272. test_examples_protocol_advanced_https_ota_example_truncated_header()
  273. test_examples_protocol_advanced_https_ota_example_random()