unit_test.py 21 KB


  1. """
  2. Test script for unit test case.
  3. """
  4. import re
  5. import os
  6. import sys
  7. import time
  8. import threading
  9. # if we want to run test case outside `tiny-test-fw` folder,
  10. # we need to insert tiny-test-fw path into sys path
  11. test_fw_path = os.getenv("TEST_FW_PATH")
  12. if test_fw_path and test_fw_path not in sys.path:
  13. sys.path.insert(0, test_fw_path)
  14. import TinyFW
  15. import IDF
  16. import Utility
  17. from DUT import ExpectTimeout
  18. from IDF.IDFApp import UT
  19. UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
  20. RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
  21. EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
  22. ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-eA-E\d]{8} on core \d)")
  23. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  24. STARTUP_TIMEOUT=10
  25. def format_test_case_config(test_case_data):
  26. """
  27. convert the test case data to unified format.
  28. We need to following info to run unit test cases:
  29. 1. unit test app config
  30. 2. test case name
  31. 3. test case reset info
  32. the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
  33. Each test case is a dict with "name" and "reset" as keys. For example::
  34. case_config = {
  35. "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
  36. "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
  37. }
  38. If config is not specified for test case, then
  39. :param test_case_data: string, list, or a dictionary list
  40. :return: formatted data
  41. """
  42. case_config = dict()
  43. def parse_case(one_case_data):
  44. """ parse and format one case """
  45. def process_reset_list(reset_list):
  46. # strip space and remove white space only items
  47. _output = list()
  48. for _r in reset_list:
  49. _data = _r.strip(" ")
  50. if _data:
  51. _output.append(_data)
  52. return _output
  53. _case = dict()
  54. if isinstance(one_case_data, str):
  55. _temp = one_case_data.split(" [reset=")
  56. _case["name"] = _temp[0]
  57. try:
  58. _case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
  59. except IndexError:
  60. _case["reset"] = list()
  61. elif isinstance(one_case_data, dict):
  62. _case = one_case_data.copy()
  63. assert "name" in _case
  64. if "reset" not in _case:
  65. _case["reset"] = list()
  66. else:
  67. if isinstance(_case["reset"], str):
  68. _case["reset"] = process_reset_list(_case["reset"].split(","))
  69. else:
  70. raise TypeError("Not supported type during parsing unit test case")
  71. if "config" not in _case:
  72. _case["config"] = "default"
  73. return _case
  74. if not isinstance(test_case_data, list):
  75. test_case_data = [test_case_data]
  76. for case_data in test_case_data:
  77. parsed_case = parse_case(case_data)
  78. try:
  79. case_config[parsed_case["config"]].append(parsed_case)
  80. except KeyError:
  81. case_config[parsed_case["config"]] = [parsed_case]
  82. return case_config
  83. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
  84. execution_time=1, env_tag="UT_T1_1")
  85. def run_unit_test_cases(env, extra_data):
  86. """
  87. extra_data can be three types of value
  88. 1. as string:
  89. 1. "case_name"
  90. 2. "case_name [reset=RESET_REASON]"
  91. 2. as dict:
  92. 1. with key like {"name": "Intr_alloc test, shared ints"}
  93. 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
  94. 3. as list of string or dict:
  95. [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
  96. :param extra_data: the case name or case list or case dictionary
  97. :return: None
  98. """
  99. case_config = format_test_case_config(extra_data)
  100. # we don't want stop on failed case (unless some special scenarios we can't handle)
  101. # this flag is used to log if any of the case failed during executing
  102. # Before exit test function this flag is used to log if the case fails
  103. failed_cases = []
  104. for ut_config in case_config:
  105. Utility.console_log("Running unit test for config: " + ut_config, "O")
  106. dut = env.get_dut("unit-test-app", app_path=ut_config)
  107. dut.start_app()
  108. for one_case in case_config[ut_config]:
  109. dut.reset()
  110. # esptool ``run`` cmd takes quite long time.
  111. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
  112. # this could cause checking bootup print failed.
  113. # now we input cmd `-`, and check either bootup print or test history,
  114. # to determine if DUT is ready to test.
  115. dut.write("-", flush=False)
  116. dut.expect_any(UT_APP_BOOT_UP_DONE,
  117. "0 Tests 0 Failures 0 Ignored", timeout=STARTUP_TIMEOUT)
  118. # run test case
  119. dut.write("\"{}\"".format(one_case["name"]))
  120. dut.expect("Running " + one_case["name"] + "...")
  121. exception_reset_list = []
  122. # we want to set this flag in callbacks (inner functions)
  123. # use list here so we can use append to set this flag
  124. test_finish = list()
  125. # expect callbacks
  126. def one_case_finish(result):
  127. """ one test finished, let expect loop break and log result """
  128. test_finish.append(True)
  129. if result:
  130. Utility.console_log("Success: " + one_case["name"], color="green")
  131. else:
  132. failed_cases.append(one_case["name"])
  133. Utility.console_log("Failed: " + one_case["name"], color="red")
  134. def handle_exception_reset(data):
  135. """
  136. just append data to exception list.
  137. exception list will be checked in ``handle_reset_finish``, once reset finished.
  138. """
  139. exception_reset_list.append(data[0])
  140. def handle_test_finish(data):
  141. """ test finished without reset """
  142. # in this scenario reset should not happen
  143. assert not exception_reset_list
  144. if int(data[1]):
  145. # case ignored
  146. Utility.console_log("Ignored: " + one_case["name"], color="orange")
  147. one_case_finish(not int(data[0]))
  148. def handle_reset_finish(data):
  149. """ reset happened and reboot finished """
  150. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  151. result = False
  152. if len(one_case["reset"]) == len(exception_reset_list):
  153. for i, exception in enumerate(exception_reset_list):
  154. if one_case["reset"][i] not in exception:
  155. break
  156. else:
  157. result = True
  158. if not result:
  159. Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
  160. .format(one_case["reset"], exception_reset_list),
  161. color="orange")
  162. one_case_finish(result)
  163. while not test_finish:
  164. try:
  165. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  166. (EXCEPTION_PATTERN, handle_exception_reset),
  167. (ABORT_PATTERN, handle_exception_reset),
  168. (FINISH_PATTERN, handle_test_finish),
  169. (UT_APP_BOOT_UP_DONE, handle_reset_finish),
  170. timeout=one_case["timeout"])
  171. except ExpectTimeout:
  172. Utility.console_log("Timeout in expect", color="orange")
  173. one_case_finish(False)
  174. break
  175. # raise exception if any case fails
  176. if failed_cases:
  177. Utility.console_log("Failed Cases:", color="red")
  178. for _case_name in failed_cases:
  179. Utility.console_log("\t" + _case_name, color="red")
  180. raise AssertionError("Unit Test Failed")
  181. class Handler(threading.Thread):
  182. WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)\]!')
  183. SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[(.+)\]!')
  184. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  185. def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
  186. self.dut = dut
  187. self.sent_signal_list = sent_signal_list
  188. self.lock = lock
  189. self.parent_case_name = parent_case_name
  190. self.child_case_name = ""
  191. self.child_case_index = child_case_index + 1
  192. self.finish = False
  193. self.result = False
  194. self.fail_name = None
  195. self.timeout = timeout
  196. threading.Thread.__init__(self, name="{} Handler".format(dut))
  197. def run(self):
  198. def get_child_case_name(data):
  199. self.child_case_name = data[0]
  200. time.sleep(1)
  201. self.dut.write(str(self.child_case_index))
  202. def one_device_case_finish(result):
  203. """ one test finished, let expect loop break and log result """
  204. self.finish = True
  205. self.result = result
  206. if not result:
  207. self.fail_name = self.child_case_name
  208. def device_wait_action(data):
  209. start_time = time.time()
  210. expected_signal = data[0]
  211. while 1:
  212. if time.time() > start_time + self.timeout:
  213. Utility.console_log("Timeout in device for function: %s"%self.child_case_name, color="orange")
  214. break
  215. with self.lock:
  216. if expected_signal in self.sent_signal_list:
  217. self.dut.write(" ")
  218. self.sent_signal_list.remove(expected_signal)
  219. break
  220. time.sleep(0.01)
  221. def device_send_action(data):
  222. with self.lock:
  223. self.sent_signal_list.append(data[0].encode('utf-8'))
  224. def handle_device_test_finish(data):
  225. """ test finished without reset """
  226. # in this scenario reset should not happen
  227. if int(data[1]):
  228. # case ignored
  229. Utility.console_log("Ignored: " + self.child_case_name, color="orange")
  230. one_device_case_finish(not int(data[0]))
  231. self.dut.reset()
  232. self.dut.write("-", flush=False)
  233. self.dut.expect_any(UT_APP_BOOT_UP_DONE, "0 Tests 0 Failures 0 Ignored")
  234. time.sleep(1)
  235. self.dut.write("\"{}\"".format(self.parent_case_name))
  236. self.dut.expect("Running " + self.parent_case_name + "...")
  237. while not self.finish:
  238. try:
  239. self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), get_child_case_name),
  240. (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
  241. (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
  242. (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
  243. timeout=self.timeout)
  244. except ExpectTimeout:
  245. Utility.console_log("Timeout in expect", color="orange")
  246. one_device_case_finish(False)
  247. break
  248. def get_case_info(one_case):
  249. parent_case = one_case["name"]
  250. child_case_num = one_case["child case num"]
  251. return parent_case, child_case_num
  252. def get_dut(duts, env, name, ut_config):
  253. if name in duts:
  254. dut = duts[name]
  255. else:
  256. dut = env.get_dut(name, app_path=ut_config)
  257. duts[name] = dut
  258. dut.start_app()
  259. return dut
  260. def case_run(duts, ut_config, env, one_case, failed_cases):
  261. lock = threading.RLock()
  262. threads = []
  263. send_signal_list = []
  264. failed_device = []
  265. result = True
  266. parent_case, case_num = get_case_info(one_case)
  267. for i in range(case_num):
  268. dut = get_dut(duts, env, "dut%d" % i, ut_config)
  269. threads.append(Handler(dut, send_signal_list, lock,
  270. parent_case, i, one_case["timeout"]))
  271. for thread in threads:
  272. thread.setDaemon(True)
  273. thread.start()
  274. for thread in threads:
  275. thread.join()
  276. result = result and thread.result
  277. if not thread.result:
  278. failed_device.append(thread.fail_name)
  279. if result:
  280. Utility.console_log("Success: " + one_case["name"], color="green")
  281. else:
  282. failed_cases.append(one_case["name"])
  283. Utility.console_log("Failed: " + one_case["name"], color="red")
  284. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="master_slave_test_case", execution_time=1,
  285. env_tag="UT_T2_1")
  286. def run_multiple_devices_cases(env, extra_data):
  287. """
  288. extra_data can be two types of value
  289. 1. as dict:
  290. e.g.
  291. {"name": "gpio master/slave test example",
  292. "child case num": 2,
  293. "config": "release",
  294. "env_tag": "UT_T2_1"}
  295. 2. as list dict:
  296. e.g.
  297. [{"name": "gpio master/slave test example1",
  298. "child case num": 2,
  299. "config": "release",
  300. "env_tag": "UT_T2_1"},
  301. {"name": "gpio master/slave test example2",
  302. "child case num": 2,
  303. "config": "release",
  304. "env_tag": "UT_T2_1"}]
  305. """
  306. failed_cases = []
  307. case_config = format_test_case_config(extra_data)
  308. DUTS = {}
  309. for ut_config in case_config:
  310. Utility.console_log("Running unit test for config: " + ut_config, "O")
  311. for one_case in case_config[ut_config]:
  312. case_run(DUTS, ut_config, env, one_case, failed_cases)
  313. if failed_cases:
  314. Utility.console_log("Failed Cases:", color="red")
  315. for _case_name in failed_cases:
  316. Utility.console_log("\t" + _case_name, color="red")
  317. raise AssertionError("Unit Test Failed")
  318. @TinyFW.test_method(app=UT, dut=IDF.IDFDUT, chip="ESP32", module="unit_test",
  319. execution_time=1, env_tag="UT_T1_1")
  320. def run_multiple_stage_cases(env, extra_data):
  321. """
  322. extra_data can be 2 types of value
  323. 1. as dict: Mandantory keys: "name" and "child case num", optional keys: "reset" and others
  324. 3. as list of string or dict:
  325. [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
  326. :param extra_data: the case name or case list or case dictionary
  327. :return: None
  328. """
  329. case_config = format_test_case_config(extra_data)
  330. # we don't want stop on failed case (unless some special scenarios we can't handle)
  331. # this flag is used to log if any of the case failed during executing
  332. # Before exit test function this flag is used to log if the case fails
  333. failed_cases = []
  334. for ut_config in case_config:
  335. Utility.console_log("Running unit test for config: " + ut_config, "O")
  336. dut = env.get_dut("unit-test-app", app_path=ut_config)
  337. dut.start_app()
  338. for one_case in case_config[ut_config]:
  339. dut.reset()
  340. dut.write("-", flush=False)
  341. dut.expect_any(UT_APP_BOOT_UP_DONE,
  342. "0 Tests 0 Failures 0 Ignored")
  343. exception_reset_list = []
  344. for test_stage in range(one_case["child case num"]):
  345. # select multi stage test case name
  346. dut.write("\"{}\"".format(one_case["name"]))
  347. dut.expect("Running " + one_case["name"] + "...")
  348. # select test function for current stage
  349. dut.write(str(test_stage + 1))
  350. # we want to set this flag in callbacks (inner functions)
  351. # use list here so we can use append to set this flag
  352. stage_finish = list()
  353. def last_stage():
  354. return test_stage == one_case["child case num"] - 1
  355. def check_reset():
  356. if one_case["reset"]:
  357. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  358. result = False
  359. if len(one_case["reset"]) == len(exception_reset_list):
  360. for i, exception in enumerate(exception_reset_list):
  361. if one_case["reset"][i] not in exception:
  362. break
  363. else:
  364. result = True
  365. if not result:
  366. Utility.console_log("""Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}"""
  367. .format(one_case["reset"], exception_reset_list),
  368. color="orange")
  369. else:
  370. # we allow omit reset in multi stage cases
  371. result = True
  372. return result
  373. # expect callbacks
  374. def one_case_finish(result):
  375. """ one test finished, let expect loop break and log result """
  376. # handle test finish
  377. result = result and check_reset()
  378. if result:
  379. Utility.console_log("Success: " + one_case["name"], color="green")
  380. else:
  381. failed_cases.append(one_case["name"])
  382. Utility.console_log("Failed: " + one_case["name"], color="red")
  383. stage_finish.append("break")
  384. def handle_exception_reset(data):
  385. """
  386. just append data to exception list.
  387. exception list will be checked in ``handle_reset_finish``, once reset finished.
  388. """
  389. exception_reset_list.append(data[0])
  390. def handle_test_finish(data):
  391. """ test finished without reset """
  392. # in this scenario reset should not happen
  393. if int(data[1]):
  394. # case ignored
  395. Utility.console_log("Ignored: " + one_case["name"], color="orange")
  396. # only passed in last stage will be regarded as real pass
  397. if last_stage():
  398. one_case_finish(not int(data[0]))
  399. else:
  400. Utility.console_log("test finished before enter last stage", color="orange")
  401. one_case_finish(False)
  402. def handle_next_stage(data):
  403. """ reboot finished. we goto next stage """
  404. if last_stage():
  405. # already last stage, should never goto next stage
  406. Utility.console_log("didn't finish at last stage", color="orange")
  407. one_case_finish(False)
  408. else:
  409. stage_finish.append("continue")
  410. while not stage_finish:
  411. try:
  412. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  413. (EXCEPTION_PATTERN, handle_exception_reset),
  414. (ABORT_PATTERN, handle_exception_reset),
  415. (FINISH_PATTERN, handle_test_finish),
  416. (UT_APP_BOOT_UP_DONE, handle_next_stage),
  417. timeout=one_case["timeout"])
  418. except ExpectTimeout:
  419. Utility.console_log("Timeout in expect", color="orange")
  420. one_case_finish(False)
  421. break
  422. if stage_finish[0] == "break":
  423. # test breaks on current stage
  424. break
  425. # raise exception if any case fails
  426. if failed_cases:
  427. Utility.console_log("Failed Cases:", color="red")
  428. for _case_name in failed_cases:
  429. Utility.console_log("\t" + _case_name, color="red")
  430. raise AssertionError("Unit Test Failed")
  431. if __name__ == '__main__':
  432. run_multiple_devices_cases(extra_data={"name": "gpio master/slave test example",
  433. "child case num": 2,
  434. "config": "release",
  435. "env_tag": "UT_T2_1"})