example_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. import re
  2. import os
  3. import sys
  4. import socket
  5. from threading import Thread
  6. import ssl
  7. try:
  8. import BaseHTTPServer
  9. from SimpleHTTPServer import SimpleHTTPRequestHandler
  10. except ImportError:
  11. import http.server as BaseHTTPServer
  12. from http.server import SimpleHTTPRequestHandler
  13. try:
  14. import IDF
  15. except ImportError:
    # This is a test case written with tiny-test-fw.
    # To run test cases outside tiny-test-fw,
    # we need to set the environment variable `TEST_FW_PATH`,
    # then get `TEST_FW_PATH` and insert it into sys.path before importing the FW module.
  20. test_fw_path = os.getenv("TEST_FW_PATH")
  21. if test_fw_path and test_fw_path not in sys.path:
  22. sys.path.insert(0, test_fw_path)
  23. import IDF
  24. import DUT
  25. import random
  26. import subprocess
  27. server_cert = "-----BEGIN CERTIFICATE-----\n" \
  28. "MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n"\
  29. "BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\n"\
  30. "aWRnaXRzIFB0eSBMdGQwHhcNMTkwNjA3MDk1OTE2WhcNMjAwNjA2MDk1OTE2WjBF\n"\
  31. "MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\n"\
  32. "ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"\
  33. "CgKCAQEAlzfCyv3mIv7TlLkObxunKfCdrJ/zgdANrsx0RBtpEPhV560hWJ0fEin0\n"\
  34. "nIOMpJSiF9E6QsPdr6Q+eogH4XnOMU9JE+iG743N1dPfGEzJvRlyct/Ck8SswKPC\n"\
  35. "9+VXsnOdZmUw9y/xtANbURA/TspvPzz3Avv382ffffrJGh7ooOmaZSCZFlSYHLZA\n"\
  36. "w/XlRr0sSRbLpFGY0gXjaAV8iHHiPDYLy4kZOepjV9U51xi+IGsL4w75zuMgsHyF\n"\
  37. "3nJeGYHgtGVBrkL0ZKG5udY0wcBjysjubDJC4iSlNiq2HD3fhs7j6CZddV2v845M\n"\
  38. "lVKNxP0kO4Uj4D8r+5USWC8JKfAwxQIDAQABo1AwTjAdBgNVHQ4EFgQU6OE7ssfY\n"\
  39. "IIPTDThiUoofUpsD5NwwHwYDVR0jBBgwFoAU6OE7ssfYIIPTDThiUoofUpsD5Nww\n"\
  40. "DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAXIlHS/FJWfmcinUAxyBd\n"\
  41. "/xd5Lu8ykeru6oaUCci+Vk9lyoMMES7lQ+b/00d5x7AcTawkTil9EWpBTPTOTraA\n"\
  42. "lzJMQhNKmSLk0iIoTtAJtSZgUSpIIozqK6lenxQQDsHbXKU6h+u9H6KZE8YcjsFl\n"\
  43. "6vL7sw9BVotw/VxfgjQ5OSGLgoLrdVT0z5C2qOuwOgz1c7jNiJhtMdwN+cOtnJp2\n"\
  44. "fuBgEYyE3eeuWogvkWoDcIA8r17Ixzkpq2oJsdvZcHZPIZShPKW2SHUsl98KDemu\n"\
  45. "y0pQyExmQUbwKE4vbFb9XuWCcL9XaOHQytyszt2DeD67AipvoBwVU7/LBOvqnsmy\n"\
  46. "hA==\n"\
  47. "-----END CERTIFICATE-----\n"
  48. server_key = "-----BEGIN PRIVATE KEY-----\n"\
  49. "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXN8LK/eYi/tOU\n"\
  50. "uQ5vG6cp8J2sn/OB0A2uzHREG2kQ+FXnrSFYnR8SKfScg4yklKIX0TpCw92vpD56\n"\
  51. "iAfhec4xT0kT6Ibvjc3V098YTMm9GXJy38KTxKzAo8L35Veyc51mZTD3L/G0A1tR\n"\
  52. "ED9Oym8/PPcC+/fzZ999+skaHuig6ZplIJkWVJgctkDD9eVGvSxJFsukUZjSBeNo\n"\
  53. "BXyIceI8NgvLiRk56mNX1TnXGL4gawvjDvnO4yCwfIXecl4ZgeC0ZUGuQvRkobm5\n"\
  54. "1jTBwGPKyO5sMkLiJKU2KrYcPd+GzuPoJl11Xa/zjkyVUo3E/SQ7hSPgPyv7lRJY\n"\
  55. "Lwkp8DDFAgMBAAECggEAfBhAfQE7mUByNbxgAgI5fot9eaqR1Nf+QpJ6X2H3KPwC\n"\
  56. "02sa0HOwieFwYfj6tB1doBoNq7i89mTc+QUlIn4pHgIowHO0OGawomeKz5BEhjCZ\n"\
  57. "4XeLYGSoODary2+kNkf2xY8JTfFEcyvGBpJEwc4S2VyYgRRx+IgnumTSH+N5mIKZ\n"\
  58. "SXWNdZIuHEmkwod+rPRXs6/r+PH0eVW6WfpINEbr4zVAGXJx2zXQwd2cuV1GTJWh\n"\
  59. "cPVOXLu+XJ9im9B370cYN6GqUnR3fui13urYbnWnEf3syvoH/zuZkyrVChauoFf8\n"\
  60. "8EGb74/HhXK7Q2s8NRakx2c7OxQifCbcy03liUMmyQKBgQDFAob5B/66N4Q2cq/N\n"\
  61. "MWPf98kYBYoLaeEOhEJhLQlKk0pIFCTmtpmUbpoEes2kCUbH7RwczpYko8tlKyoB\n"\
  62. "6Fn6RY4zQQ64KZJI6kQVsjkYpcP/ihnOY6rbds+3yyv+4uPX7Eh9sYZwZMggE19M\n"\
  63. "CkFHkwAjiwqhiiSlUxe20sWmowKBgQDEfx4lxuFzA1PBPeZKGVBTxYPQf+DSLCre\n"\
  64. "ZFg3ZmrxbCjRq1O7Lra4FXWD3dmRq7NDk79JofoW50yD8wD7I0B7opdDfXD2idO8\n"\
  65. "0dBnWUKDr2CAXyoLEINce9kJPbx4kFBQRN9PiGF7VkDQxeQ3kfS8CvcErpTKCOdy\n"\
  66. "5wOwBTwJdwKBgDiTFTeGeDv5nVoVbS67tDao7XKchJvqd9q3WGiXikeELJyuTDqE\n"\
  67. "zW22pTwMF+m3UEAxcxVCrhMvhkUzNAkANHaOatuFHzj7lyqhO5QPbh4J3FMR0X9X\n"\
  68. "V8VWRSg+jA/SECP9koOl6zlzd5Tee0tW1pA7QpryXscs6IEhb3ns5R2JAoGAIkzO\n"\
  69. "RmnhEOKTzDex611f2D+yMsMfy5BKK2f4vjLymBH5TiBKDXKqEpgsW0huoi8Gq9Uu\n"\
  70. "nvvXXAgkIyRYF36f0vUe0nkjLuYAQAWgC2pZYgNLJR13iVbol0xHJoXQUHtgiaJ8\n"\
  71. "GLYFzjHQPqFMpSalQe3oELko39uOC1CoJCHFySECgYBeycUnRBikCO2n8DNhY4Eg\n"\
  72. "9Y3oxcssRt6ea5BZwgW2eAYi7/XqKkmxoSoOykUt3MJx9+EkkrL17bxFSpkj1tvL\n"\
  73. "qvxn7egtsKjjgGNAxwXC4MwCvhveyUQQxtQb8AqGrGqo4jEEN0L15cnP38i2x1Uo\n"\
  74. "muhfskWf4MABV0yTUaKcGg==\n"\
  75. "-----END PRIVATE KEY-----\n"
  76. def get_my_ip():
  77. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  78. s1.connect(("8.8.8.8", 80))
  79. my_ip = s1.getsockname()[0]
  80. s1.close()
  81. return my_ip
  82. def get_server_status(host_ip, port):
  83. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  84. server_status = sock.connect_ex((host_ip, port))
  85. sock.close()
  86. if server_status == 0:
  87. return True
  88. return False
  89. def create_file(server_file, file_data):
  90. with open(server_file, "w+") as file:
  91. file.write(file_data)
  92. def get_ca_cert(ota_image_dir):
  93. os.chdir(ota_image_dir)
  94. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  95. create_file(server_file, server_cert)
  96. key_file = os.path.join(ota_image_dir, "server_key.pem")
  97. create_file(key_file, server_key)
  98. return server_file, key_file
  99. def start_https_server(ota_image_dir, server_ip, server_port):
  100. server_file, key_file = get_ca_cert(ota_image_dir)
  101. httpd = BaseHTTPServer.HTTPServer((server_ip, server_port),
  102. SimpleHTTPRequestHandler)
  103. httpd.socket = ssl.wrap_socket(httpd.socket,
  104. keyfile=key_file,
  105. certfile=server_file, server_side=True)
  106. httpd.serve_forever()
  107. def start_chunked_server(ota_image_dir, server_port):
  108. server_file, key_file = get_ca_cert(ota_image_dir)
  109. chunked_server = subprocess.Popen(["openssl", "s_server", "-WWW", "-key", key_file, "-cert", server_file, "-port", str(server_port)])
  110. return chunked_server
  111. @IDF.idf_example_test(env_tag="Example_WIFI")
  112. def test_examples_protocol_native_ota_example(env, extra_data):
  113. """
  114. This is a positive test case, which downloads complete binary file multiple number of times.
  115. Number of iterations can be specified in variable iterations.
  116. steps: |
  117. 1. join AP
  118. 2. Fetch OTA image over HTTPS
  119. 3. Reboot with the new OTA image
  120. """
  121. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example")
  122. server_port = 8002
  123. # No. of times working of application to be validated
  124. iterations = 3
  125. # File to be downloaded. This file is generated after compilation
  126. bin_name = "native_ota.bin"
  127. # check and log bin size
  128. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  129. bin_size = os.path.getsize(binary_file)
  130. IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  131. IDF.check_performance("native_ota_bin_size", bin_size // 1024)
  132. # start test
  133. host_ip = get_my_ip()
  134. if (get_server_status(host_ip, server_port) is False):
  135. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  136. thread1.daemon = True
  137. thread1.start()
  138. dut1.start_app()
  139. for i in range(iterations):
  140. dut1.expect("Loaded app from partition at offset", timeout=30)
  141. try:
  142. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  143. print("Connected to AP with IP: {}".format(ip_address))
  144. except DUT.ExpectTimeout:
  145. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  146. thread1.close()
  147. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  148. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + bin_name))
  149. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + bin_name)
  150. dut1.expect("Loaded app from partition at offset", timeout=60)
  151. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  152. dut1.reset()
  153. @IDF.idf_example_test(env_tag="Example_WIFI")
  154. def test_examples_protocol_native_ota_example_truncated_bin(env, extra_data):
  155. """
  156. Working of OTA if binary file is truncated is validated in this test case.
  157. Application should return with error message in this case.
  158. steps: |
  159. 1. join AP
  160. 2. Generate truncated binary file
  161. 3. Fetch OTA image over HTTPS
  162. 4. Check working of code if bin is truncated
  163. """
  164. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example")
  165. server_port = 8002
  166. # Original binary file generated after compilation
  167. bin_name = "native_ota.bin"
  168. # Truncated binary file to be generated from original binary file
  169. truncated_bin_name = "truncated.bin"
  170. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  171. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  172. truncated_bin_size = 64000
  173. # check and log bin size
  174. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  175. f = open(binary_file, "r+")
  176. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  177. fo.write(f.read(truncated_bin_size))
  178. fo.close()
  179. f.close()
  180. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  181. bin_size = os.path.getsize(binary_file)
  182. IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  183. IDF.check_performance("native_ota_bin_size", bin_size // 1024)
  184. # start test
  185. host_ip = get_my_ip()
  186. if (get_server_status(host_ip, server_port) is False):
  187. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  188. thread1.daemon = True
  189. thread1.start()
  190. dut1.start_app()
  191. dut1.expect("Loaded app from partition at offset", timeout=30)
  192. try:
  193. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  194. print("Connected to AP with IP: {}".format(ip_address))
  195. except DUT.ExpectTimeout:
  196. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  197. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  198. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  199. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  200. dut1.expect("native_ota_example: Image validation failed, image is corrupted", timeout=20)
  201. os.remove(binary_file)
  202. @IDF.idf_example_test(env_tag="Example_WIFI")
  203. def test_examples_protocol_native_ota_example_truncated_header(env, extra_data):
  204. """
  205. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  206. Application should return with error message in this case.
  207. steps: |
  208. 1. join AP
  209. 2. Generate binary file with truncated headers
  210. 3. Fetch OTA image over HTTPS
  211. 4. Check working of code if headers are not sent completely
  212. """
  213. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example")
  214. server_port = 8002
  215. # Original binary file generated after compilation
  216. bin_name = "native_ota.bin"
  217. # Truncated binary file to be generated from original binary file
  218. truncated_bin_name = "truncated_header.bin"
  219. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  220. truncated_bin_size = 180
  221. # check and log bin size
  222. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  223. f = open(binary_file, "r+")
  224. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  225. fo.write(f.read(truncated_bin_size))
  226. fo.close()
  227. f.close()
  228. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  229. bin_size = os.path.getsize(binary_file)
  230. IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  231. IDF.check_performance("native_ota_bin_size", bin_size // 1024)
  232. # start test
  233. host_ip = get_my_ip()
  234. if (get_server_status(host_ip, server_port) is False):
  235. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  236. thread1.daemon = True
  237. thread1.start()
  238. dut1.start_app()
  239. dut1.expect("Loaded app from partition at offset", timeout=30)
  240. try:
  241. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  242. print("Connected to AP with IP: {}".format(ip_address))
  243. except DUT.ExpectTimeout:
  244. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  245. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  246. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  247. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  248. dut1.expect("native_ota_example: received package is not fit len", timeout=20)
  249. os.remove(binary_file)
  250. @IDF.idf_example_test(env_tag="Example_WIFI")
  251. def test_examples_protocol_native_ota_example_random(env, extra_data):
  252. """
  253. Working of OTA if random data is added in binary file are validated in this test case.
  254. Magic byte verification should fail in this case.
  255. steps: |
  256. 1. join AP
  257. 2. Generate random binary image
  258. 3. Fetch OTA image over HTTPS
  259. 4. Check working of code for random binary file
  260. """
  261. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example")
  262. server_port = 8002
  263. # Random binary file to be generated
  264. random_bin_name = "random.bin"
  265. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  266. random_bin_size = 32000
  267. # check and log bin size
  268. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  269. fo = open(binary_file, "w+")
  270. # First byte of binary file is always set to zero. If first byte is generated randomly,
  271. # in some cases it may generate 0xE9 which will result in failure of testcase.
  272. fo.write(str(0))
  273. for i in range(random_bin_size - 1):
  274. fo.write(str(random.randrange(0,255,1)))
  275. fo.close()
  276. bin_size = os.path.getsize(binary_file)
  277. IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  278. IDF.check_performance("native_ota_bin_size", bin_size // 1024)
  279. # start test
  280. host_ip = get_my_ip()
  281. if (get_server_status(host_ip, server_port) is False):
  282. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  283. thread1.daemon = True
  284. thread1.start()
  285. dut1.start_app()
  286. dut1.expect("Loaded app from partition at offset", timeout=30)
  287. try:
  288. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  289. print("Connected to AP with IP: {}".format(ip_address))
  290. except DUT.ExpectTimeout:
  291. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  292. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  293. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name))
  294. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name)
  295. dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=20)
  296. os.remove(binary_file)
  297. @IDF.idf_example_test(env_tag="Example_WIFI")
  298. def test_examples_protocol_native_ota_example_chunked(env, extra_data):
  299. """
  300. This is a positive test case, which downloads complete binary file multiple number of times.
  301. Number of iterations can be specified in variable iterations.
  302. steps: |
  303. 1. join AP
  304. 2. Fetch OTA image over HTTPS
  305. 3. Reboot with the new OTA image
  306. """
  307. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example")
  308. # File to be downloaded. This file is generated after compilation
  309. bin_name = "native_ota.bin"
  310. # check and log bin size
  311. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  312. bin_size = os.path.getsize(binary_file)
  313. IDF.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  314. IDF.check_performance("native_ota_bin_size", bin_size // 1024)
  315. # start test
  316. host_ip = get_my_ip()
  317. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  318. dut1.start_app()
  319. dut1.expect("Loaded app from partition at offset", timeout=30)
  320. try:
  321. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  322. print("Connected to AP with IP: {}".format(ip_address))
  323. except DUT.ExpectTimeout:
  324. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  325. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  326. print("writing to device: {}".format("https://" + host_ip + ":8070/" + bin_name))
  327. dut1.write("https://" + host_ip + ":8070/" + bin_name)
  328. dut1.expect("Loaded app from partition at offset", timeout=60)
  329. dut1.expect("Connect to Wifi ! Start to Connect to Server....", timeout=30)
  330. chunked_server.kill()
  331. os.remove(os.path.join(dut1.app.binary_path, "server_cert.pem"))
  332. os.remove(os.path.join(dut1.app.binary_path, "server_key.pem"))
  333. if __name__ == '__main__':
  334. test_examples_protocol_native_ota_example()
  335. test_examples_protocol_native_ota_example_chunked()
  336. test_examples_protocol_native_ota_example_truncated_bin()
  337. test_examples_protocol_native_ota_example_truncated_header()
  338. test_examples_protocol_native_ota_example_random()