example_test.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import re
  2. import os
  3. import socket
  4. from threading import Thread
  5. import ssl
  6. from tiny_test_fw import DUT
  7. import ttfw_idf
  8. import random
  9. import subprocess
  10. try:
  11. import BaseHTTPServer
  12. from SimpleHTTPServer import SimpleHTTPRequestHandler
  13. except ImportError:
  14. import http.server as BaseHTTPServer
  15. from http.server import SimpleHTTPRequestHandler
  16. server_cert = "-----BEGIN CERTIFICATE-----\n" \
  17. "MIIDXTCCAkWgAwIBAgIJAP4LF7E72HakMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV\n"\
  18. "BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX\n"\
  19. "aWRnaXRzIFB0eSBMdGQwHhcNMTkwNjA3MDk1OTE2WhcNMjAwNjA2MDk1OTE2WjBF\n"\
  20. "MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50\n"\
  21. "ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB\n"\
  22. "CgKCAQEAlzfCyv3mIv7TlLkObxunKfCdrJ/zgdANrsx0RBtpEPhV560hWJ0fEin0\n"\
  23. "nIOMpJSiF9E6QsPdr6Q+eogH4XnOMU9JE+iG743N1dPfGEzJvRlyct/Ck8SswKPC\n"\
  24. "9+VXsnOdZmUw9y/xtANbURA/TspvPzz3Avv382ffffrJGh7ooOmaZSCZFlSYHLZA\n"\
  25. "w/XlRr0sSRbLpFGY0gXjaAV8iHHiPDYLy4kZOepjV9U51xi+IGsL4w75zuMgsHyF\n"\
  26. "3nJeGYHgtGVBrkL0ZKG5udY0wcBjysjubDJC4iSlNiq2HD3fhs7j6CZddV2v845M\n"\
  27. "lVKNxP0kO4Uj4D8r+5USWC8JKfAwxQIDAQABo1AwTjAdBgNVHQ4EFgQU6OE7ssfY\n"\
  28. "IIPTDThiUoofUpsD5NwwHwYDVR0jBBgwFoAU6OE7ssfYIIPTDThiUoofUpsD5Nww\n"\
  29. "DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAXIlHS/FJWfmcinUAxyBd\n"\
  30. "/xd5Lu8ykeru6oaUCci+Vk9lyoMMES7lQ+b/00d5x7AcTawkTil9EWpBTPTOTraA\n"\
  31. "lzJMQhNKmSLk0iIoTtAJtSZgUSpIIozqK6lenxQQDsHbXKU6h+u9H6KZE8YcjsFl\n"\
  32. "6vL7sw9BVotw/VxfgjQ5OSGLgoLrdVT0z5C2qOuwOgz1c7jNiJhtMdwN+cOtnJp2\n"\
  33. "fuBgEYyE3eeuWogvkWoDcIA8r17Ixzkpq2oJsdvZcHZPIZShPKW2SHUsl98KDemu\n"\
  34. "y0pQyExmQUbwKE4vbFb9XuWCcL9XaOHQytyszt2DeD67AipvoBwVU7/LBOvqnsmy\n"\
  35. "hA==\n"\
  36. "-----END CERTIFICATE-----\n"
  37. server_key = "-----BEGIN PRIVATE KEY-----\n"\
  38. "MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCXN8LK/eYi/tOU\n"\
  39. "uQ5vG6cp8J2sn/OB0A2uzHREG2kQ+FXnrSFYnR8SKfScg4yklKIX0TpCw92vpD56\n"\
  40. "iAfhec4xT0kT6Ibvjc3V098YTMm9GXJy38KTxKzAo8L35Veyc51mZTD3L/G0A1tR\n"\
  41. "ED9Oym8/PPcC+/fzZ999+skaHuig6ZplIJkWVJgctkDD9eVGvSxJFsukUZjSBeNo\n"\
  42. "BXyIceI8NgvLiRk56mNX1TnXGL4gawvjDvnO4yCwfIXecl4ZgeC0ZUGuQvRkobm5\n"\
  43. "1jTBwGPKyO5sMkLiJKU2KrYcPd+GzuPoJl11Xa/zjkyVUo3E/SQ7hSPgPyv7lRJY\n"\
  44. "Lwkp8DDFAgMBAAECggEAfBhAfQE7mUByNbxgAgI5fot9eaqR1Nf+QpJ6X2H3KPwC\n"\
  45. "02sa0HOwieFwYfj6tB1doBoNq7i89mTc+QUlIn4pHgIowHO0OGawomeKz5BEhjCZ\n"\
  46. "4XeLYGSoODary2+kNkf2xY8JTfFEcyvGBpJEwc4S2VyYgRRx+IgnumTSH+N5mIKZ\n"\
  47. "SXWNdZIuHEmkwod+rPRXs6/r+PH0eVW6WfpINEbr4zVAGXJx2zXQwd2cuV1GTJWh\n"\
  48. "cPVOXLu+XJ9im9B370cYN6GqUnR3fui13urYbnWnEf3syvoH/zuZkyrVChauoFf8\n"\
  49. "8EGb74/HhXK7Q2s8NRakx2c7OxQifCbcy03liUMmyQKBgQDFAob5B/66N4Q2cq/N\n"\
  50. "MWPf98kYBYoLaeEOhEJhLQlKk0pIFCTmtpmUbpoEes2kCUbH7RwczpYko8tlKyoB\n"\
  51. "6Fn6RY4zQQ64KZJI6kQVsjkYpcP/ihnOY6rbds+3yyv+4uPX7Eh9sYZwZMggE19M\n"\
  52. "CkFHkwAjiwqhiiSlUxe20sWmowKBgQDEfx4lxuFzA1PBPeZKGVBTxYPQf+DSLCre\n"\
  53. "ZFg3ZmrxbCjRq1O7Lra4FXWD3dmRq7NDk79JofoW50yD8wD7I0B7opdDfXD2idO8\n"\
  54. "0dBnWUKDr2CAXyoLEINce9kJPbx4kFBQRN9PiGF7VkDQxeQ3kfS8CvcErpTKCOdy\n"\
  55. "5wOwBTwJdwKBgDiTFTeGeDv5nVoVbS67tDao7XKchJvqd9q3WGiXikeELJyuTDqE\n"\
  56. "zW22pTwMF+m3UEAxcxVCrhMvhkUzNAkANHaOatuFHzj7lyqhO5QPbh4J3FMR0X9X\n"\
  57. "V8VWRSg+jA/SECP9koOl6zlzd5Tee0tW1pA7QpryXscs6IEhb3ns5R2JAoGAIkzO\n"\
  58. "RmnhEOKTzDex611f2D+yMsMfy5BKK2f4vjLymBH5TiBKDXKqEpgsW0huoi8Gq9Uu\n"\
  59. "nvvXXAgkIyRYF36f0vUe0nkjLuYAQAWgC2pZYgNLJR13iVbol0xHJoXQUHtgiaJ8\n"\
  60. "GLYFzjHQPqFMpSalQe3oELko39uOC1CoJCHFySECgYBeycUnRBikCO2n8DNhY4Eg\n"\
  61. "9Y3oxcssRt6ea5BZwgW2eAYi7/XqKkmxoSoOykUt3MJx9+EkkrL17bxFSpkj1tvL\n"\
  62. "qvxn7egtsKjjgGNAxwXC4MwCvhveyUQQxtQb8AqGrGqo4jEEN0L15cnP38i2x1Uo\n"\
  63. "muhfskWf4MABV0yTUaKcGg==\n"\
  64. "-----END PRIVATE KEY-----\n"
  65. def get_my_ip():
  66. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  67. s1.connect(("8.8.8.8", 80))
  68. my_ip = s1.getsockname()[0]
  69. s1.close()
  70. return my_ip
  71. def get_server_status(host_ip, port):
  72. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  73. server_status = sock.connect_ex((host_ip, port))
  74. sock.close()
  75. if server_status == 0:
  76. return True
  77. return False
  78. def create_file(server_file, file_data):
  79. with open(server_file, "w+") as file:
  80. file.write(file_data)
  81. def get_ca_cert(ota_image_dir):
  82. os.chdir(ota_image_dir)
  83. server_file = os.path.join(ota_image_dir, "server_cert.pem")
  84. create_file(server_file, server_cert)
  85. key_file = os.path.join(ota_image_dir, "server_key.pem")
  86. create_file(key_file, server_key)
  87. return server_file, key_file
  88. def start_https_server(ota_image_dir, server_ip, server_port):
  89. server_file, key_file = get_ca_cert(ota_image_dir)
  90. httpd = BaseHTTPServer.HTTPServer((server_ip, server_port),
  91. SimpleHTTPRequestHandler)
  92. httpd.socket = ssl.wrap_socket(httpd.socket,
  93. keyfile=key_file,
  94. certfile=server_file, server_side=True)
  95. httpd.serve_forever()
  96. def start_chunked_server(ota_image_dir, server_port):
  97. server_file, key_file = get_ca_cert(ota_image_dir)
  98. chunked_server = subprocess.Popen(["openssl", "s_server", "-WWW", "-key", key_file, "-cert", server_file, "-port", str(server_port)])
  99. return chunked_server
  100. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  101. def test_examples_protocol_native_ota_example(env, extra_data):
  102. """
  103. This is a positive test case, which downloads complete binary file multiple number of times.
  104. Number of iterations can be specified in variable iterations.
  105. steps: |
  106. 1. join AP
  107. 2. Fetch OTA image over HTTPS
  108. 3. Reboot with the new OTA image
  109. """
  110. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example", dut_class=ttfw_idf.ESP32DUT)
  111. server_port = 8002
  112. # No. of times working of application to be validated
  113. iterations = 3
  114. # File to be downloaded. This file is generated after compilation
  115. bin_name = "native_ota.bin"
  116. # check and log bin size
  117. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  118. bin_size = os.path.getsize(binary_file)
  119. ttfw_idf.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  120. ttfw_idf.check_performance("native_ota_bin_size", bin_size // 1024)
  121. # start test
  122. host_ip = get_my_ip()
  123. if (get_server_status(host_ip, server_port) is False):
  124. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  125. thread1.daemon = True
  126. thread1.start()
  127. dut1.start_app()
  128. for i in range(iterations):
  129. dut1.expect("Loaded app from partition at offset", timeout=30)
  130. try:
  131. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  132. print("Connected to AP with IP: {}".format(ip_address))
  133. except DUT.ExpectTimeout:
  134. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  135. thread1.close()
  136. dut1.expect("Starting OTA example", timeout=30)
  137. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + bin_name))
  138. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + bin_name)
  139. dut1.expect("Loaded app from partition at offset", timeout=60)
  140. dut1.expect("Starting OTA example", timeout=30)
  141. dut1.reset()
  142. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  143. def test_examples_protocol_native_ota_example_truncated_bin(env, extra_data):
  144. """
  145. Working of OTA if binary file is truncated is validated in this test case.
  146. Application should return with error message in this case.
  147. steps: |
  148. 1. join AP
  149. 2. Generate truncated binary file
  150. 3. Fetch OTA image over HTTPS
  151. 4. Check working of code if bin is truncated
  152. """
  153. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example", dut_class=ttfw_idf.ESP32DUT)
  154. server_port = 8002
  155. # Original binary file generated after compilation
  156. bin_name = "native_ota.bin"
  157. # Truncated binary file to be generated from original binary file
  158. truncated_bin_name = "truncated.bin"
  159. # Size of truncated file to be grnerated. This value can range from 288 bytes (Image header size) to size of original binary file
  160. # truncated_bin_size is set to 64000 to reduce consumed by the test case
  161. truncated_bin_size = 64000
  162. # check and log bin size
  163. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  164. f = open(binary_file, "r+")
  165. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  166. fo.write(f.read(truncated_bin_size))
  167. fo.close()
  168. f.close()
  169. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  170. bin_size = os.path.getsize(binary_file)
  171. ttfw_idf.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  172. ttfw_idf.check_performance("native_ota_bin_size", bin_size // 1024)
  173. # start test
  174. host_ip = get_my_ip()
  175. if (get_server_status(host_ip, server_port) is False):
  176. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  177. thread1.daemon = True
  178. thread1.start()
  179. dut1.start_app()
  180. dut1.expect("Loaded app from partition at offset", timeout=30)
  181. try:
  182. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  183. print("Connected to AP with IP: {}".format(ip_address))
  184. except DUT.ExpectTimeout:
  185. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  186. dut1.expect("Starting OTA example", timeout=30)
  187. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  188. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  189. dut1.expect("native_ota_example: Image validation failed, image is corrupted", timeout=20)
  190. os.remove(binary_file)
  191. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  192. def test_examples_protocol_native_ota_example_truncated_header(env, extra_data):
  193. """
  194. Working of OTA if headers of binary file are truncated is vaildated in this test case.
  195. Application should return with error message in this case.
  196. steps: |
  197. 1. join AP
  198. 2. Generate binary file with truncated headers
  199. 3. Fetch OTA image over HTTPS
  200. 4. Check working of code if headers are not sent completely
  201. """
  202. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example", dut_class=ttfw_idf.ESP32DUT)
  203. server_port = 8002
  204. # Original binary file generated after compilation
  205. bin_name = "native_ota.bin"
  206. # Truncated binary file to be generated from original binary file
  207. truncated_bin_name = "truncated_header.bin"
  208. # Size of truncated file to be grnerated. This value should be less than 288 bytes (Image header size)
  209. truncated_bin_size = 180
  210. # check and log bin size
  211. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  212. f = open(binary_file, "r+")
  213. fo = open(os.path.join(dut1.app.binary_path, truncated_bin_name), "w+")
  214. fo.write(f.read(truncated_bin_size))
  215. fo.close()
  216. f.close()
  217. binary_file = os.path.join(dut1.app.binary_path, truncated_bin_name)
  218. bin_size = os.path.getsize(binary_file)
  219. ttfw_idf.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  220. ttfw_idf.check_performance("native_ota_bin_size", bin_size // 1024)
  221. # start test
  222. host_ip = get_my_ip()
  223. if (get_server_status(host_ip, server_port) is False):
  224. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  225. thread1.daemon = True
  226. thread1.start()
  227. dut1.start_app()
  228. dut1.expect("Loaded app from partition at offset", timeout=30)
  229. try:
  230. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  231. print("Connected to AP with IP: {}".format(ip_address))
  232. except DUT.ExpectTimeout:
  233. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  234. dut1.expect("Starting OTA example", timeout=30)
  235. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name))
  236. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + truncated_bin_name)
  237. dut1.expect("native_ota_example: received package is not fit len", timeout=20)
  238. os.remove(binary_file)
  239. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  240. def test_examples_protocol_native_ota_example_random(env, extra_data):
  241. """
  242. Working of OTA if random data is added in binary file are validated in this test case.
  243. Magic byte verification should fail in this case.
  244. steps: |
  245. 1. join AP
  246. 2. Generate random binary image
  247. 3. Fetch OTA image over HTTPS
  248. 4. Check working of code for random binary file
  249. """
  250. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example", dut_class=ttfw_idf.ESP32DUT)
  251. server_port = 8002
  252. # Random binary file to be generated
  253. random_bin_name = "random.bin"
  254. # Size of random binary file. 32000 is choosen, to reduce the time required to run the test-case
  255. random_bin_size = 32000
  256. # check and log bin size
  257. binary_file = os.path.join(dut1.app.binary_path, random_bin_name)
  258. fo = open(binary_file, "w+")
  259. # First byte of binary file is always set to zero. If first byte is generated randomly,
  260. # in some cases it may generate 0xE9 which will result in failure of testcase.
  261. fo.write(str(0))
  262. for i in range(random_bin_size - 1):
  263. fo.write(str(random.randrange(0,255,1)))
  264. fo.close()
  265. bin_size = os.path.getsize(binary_file)
  266. ttfw_idf.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  267. ttfw_idf.check_performance("native_ota_bin_size", bin_size // 1024)
  268. # start test
  269. host_ip = get_my_ip()
  270. if (get_server_status(host_ip, server_port) is False):
  271. thread1 = Thread(target=start_https_server, args=(dut1.app.binary_path, host_ip, server_port))
  272. thread1.daemon = True
  273. thread1.start()
  274. dut1.start_app()
  275. dut1.expect("Loaded app from partition at offset", timeout=30)
  276. try:
  277. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=60)
  278. print("Connected to AP with IP: {}".format(ip_address))
  279. except DUT.ExpectTimeout:
  280. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  281. dut1.expect("Starting OTA example", timeout=30)
  282. print("writing to device: {}".format("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name))
  283. dut1.write("https://" + host_ip + ":" + str(server_port) + "/" + random_bin_name)
  284. dut1.expect("esp_ota_ops: OTA image has invalid magic byte", timeout=20)
  285. os.remove(binary_file)
  286. @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
  287. def test_examples_protocol_native_ota_example_chunked(env, extra_data):
  288. """
  289. This is a positive test case, which downloads complete binary file multiple number of times.
  290. Number of iterations can be specified in variable iterations.
  291. steps: |
  292. 1. join AP
  293. 2. Fetch OTA image over HTTPS
  294. 3. Reboot with the new OTA image
  295. """
  296. dut1 = env.get_dut("native_ota_example", "examples/system/ota/native_ota_example", dut_class=ttfw_idf.ESP32DUT)
  297. # File to be downloaded. This file is generated after compilation
  298. bin_name = "native_ota.bin"
  299. # check and log bin size
  300. binary_file = os.path.join(dut1.app.binary_path, bin_name)
  301. bin_size = os.path.getsize(binary_file)
  302. ttfw_idf.log_performance("native_ota_bin_size", "{}KB".format(bin_size // 1024))
  303. ttfw_idf.check_performance("native_ota_bin_size", bin_size // 1024)
  304. # start test
  305. host_ip = get_my_ip()
  306. chunked_server = start_chunked_server(dut1.app.binary_path, 8070)
  307. dut1.start_app()
  308. dut1.expect("Loaded app from partition at offset", timeout=30)
  309. try:
  310. ip_address = dut1.expect(re.compile(r" sta ip: ([^,]+),"), timeout=30)
  311. print("Connected to AP with IP: {}".format(ip_address))
  312. except DUT.ExpectTimeout:
  313. raise ValueError('ENV_TEST_FAILURE: Cannot connect to AP')
  314. dut1.expect("Starting OTA example", timeout=30)
  315. print("writing to device: {}".format("https://" + host_ip + ":8070/" + bin_name))
  316. dut1.write("https://" + host_ip + ":8070/" + bin_name)
  317. dut1.expect("Loaded app from partition at offset", timeout=60)
  318. dut1.expect("Starting OTA example", timeout=30)
  319. chunked_server.kill()
  320. os.remove(os.path.join(dut1.app.binary_path, "server_cert.pem"))
  321. os.remove(os.path.join(dut1.app.binary_path, "server_key.pem"))
  322. if __name__ == '__main__':
  323. test_examples_protocol_native_ota_example()
  324. test_examples_protocol_native_ota_example_chunked()
  325. test_examples_protocol_native_ota_example_truncated_bin()
  326. test_examples_protocol_native_ota_example_truncated_header()
  327. test_examples_protocol_native_ota_example_random()