unit_test.py 20 KB


  1. """
  2. Test script for unit test case.
  3. """
  4. import re
  5. import os
  6. import sys
  7. import time
  8. import threading
  9. # if we want to run test case outside `tiny-test-fw` folder,
  10. # we need to insert tiny-test-fw path into sys path
  11. test_fw_path = os.getenv("TEST_FW_PATH")
  12. if test_fw_path and test_fw_path not in sys.path:
  13. sys.path.insert(0, test_fw_path)
  14. import TinyFW
  15. import IDF
  16. import Utility
  17. from DUT import ExpectTimeout
  18. from IDF.IDFApp import UT
  19. UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
  20. RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
  21. EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
  22. ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)")
  23. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  24. UT_TIMEOUT = 30
  25. def format_test_case_config(test_case_data):
  26. """
  27. convert the test case data to unified format.
  28. We need to following info to run unit test cases:
  29. 1. unit test app config
  30. 2. test case name
  31. 3. test case reset info
  32. the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
  33. Each test case is a dict with "name" and "reset" as keys. For example::
  34. case_config = {
  35. "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
  36. "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
  37. }
  38. If config is not specified for test case, then
  39. :param test_case_data: string, list, or a dictionary list
  40. :return: formatted data
  41. """
  42. case_config = dict()
  43. def parse_case(one_case_data):
  44. """ parse and format one case """
  45. def process_reset_list(reset_list):
  46. # strip space and remove white space only items
  47. _output = list()
  48. for _r in reset_list:
  49. _data = _r.strip(" ")
  50. if _data:
  51. _output.append(_data)
  52. return _output
  53. _case = dict()
  54. if isinstance(one_case_data, str):
  55. _temp = one_case_data.split(" [reset=")
  56. _case["name"] = _temp[0]
  57. try:
  58. _case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
  59. except IndexError:
  60. _case["reset"] = list()
  61. elif isinstance(one_case_data, dict):
  62. _case = one_case_data.copy()
  63. assert "name" in _case
  64. if "reset" not in _case:
  65. _case["reset"] = list()
  66. else:
  67. if isinstance(_case["reset"], str):
  68. _case["reset"] = process_reset_list(_case["reset"].split(","))
  69. else:
  70. raise TypeError("Not supported type during parsing unit test case")
  71. if "config" not in _case:
  72. _case["config"] = "default"
  73. return _case
  74. if not isinstance(test_case_data, list):
  75. test_case_data = [test_case_data]
  76. for case_data in test_case_data:
  77. parsed_case = parse_case(case_data)
  78. try:
  79. case_config[parsed_case["config"]].append(parsed_case)
  80. except KeyError:
  81. case_config[parsed_case["config"]] = [parsed_case]
  82. return case_config
  83. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
  84. execution_time=1, env_tag="UT_T1_1")
  85. def run_unit_test_cases(env, extra_data):
  86. """
  87. extra_data can be three types of value
  88. 1. as string:
  89. 1. "case_name"
  90. 2. "case_name [reset=RESET_REASON]"
  91. 2. as dict:
  92. 1. with key like {"name": "Intr_alloc test, shared ints"}
  93. 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
  94. 3. as list of string or dict:
  95. [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
  96. :param extra_data: the case name or case list or case dictionary
  97. :return: None
  98. """
  99. case_config = format_test_case_config(extra_data)
  100. # we don't want stop on failed case (unless some special scenarios we can't handle)
  101. # this flag is used to log if any of the case failed during executing
  102. # Before exit test function this flag is used to log if the case fails
  103. failed_cases = []
  104. for ut_config in case_config:
  105. dut = env.get_dut("unit-test-app", app_path=ut_config)
  106. dut.start_app()
  107. for one_case in case_config[ut_config]:
  108. dut.reset()
  109. # esptool ``run`` cmd takes quite long time.
  110. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
  111. # this could cause checking bootup print failed.
  112. # now we input cmd `-`, and check either bootup print or test history,
  113. # to determine if DUT is ready to test.
  114. dut.write("-", flush=False)
  115. dut.expect_any(UT_APP_BOOT_UP_DONE,
  116. "0 Tests 0 Failures 0 Ignored")
  117. # run test case
  118. dut.write("\"{}\"".format(one_case["name"]))
  119. dut.expect("Running " + one_case["name"] + "...")
  120. exception_reset_list = []
  121. # we want to set this flag in callbacks (inner functions)
  122. # use list here so we can use append to set this flag
  123. test_finish = list()
  124. # expect callbacks
  125. def one_case_finish(result):
  126. """ one test finished, let expect loop break and log result """
  127. test_finish.append(True)
  128. if result:
  129. Utility.console_log("Success: " + one_case["name"], color="green")
  130. else:
  131. failed_cases.append(one_case["name"])
  132. Utility.console_log("Failed: " + one_case["name"], color="red")
  133. def handle_exception_reset(data):
  134. """
  135. just append data to exception list.
  136. exception list will be checked in ``handle_reset_finish``, once reset finished.
  137. """
  138. exception_reset_list.append(data[0])
  139. def handle_test_finish(data):
  140. """ test finished without reset """
  141. # in this scenario reset should not happen
  142. assert not exception_reset_list
  143. if int(data[1]):
  144. # case ignored
  145. Utility.console_log("Ignored: " + one_case["name"], color="orange")
  146. one_case_finish(not int(data[0]))
  147. def handle_reset_finish(data):
  148. """ reset happened and reboot finished """
  149. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  150. result = False
  151. if len(one_case["reset"]) == len(exception_reset_list):
  152. for i, exception in enumerate(exception_reset_list):
  153. if one_case["reset"][i] not in exception:
  154. break
  155. else:
  156. result = True
  157. if not result:
  158. Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
  159. .format(one_case["reset"], exception_reset_list),
  160. color="orange")
  161. one_case_finish(result)
  162. while not test_finish:
  163. try:
  164. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  165. (EXCEPTION_PATTERN, handle_exception_reset),
  166. (ABORT_PATTERN, handle_exception_reset),
  167. (FINISH_PATTERN, handle_test_finish),
  168. (UT_APP_BOOT_UP_DONE, handle_reset_finish),
  169. timeout=UT_TIMEOUT)
  170. except ExpectTimeout:
  171. Utility.console_log("Timeout in expect", color="orange")
  172. one_case_finish(False)
  173. break
  174. # raise exception if any case fails
  175. if failed_cases:
  176. Utility.console_log("Failed Cases:", color="red")
  177. for _case_name in failed_cases:
  178. Utility.console_log("\t" + _case_name, color="red")
  179. raise AssertionError("Unit Test Failed")
  180. class Handler(threading.Thread):
  181. WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
  182. SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
  183. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  184. def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout=30):
  185. self.dut = dut
  186. self.sent_signal_list = sent_signal_list
  187. self.lock = lock
  188. self.parent_case_name = parent_case_name
  189. self.child_case_name = ""
  190. self.child_case_index = child_case_index + 1
  191. self.finish = False
  192. self.result = False
  193. self.fail_name = None
  194. self.timeout = timeout
  195. threading.Thread.__init__(self, name="{} Handler".format(dut))
  196. def run(self):
  197. def get_child_case_name(data):
  198. self.child_case_name = data[0]
  199. time.sleep(1)
  200. self.dut.write(str(self.child_case_index))
  201. def one_device_case_finish(result):
  202. """ one test finished, let expect loop break and log result """
  203. self.finish = True
  204. self.result = result
  205. if not result:
  206. self.fail_name = self.child_case_name
  207. def device_wait_action(data):
  208. start_time = time.time()
  209. expected_signal = data[0]
  210. while 1:
  211. if time.time() > start_time + self.timeout:
  212. Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
  213. break
  214. with self.lock:
  215. if expected_signal in self.sent_signal_list:
  216. self.dut.write(" ")
  217. self.sent_signal_list.remove(expected_signal)
  218. break
  219. time.sleep(0.01)
  220. def device_send_action(data):
  221. with self.lock:
  222. self.sent_signal_list.append(data[0].encode('utf-8'))
  223. def handle_device_test_finish(data):
  224. """ test finished without reset """
  225. # in this scenario reset should not happen
  226. if int(data[1]):
  227. # case ignored
  228. Utility.console_log("Ignored: " + self.child_case_name, color="orange")
  229. one_device_case_finish(not int(data[0]))
  230. self.dut.reset()
  231. self.dut.write("-", flush=False)
  232. self.dut.expect_any(UT_APP_BOOT_UP_DONE, "0 Tests 0 Failures 0 Ignored")
  233. time.sleep(1)
  234. self.dut.write("\"{}\"".format(self.parent_case_name))
  235. self.dut.expect("Running " + self.parent_case_name + "...")
  236. while not self.finish:
  237. try:
  238. self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
  239. (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
  240. (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
  241. (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
  242. timeout=UT_TIMEOUT)
  243. except ExpectTimeout:
  244. Utility.console_log("Timeout in expect", color="orange")
  245. one_device_case_finish(False)
  246. break
  247. def get_case_info(one_case):
  248. parent_case = one_case["name"]
  249. child_case_num = one_case["child case num"]
  250. return parent_case, child_case_num
  251. def get_dut(duts, env, name, ut_config):
  252. if name in duts:
  253. dut = duts[name]
  254. else:
  255. dut = env.get_dut(name, app_path=ut_config)
  256. duts[name] = dut
  257. dut.start_app()
  258. return dut
  259. def case_run(duts, ut_config, env, one_case, failed_cases):
  260. lock = threading.RLock()
  261. threads = []
  262. send_signal_list = []
  263. failed_device = []
  264. result = True
  265. parent_case, case_num = get_case_info(one_case)
  266. for i in range(case_num):
  267. dut = get_dut(duts, env, "dut%d" % i, ut_config)
  268. threads.append(Handler(dut, send_signal_list, lock,
  269. parent_case, i))
  270. for thread in threads:
  271. thread.setDaemon(True)
  272. thread.start()
  273. for thread in threads:
  274. thread.join()
  275. result = result and thread.result
  276. if not thread.result:
  277. failed_device.append(thread.fail_name)
  278. if result:
  279. Utility.console_log("Success: " + one_case["name"], color="green")
  280. else:
  281. failed_cases.append(one_case["name"])
  282. Utility.console_log("Failed: " + one_case["name"], color="red")
  283. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="master_slave_test_case", execution_time=1,
  284. env_tag="UT_T2_1")
  285. def run_multiple_devices_cases(env, extra_data):
  286. """
  287. extra_data can be two types of value
  288. 1. as dict:
  289. e.g.
  290. {"name": "gpio master/slave test example",
  291. "child case num": 2,
  292. "config": "release",
  293. "env_tag": "UT_T2_1"}
  294. 2. as list dict:
  295. e.g.
  296. [{"name": "gpio master/slave test example1",
  297. "child case num": 2,
  298. "config": "release",
  299. "env_tag": "UT_T2_1"},
  300. {"name": "gpio master/slave test example2",
  301. "child case num": 2,
  302. "config": "release",
  303. "env_tag": "UT_T2_1"}]
  304. """
  305. failed_cases = []
  306. case_config = format_test_case_config(extra_data)
  307. DUTS = {}
  308. for ut_config in case_config:
  309. for one_case in case_config[ut_config]:
  310. case_run(DUTS, ut_config, env, one_case, failed_cases)
  311. if failed_cases:
  312. Utility.console_log("Failed Cases:", color="red")
  313. for _case_name in failed_cases:
  314. Utility.console_log("\t" + _case_name, color="red")
  315. raise AssertionError("Unit Test Failed")
  316. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
  317. execution_time=1, env_tag="UT_T1_1")
  318. def run_multiple_stage_cases(env, extra_data):
  319. """
  320. extra_data can be 2 types of value
  321. 1. as dict: Mandantory keys: "name" and "child case num", optional keys: "reset" and others
  322. 3. as list of string or dict:
  323. [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
  324. :param extra_data: the case name or case list or case dictionary
  325. :return: None
  326. """
  327. case_config = format_test_case_config(extra_data)
  328. # we don't want stop on failed case (unless some special scenarios we can't handle)
  329. # this flag is used to log if any of the case failed during executing
  330. # Before exit test function this flag is used to log if the case fails
  331. failed_cases = []
  332. for ut_config in case_config:
  333. dut = env.get_dut("unit-test-app", app_path=ut_config)
  334. dut.start_app()
  335. for one_case in case_config[ut_config]:
  336. dut.reset()
  337. dut.write("-", flush=False)
  338. dut.expect_any(UT_APP_BOOT_UP_DONE,
  339. "0 Tests 0 Failures 0 Ignored")
  340. exception_reset_list = []
  341. for test_stage in range(one_case["child case num"]):
  342. # select multi stage test case name
  343. dut.write("\"{}\"".format(one_case["name"]))
  344. dut.expect("Running " + one_case["name"] + "...")
  345. # select test function for current stage
  346. dut.write(str(test_stage + 1))
  347. # we want to set this flag in callbacks (inner functions)
  348. # use list here so we can use append to set this flag
  349. stage_finish = list()
  350. def last_stage():
  351. return test_stage == one_case["child case num"] - 1
  352. def check_reset():
  353. if one_case["reset"]:
  354. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  355. result = False
  356. if len(one_case["reset"]) == len(exception_reset_list):
  357. for i, exception in enumerate(exception_reset_list):
  358. if one_case["reset"][i] not in exception:
  359. break
  360. else:
  361. result = True
  362. if not result:
  363. Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
  364. .format(one_case["reset"], exception_reset_list),
  365. color="orange")
  366. else:
  367. # we allow omit reset in multi stage cases
  368. result = True
  369. return result
  370. # expect callbacks
  371. def one_case_finish(result):
  372. """ one test finished, let expect loop break and log result """
  373. # handle test finish
  374. result = result and check_reset()
  375. if result:
  376. Utility.console_log("Success: " + one_case["name"], color="green")
  377. else:
  378. failed_cases.append(one_case["name"])
  379. Utility.console_log("Failed: " + one_case["name"], color="red")
  380. stage_finish.append("break")
  381. def handle_exception_reset(data):
  382. """
  383. just append data to exception list.
  384. exception list will be checked in ``handle_reset_finish``, once reset finished.
  385. """
  386. exception_reset_list.append(data[0])
  387. def handle_test_finish(data):
  388. """ test finished without reset """
  389. # in this scenario reset should not happen
  390. if int(data[1]):
  391. # case ignored
  392. Utility.console_log("Ignored: " + one_case["name"], color="orange")
  393. # only passed in last stage will be regarded as real pass
  394. if last_stage():
  395. one_case_finish(not int(data[0]))
  396. else:
  397. Utility.console_log("test finished before enter last stage", color="orange")
  398. one_case_finish(False)
  399. def handle_next_stage(data):
  400. """ reboot finished. we goto next stage """
  401. if last_stage():
  402. # already last stage, should never goto next stage
  403. Utility.console_log("didn't finish at last stage", color="orange")
  404. one_case_finish(False)
  405. else:
  406. stage_finish.append("continue")
  407. while not stage_finish:
  408. try:
  409. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  410. (EXCEPTION_PATTERN, handle_exception_reset),
  411. (ABORT_PATTERN, handle_exception_reset),
  412. (FINISH_PATTERN, handle_test_finish),
  413. (UT_APP_BOOT_UP_DONE, handle_next_stage),
  414. timeout=UT_TIMEOUT)
  415. except ExpectTimeout:
  416. Utility.console_log("Timeout in expect", color="orange")
  417. one_case_finish(False)
  418. break
  419. if stage_finish[0] == "break":
  420. # test breaks on current stage
  421. break
  422. # raise exception if any case fails
  423. if failed_cases:
  424. Utility.console_log("Failed Cases:", color="red")
  425. for _case_name in failed_cases:
  426. Utility.console_log("\t" + _case_name, color="red")
  427. raise AssertionError("Unit Test Failed")
  428. if __name__ == '__main__':
  429. run_multiple_devices_cases(extra_data={"name": "gpio master/slave test example",
  430. "child case num": 2,
  431. "config": "release",
  432. "env_tag": "UT_T2_1"})