unit_test.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. """
  2. Test script for unit test case.
  3. """
  4. import re
  5. import os
  6. import sys
  7. import time
  8. import threading
  9. # if we want to run test case outside `tiny-test-fw` folder,
  10. # we need to insert tiny-test-fw path into sys path
  11. test_fw_path = os.getenv("TEST_FW_PATH")
  12. if test_fw_path and test_fw_path not in sys.path:
  13. sys.path.insert(0, test_fw_path)
  14. import TinyFW
  15. import IDF
  16. import Utility
  17. from DUT import ExpectTimeout
  18. from IDF.IDFApp import UT
  19. UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
  20. RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
  21. EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
  22. ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)")
  23. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  24. STARTUP_TIMEOUT=10
  25. def format_test_case_config(test_case_data):
  26. """
  27. convert the test case data to unified format.
  28. We need to following info to run unit test cases:
  29. 1. unit test app config
  30. 2. test case name
  31. 3. test case reset info
  32. the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
  33. Each test case is a dict with "name" and "reset" as keys. For example::
  34. case_config = {
  35. "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
  36. "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
  37. }
  38. If config is not specified for test case, then
  39. :param test_case_data: string, list, or a dictionary list
  40. :return: formatted data
  41. """
  42. case_config = dict()
  43. def parse_case(one_case_data):
  44. """ parse and format one case """
  45. def process_reset_list(reset_list):
  46. # strip space and remove white space only items
  47. _output = list()
  48. for _r in reset_list:
  49. _data = _r.strip(" ")
  50. if _data:
  51. _output.append(_data)
  52. return _output
  53. _case = dict()
  54. if isinstance(one_case_data, str):
  55. _temp = one_case_data.split(" [reset=")
  56. _case["name"] = _temp[0]
  57. try:
  58. _case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
  59. except IndexError:
  60. _case["reset"] = list()
  61. elif isinstance(one_case_data, dict):
  62. _case = one_case_data.copy()
  63. assert "name" in _case
  64. if "reset" not in _case:
  65. _case["reset"] = list()
  66. else:
  67. if isinstance(_case["reset"], str):
  68. _case["reset"] = process_reset_list(_case["reset"].split(","))
  69. else:
  70. raise TypeError("Not supported type during parsing unit test case")
  71. if "config" not in _case:
  72. _case["config"] = "default"
  73. return _case
  74. if not isinstance(test_case_data, list):
  75. test_case_data = [test_case_data]
  76. for case_data in test_case_data:
  77. parsed_case = parse_case(case_data)
  78. try:
  79. case_config[parsed_case["config"]].append(parsed_case)
  80. except KeyError:
  81. case_config[parsed_case["config"]] = [parsed_case]
  82. return case_config
@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
                    execution_time=1, env_tag="UT_T1_1")
def run_unit_test_cases(env, extra_data):
    """
    Run single-stage unit test cases on one DUT, one case after another.

    extra_data can be three types of value

    1. as string:
        1. "case_name"
        2. "case_name [reset=RESET_REASON]"
    2. as dict:
        1. with key like {"name": "Intr_alloc test, shared ints"}
        2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
    3. as list of string or dict:
        [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]

    Every case is read with ``one_case["timeout"]``, so the formatted case dict must carry a
    "timeout" key (``KeyError`` otherwise).

    :param env: test environment; only ``env.get_dut`` is used here
    :param extra_data: the case name or case list or case dictionary
    :return: None
    :raise AssertionError: "Unit Test Failed" if at least one case failed
    """

    case_config = format_test_case_config(extra_data)

    # we don't want stop on failed case (unless some special scenarios we can't handle)
    # names of failed cases are collected in this list during execution;
    # before exiting the test function the list is logged and turned into an AssertionError
    failed_cases = []

    for ut_config in case_config:
        Utility.console_log("Running unit test for config: " + ut_config, "O")
        dut = env.get_dut("unit-test-app", app_path=ut_config)
        dut.start_app()

        for one_case in case_config[ut_config]:
            dut.reset()
            # esptool ``run`` cmd takes quite long time.
            # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
            # this could cause checking bootup print failed.
            # now we input cmd `-`, and check either bootup print or test history,
            # to determine if DUT is ready to test.
            dut.write("-", flush=False)
            dut.expect_any(UT_APP_BOOT_UP_DONE,
                           "0 Tests 0 Failures 0 Ignored", timeout=STARTUP_TIMEOUT)

            # run test case
            dut.write("\"{}\"".format(one_case["name"]))
            dut.expect("Running " + one_case["name"] + "...")

            # reset / panic / abort prints captured while this case runs, in order of appearance
            exception_reset_list = []

            # we want to set this flag in callbacks (inner functions)
            # use list here so we can use append to set this flag
            test_finish = list()

            # expect callbacks
            def one_case_finish(result):
                """ one test finished, let expect loop break and log result """
                test_finish.append(True)
                if result:
                    Utility.console_log("Success: " + one_case["name"], color="green")
                else:
                    failed_cases.append(one_case["name"])
                    Utility.console_log("Failed: " + one_case["name"], color="red")

            def handle_exception_reset(data):
                """
                just append data to exception list.
                exception list will be checked in ``handle_reset_finish``, once reset finished.
                """
                exception_reset_list.append(data[0])

            def handle_test_finish(data):
                """ test finished without reset """
                # in this scenario reset should not happen
                assert not exception_reset_list
                # presumably data holds the FINISH_PATTERN groups: data[0] failures, data[1] ignored -- confirm
                if int(data[1]):
                    # case ignored
                    Utility.console_log("Ignored: " + one_case["name"], color="orange")
                one_case_finish(not int(data[0]))

            def handle_reset_finish(data):
                """ reset happened and reboot finished """
                assert exception_reset_list  # reboot but no exception/reset logged. should never happen
                result = False
                # pass only if the count matches and every expected reason appears, in order,
                # inside the corresponding captured print
                if len(one_case["reset"]) == len(exception_reset_list):
                    for i, exception in enumerate(exception_reset_list):
                        if one_case["reset"][i] not in exception:
                            break
                    else:
                        # loop ended without break: all expected reasons were found
                        result = True
                if not result:
                    Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
                                        .format(one_case["reset"], exception_reset_list),
                                        color="orange")
                one_case_finish(result)

            # keep dispatching DUT output to the callbacks until one of them marks the case finished
            while not test_finish:
                try:
                    dut.expect_any((RESET_PATTERN, handle_exception_reset),
                                   (EXCEPTION_PATTERN, handle_exception_reset),
                                   (ABORT_PATTERN, handle_exception_reset),
                                   (FINISH_PATTERN, handle_test_finish),
                                   (UT_APP_BOOT_UP_DONE, handle_reset_finish),
                                   timeout=one_case["timeout"])
                except ExpectTimeout:
                    Utility.console_log("Timeout in expect", color="orange")
                    one_case_finish(False)
                    break

    # raise exception if any case fails
    if failed_cases:
        Utility.console_log("Failed Cases:", color="red")
        for _case_name in failed_cases:
            Utility.console_log("\t" + _case_name, color="red")
        raise AssertionError("Unit Test Failed")
  181. class Handler(threading.Thread):
  182. WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
  183. SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
  184. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  185. def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
  186. self.dut = dut
  187. self.sent_signal_list = sent_signal_list
  188. self.lock = lock
  189. self.parent_case_name = parent_case_name
  190. self.child_case_name = ""
  191. self.child_case_index = child_case_index + 1
  192. self.finish = False
  193. self.result = False
  194. self.fail_name = None
  195. self.timeout = timeout
  196. threading.Thread.__init__(self, name="{} Handler".format(dut))
  197. def run(self):
  198. def get_child_case_name(data):
  199. self.child_case_name = data[0]
  200. time.sleep(1)
  201. self.dut.write(str(self.child_case_index))
  202. def one_device_case_finish(result):
  203. """ one test finished, let expect loop break and log result """
  204. self.finish = True
  205. self.result = result
  206. if not result:
  207. self.fail_name = self.child_case_name
  208. def device_wait_action(data):
  209. start_time = time.time()
  210. expected_signal = data[0]
  211. while not THREAD_TERMINATE_FLAG:
  212. if time.time() > start_time + self.timeout:
  213. Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
  214. break
  215. with self.lock:
  216. if expected_signal in self.sent_signal_list:
  217. self.dut.write(" ")
  218. self.sent_signal_list.remove(expected_signal)
  219. break
  220. time.sleep(0.01)
  221. def device_send_action(data):
  222. with self.lock:
  223. self.sent_signal_list.append(data[0].encode('utf-8'))
  224. def handle_device_test_finish(data):
  225. """ test finished without reset """
  226. # in this scenario reset should not happen
  227. if int(data[1]):
  228. # case ignored
  229. Utility.console_log("Ignored: " + self.child_case_name, color="orange")
  230. one_device_case_finish(not int(data[0]))
  231. self.dut.reset()
  232. self.dut.write("-", flush=False)
  233. try:
  234. self.dut.expect_any(UT_APP_BOOT_UP_DONE, "0 Tests 0 Failures 0 Ignored")
  235. time.sleep(1)
  236. self.dut.write("\"{}\"".format(self.parent_case_name))
  237. self.dut.expect("Running " + self.parent_case_name + "...")
  238. except ExpectTimeout:
  239. Utility.console_log("No case detected!", color="orange")
  240. THREAD_TERMINATE_FLAG = True
  241. while not self.finish and not THREAD_TERMINATE_FLAG:
  242. try:
  243. self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
  244. (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
  245. (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
  246. (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
  247. timeout=self.timeout)
  248. except ExpectTimeout:
  249. Utility.console_log("Timeout in expect", color="orange")
  250. one_device_case_finish(False)
  251. break
  252. def get_case_info(one_case):
  253. parent_case = one_case["name"]
  254. child_case_num = one_case["child case num"]
  255. return parent_case, child_case_num
  256. def get_dut(duts, env, name, ut_config):
  257. if name in duts:
  258. dut = duts[name]
  259. else:
  260. dut = env.get_dut(name, app_path=ut_config)
  261. duts[name] = dut
  262. dut.start_app()
  263. return dut
  264. def case_run(duts, ut_config, env, one_case, failed_cases):
  265. lock = threading.RLock()
  266. threads = []
  267. send_signal_list = []
  268. result = True
  269. parent_case, case_num = get_case_info(one_case)
  270. global THREAD_TERMINATE_FLAG
  271. THREAD_TERMINATE_FLAG = False
  272. for i in range(case_num):
  273. dut = get_dut(duts, env, "dut%d" % i, ut_config)
  274. threads.append(Handler(dut, send_signal_list, lock,
  275. parent_case, i, one_case["timeout"]))
  276. for thread in threads:
  277. thread.setDaemon(True)
  278. thread.start()
  279. for thread in threads:
  280. thread.join()
  281. result = result and thread.result
  282. if not thread.result:
  283. THREAD_TERMINATE_FLAG = True
  284. if result:
  285. Utility.console_log("Success: " + one_case["name"], color="green")
  286. else:
  287. failed_cases.append(one_case["name"])
  288. Utility.console_log("Failed: " + one_case["name"], color="red")
  289. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="master_slave_test_case", execution_time=1,
  290. env_tag="UT_T2_1")
  291. def run_multiple_devices_cases(env, extra_data):
  292. """
  293. extra_data can be two types of value
  294. 1. as dict:
  295. e.g.
  296. {"name": "gpio master/slave test example",
  297. "child case num": 2,
  298. "config": "release",
  299. "env_tag": "UT_T2_1"}
  300. 2. as list dict:
  301. e.g.
  302. [{"name": "gpio master/slave test example1",
  303. "child case num": 2,
  304. "config": "release",
  305. "env_tag": "UT_T2_1"},
  306. {"name": "gpio master/slave test example2",
  307. "child case num": 2,
  308. "config": "release",
  309. "env_tag": "UT_T2_1"}]
  310. """
  311. failed_cases = []
  312. case_config = format_test_case_config(extra_data)
  313. DUTS = {}
  314. for ut_config in case_config:
  315. Utility.console_log("Running unit test for config: " + ut_config, "O")
  316. for one_case in case_config[ut_config]:
  317. case_run(DUTS, ut_config, env, one_case, failed_cases)
  318. if failed_cases:
  319. Utility.console_log("Failed Cases:", color="red")
  320. for _case_name in failed_cases:
  321. Utility.console_log("\t" + _case_name, color="red")
  322. raise AssertionError("Unit Test Failed")
@TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
                    execution_time=1, env_tag="UT_T1_1")
def run_multiple_stage_cases(env, extra_data):
    """
    Run multi-stage unit test cases: the same case is selected once per stage, and the DUT is
    expected to reboot between stages.

    extra_data can be 2 types of value

    1. as dict: Mandatory keys: "name" and "child case num", optional keys: "reset" and others
    2. as list of string or dict:
        [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]

    Every case is read with ``one_case["child case num"]`` and ``one_case["timeout"]``, so both
    keys must be present in the formatted case dict (``KeyError`` otherwise).

    :param env: test environment; only ``env.get_dut`` is used here
    :param extra_data: the case name or case list or case dictionary
    :return: None
    :raise AssertionError: "Unit Test Failed" if at least one case failed
    """

    case_config = format_test_case_config(extra_data)

    # we don't want stop on failed case (unless some special scenarios we can't handle)
    # names of failed cases are collected in this list during execution;
    # before exiting the test function the list is logged and turned into an AssertionError
    failed_cases = []

    for ut_config in case_config:
        Utility.console_log("Running unit test for config: " + ut_config, "O")
        dut = env.get_dut("unit-test-app", app_path=ut_config)
        dut.start_app()

        for one_case in case_config[ut_config]:
            dut.reset()
            dut.write("-", flush=False)
            # NOTE(review): unlike run_unit_test_cases, no ``timeout=STARTUP_TIMEOUT`` is passed here -- confirm intended
            dut.expect_any(UT_APP_BOOT_UP_DONE,
                           "0 Tests 0 Failures 0 Ignored")

            # reset / panic / abort prints captured over ALL stages of this case
            exception_reset_list = []

            for test_stage in range(one_case["child case num"]):
                # select multi stage test case name
                dut.write("\"{}\"".format(one_case["name"]))
                dut.expect("Running " + one_case["name"] + "...")
                # select test function for current stage
                dut.write(str(test_stage + 1))

                # we want to set this flag in callbacks (inner functions)
                # use list here so we can use append to set this flag
                # the appended value is "break" (case is over) or "continue" (go to next stage)
                stage_finish = list()

                def last_stage():
                    """ True while the final stage of the case is running """
                    return test_stage == one_case["child case num"] - 1

                def check_reset():
                    """ compare the captured reset prints with the expected "reset" list of the case """
                    if one_case["reset"]:
                        assert exception_reset_list  # reboot but no exception/reset logged. should never happen
                        result = False
                        if len(one_case["reset"]) == len(exception_reset_list):
                            for i, exception in enumerate(exception_reset_list):
                                if one_case["reset"][i] not in exception:
                                    break
                            else:
                                # loop ended without break: all expected reasons were found
                                result = True
                        if not result:
                            Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
                                                .format(one_case["reset"], exception_reset_list),
                                                color="orange")
                    else:
                        # we allow omit reset in multi stage cases
                        result = True
                    return result

                # expect callbacks
                def one_case_finish(result):
                    """ one test finished, let expect loop break and log result """
                    # handle test finish
                    # check_reset() is only evaluated when the case itself did not already fail
                    result = result and check_reset()
                    if result:
                        Utility.console_log("Success: " + one_case["name"], color="green")
                    else:
                        failed_cases.append(one_case["name"])
                        Utility.console_log("Failed: " + one_case["name"], color="red")
                    stage_finish.append("break")

                def handle_exception_reset(data):
                    """
                    just append data to exception list.
                    exception list will be checked in ``check_reset``, once the case finished.
                    """
                    exception_reset_list.append(data[0])

                def handle_test_finish(data):
                    """ test finished without reset """
                    # in this scenario reset should not happen
                    # presumably data holds the FINISH_PATTERN groups: data[0] failures, data[1] ignored -- confirm
                    if int(data[1]):
                        # case ignored
                        Utility.console_log("Ignored: " + one_case["name"], color="orange")
                    # only passed in last stage will be regarded as real pass
                    if last_stage():
                        one_case_finish(not int(data[0]))
                    else:
                        Utility.console_log("test finished before enter last stage", color="orange")
                        one_case_finish(False)

                def handle_next_stage(data):
                    """ reboot finished. we goto next stage """
                    if last_stage():
                        # already last stage, should never goto next stage
                        Utility.console_log("didn't finish at last stage", color="orange")
                        one_case_finish(False)
                    else:
                        stage_finish.append("continue")

                # keep dispatching DUT output to the callbacks until one of them ends the stage
                while not stage_finish:
                    try:
                        dut.expect_any((RESET_PATTERN, handle_exception_reset),
                                       (EXCEPTION_PATTERN, handle_exception_reset),
                                       (ABORT_PATTERN, handle_exception_reset),
                                       (FINISH_PATTERN, handle_test_finish),
                                       (UT_APP_BOOT_UP_DONE, handle_next_stage),
                                       timeout=one_case["timeout"])
                    except ExpectTimeout:
                        Utility.console_log("Timeout in expect", color="orange")
                        one_case_finish(False)
                        break
                if stage_finish[0] == "break":
                    # test breaks on current stage
                    break

    # raise exception if any case fails
    if failed_cases:
        Utility.console_log("Failed Cases:", color="red")
        for _case_name in failed_cases:
            Utility.console_log("\t" + _case_name, color="red")
        raise AssertionError("Unit Test Failed")
  436. if __name__ == '__main__':
  437. run_multiple_devices_cases(extra_data={"name": "gpio master/slave test example",
  438. "child case num": 2,
  439. "config": "release",
  440. "env_tag": "UT_T2_1"})