unit_test.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """
  17. Test script for unit test case.
  18. """
  19. import re
  20. import time
  21. import argparse
  22. import threading
  23. from tiny_test_fw import TinyFW, Utility, Env, DUT
  24. import ttfw_idf
  # Text expected on the DUT console once the unit test app has (re)booted and is ready for input.
  UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."

  # Splits a config name into a base part (group 1) and an optional trailing "_<digits>" part,
  # e.g. "default_2" -> "default".
  STRIP_CONFIG_PATTERN = re.compile(r"(.+?)(_\d+)?$")

  # matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
  RESET_PATTERN = re.compile(r"(rst:0x[0-9a-fA-F]*\s\([\w].*?\),boot:0x[0-9a-fA-F]*\s\([\w].*?\))")
  # matches a "Guru Meditation Error: Core <n> panic'ed (<reason>)" line
  EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
  # matches "abort() was called at PC 0x<8 hex digits> on core <n>"
  ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)")
  # result summary of a single test; group 1 is the failure count, group 2 the ignored count (one digit each)
  FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")

  # regex fragment for the text that follows the last entry of the test menu
  END_LIST_STR = r'\r?\nEnter test for running'
  # one test menu entry: (index) "name" <rest of line>; the last group only matches after the final entry
  TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?')
  # one sub-menu entry: (index) "name", followed either by another "(" or by the end-of-menu text
  TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))')

  # app path handed to ``env.get_dut`` for every DUT created in this file
  UT_APP_PATH = "tools/unit-test-app"

  # values stored under the "type" key of a test case dict
  SIMPLE_TEST_ID = 0
  MULTI_STAGE_ID = 1
  MULTI_DEVICE_ID = 2

  # timeout used while reading the test menu (presumably seconds, like the other expect timeouts)
  DEFAULT_TIMEOUT = 20

  # seconds to sleep after ``dut.reset()`` before probing the DUT
  DUT_DELAY_AFTER_RESET = 2
  # how many times the "-" probe is sent before a reset is declared failed
  DUT_STARTUP_CHECK_RETRY_COUNT = 5
  # expect timeout for the reply to each "-" probe
  TEST_HISTORY_CHECK_TIMEOUT = 2
  def reset_reason_matches(reported_str, expected_str):
      """
      Tell whether the expected reset reason appears in the reported reset string.

      The expected reason is first looked up verbatim. If that fails, each known
      long spelling contained in the expected reason is swapped for its short
      alias and the lookup is repeated.

      :param reported_str: text captured from the DUT
      :param expected_str: reset reason the test case expects
      :return: True if the reason (or an aliased spelling of it) is found, else False
      """
      if expected_str in reported_str:
          return True

      # (spelling used in expectations, alternative spelling accepted in the report)
      alias_table = (
          ("_RESET", "_RST"),
          ("POWERON_RESET", "POWERON"),
          ("DEEPSLEEP_RESET", "DSLEEP"),
      )
      return any(
          long_form in expected_str and expected_str.replace(long_form, short_form) in reported_str
          for long_form, short_form in alias_table
      )
  class TestCaseFailed(AssertionError):
      """Raised by the per-case runners in this file once a test case has been logged as failed."""
  def format_test_case_config(test_case_data):
      """
      Normalize test case descriptions and group them by unit test app config.

      Every case ends up as a dict holding at least:

      - "name": the test case name
      - "reset": list of expected reset reasons (possibly empty)
      - "config": the unit test app config, "default" when the case does not name one

      The result maps each config to the list of its cases, for example::

          case_config = {
              "default": [{"name": "restart from PRO CPU", "reset": ["SW_CPU_RESET"], "config": "default"}, {...}],
              "psram": [{"name": "restart from PRO CPU", "reset": ["SW_CPU_RESET"], "config": "psram"}],
          }

      :param test_case_data: a string ("name" or "name [reset=A,B]"), a dict, or a list of those
      :return: dict of config name -> list of normalized case dicts
      :raise TypeError: if a case is neither a string nor a dict
      """

      def _split_reset_reasons(raw_reasons):
          # strip spaces and drop the items that are empty afterwards
          stripped = (reason.strip(" ") for reason in raw_reasons)
          return [reason for reason in stripped if reason]

      def _normalize(raw_case):
          """ bring one case into the unified dict form """
          if isinstance(raw_case, str):
              pieces = raw_case.split(" [reset=")
              normalized = {"name": pieces[0]}
              if len(pieces) > 1:
                  # drop the last character (the closing "]") before splitting the reasons
                  normalized["reset"] = _split_reset_reasons(pieces[1][0:-1].split(","))
              else:
                  normalized["reset"] = list()
          elif isinstance(raw_case, dict):
              # work on a copy so the caller's dict is left untouched
              normalized = raw_case.copy()
              assert "name" in normalized
              if "reset" not in normalized:
                  normalized["reset"] = list()
              elif isinstance(normalized["reset"], str):
                  normalized["reset"] = _split_reset_reasons(normalized["reset"].split(","))
          else:
              raise TypeError("Not supported type during parsing unit test case")

          if "config" not in normalized:
              normalized["config"] = "default"
          return normalized

      all_cases = test_case_data if isinstance(test_case_data, list) else [test_case_data]

      grouped = dict()
      for raw_case in all_cases:
          normalized_case = _normalize(raw_case)
          grouped.setdefault(normalized_case["config"], []).append(normalized_case)
      return grouped
  def replace_app_bin(dut, name, new_app_bin):
      """
      Point the first download entry ending in "/<name>.bin" at another binary.

      :param dut: object whose ``download_config`` list is updated in place
      :param name: base name (without ".bin") of the binary to replace
      :param new_app_bin: replacement binary; nothing is done when it is None
      :return: None
      """
      if new_app_bin is None:
          return

      bin_suffix = '/{}.bin'.format(name)
      for position, entry in enumerate(dut.download_config):
          if not entry.endswith(bin_suffix):
              continue
          dut.download_config[position] = new_app_bin
          Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
          # only the first matching entry is replaced
          return
  def format_case_name(case):
      """
      Build the display name "[<config>] <name>" of a test case.

      Cases of one config may be split over several binaries because ROM space is limited,
      so configs like ``default`` and ``default_2`` are reported as the same config:
      a trailing "_<digits>" is dropped from the config name.

      :param case: case dict with "config" and "name" keys
      :return: formatted case name
      """
      base_config = STRIP_CONFIG_PATTERN.match(case["config"]).group(1)
      return "[{}] {}".format(base_config, case["name"])
  def reset_dut(dut):
      """
      Reset the DUT and wait until it answers the "-" command.

      esptool ``run`` takes quite long and the serial port is closed until the reset is done,
      so the DUT may have finished booting before the port is open again and the boot-up print
      can be missed. Instead of looking for that print, "-" is sent and the (empty) test history
      summary is expected. The probe is repeated a few times in case ``dut.reset`` returns while
      the DUT is still booting and cannot process commands.

      During boot-up the DUT might receive only part of the first "-" command. If it only gets
      the line ending it prints all cases, which can take long enough for the check to fail.
      Hence the delay between reset and the first probe.

      :param dut: DUT to reset
      :raise AssertionError: if the DUT never answers the probe
      """
      dut.reset()
      time.sleep(DUT_DELAY_AFTER_RESET)

      attempts_left = DUT_STARTUP_CHECK_RETRY_COUNT
      while attempts_left > 0:
          dut.write("-")
          try:
              dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTORY_CHECK_TIMEOUT)
          except DUT.ExpectTimeout:
              attempts_left -= 1
              continue
          # the DUT answered, it is up and running
          return

      raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
  def log_test_case(description, test_case, ut_config):
      """
      Log which test case is about to run, followed by its tags.

      :param description: kind of test, used in the first log line
      :param test_case: case dict; every key except "name" whose value is not None is logged as a tag
      :param ut_config: unit test app config the case runs with
      """
      Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange")
      tags = ["%s=%s" % (key, value) for key, value in test_case.items() if key != "name" and value is not None]
      Utility.console_log("Tags: %s" % ", ".join(tags), color="orange")
  def run_one_normal_case(dut, one_case, junit_test_case):
      """
      Run one single-stage test case on the DUT and record the outcome.

      :param dut: DUT the case runs on; it is reset first
      :param one_case: case dict; "name", "reset" and "timeout" are read here
      :param junit_test_case: junit test case object that receives failure / skip info
      :raise TestCaseFailed: if the case fails, times out or the reset check fails
      """
      reset_dut(dut)

      dut.start_capture_raw_data()
      # run test case
      dut.write("\"{}\"".format(one_case["name"]))
      dut.expect("Running " + one_case["name"] + "...")

      observed_resets = []

      # the callbacks (inner functions) have to set this flag,
      # a list is used so they can do it with append
      done_marker = []

      timeout_value = one_case["timeout"]

      def conclude(passed):
          """ one test finished, let expect loop break and log result """
          done_marker.append(True)
          captured = dut.stop_capture_raw_data()
          if not passed:
              Utility.console_log("Failed: " + format_case_name(one_case), color="red")
              junit_test_case.add_failure_info(captured)
              raise TestCaseFailed()
          Utility.console_log("Success: " + format_case_name(one_case), color="green")

      def on_reset_or_exception(data):
          """
          only remember what was seen.
          the collected items are checked in ``on_rebooted``, once the reboot finished.
          """
          observed_resets.append(data[0])

      def on_test_summary(data):
          """ test finished without reset """
          # in this scenario reset should not happen
          assert not observed_resets
          if int(data[1]):
              # case ignored
              Utility.console_log("Ignored: " + format_case_name(one_case), color="orange")
              junit_test_case.add_skipped_info("ignored")
          conclude(not int(data[0]))

      def on_rebooted(data):
          """ reset happened and reboot finished """
          assert observed_resets  # reboot but no exception/reset logged. should never happen
          expected_resets = one_case["reset"]
          # same number of resets, and every observed one matches the expected one at its position
          resets_ok = len(expected_resets) == len(observed_resets) and all(
              reset_reason_matches(seen, wanted) for seen, wanted in zip(observed_resets, expected_resets))
          if not resets_ok:
              err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
                                                                                    observed_resets)
              Utility.console_log(err_msg, color="orange")
              junit_test_case.add_failure_info(err_msg)
          conclude(resets_ok)

      while not done_marker:
          try:
              dut.expect_any((RESET_PATTERN, on_reset_or_exception),
                             (EXCEPTION_PATTERN, on_reset_or_exception),
                             (ABORT_PATTERN, on_reset_or_exception),
                             (FINISH_PATTERN, on_test_summary),
                             (UT_APP_BOOT_UP_DONE, on_rebooted),
                             timeout=timeout_value)
          except DUT.ExpectTimeout:
              Utility.console_log("Timeout in expect (%s seconds)" % timeout_value, color="orange")
              junit_test_case.add_failure_info("timeout")
              conclude(False)
              break
  @ttfw_idf.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
  def run_unit_test_cases(env, extra_data):
      """
      Run single-stage unit test cases, one DUT per unit test app config.

      extra_data can be three types of value

      1. as string:
          1. "case_name"
          2. "case_name [reset=RESET_REASON]"
      2. as dict:
          1. with key like {"name": "Intr_alloc test, shared ints"}
          2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
      3. as list of string or dict:
          [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]

      :param env: test env instance
      :param extra_data: the case name or case list or case dictionary
      :return: None
      :raise AssertionError: after all cases ran, if any of them failed
      """
      case_config = format_test_case_config(extra_data)

      # a failed case does not stop the run (unless we hit a scenario we can't handle);
      # failed case names are collected here and reported right before leaving the function
      failed_cases = []

      for ut_config, config_cases in case_config.items():
          Utility.console_log("Running unit test for config: " + ut_config, "O")
          dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
          if config_cases:
              # the app binary override is taken from the first case of the config
              replace_app_bin(dut, "unit-test-app", config_cases[0].get('app_bin'))
          dut.start_app()
          Utility.console_log("Download finished, start running test cases", "O")

          for one_case in config_cases:
              log_test_case("test case", one_case, ut_config)
              performance_items = []
              # create junit report test case
              junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
              try:
                  run_one_normal_case(dut, one_case, junit_test_case)
                  performance_items = dut.get_performance_items()
              except TestCaseFailed:
                  failed_cases.append(format_case_name(one_case))
              except Exception as e:
                  junit_test_case.add_failure_info("Unexpected exception: " + str(e))
                  failed_cases.append(format_case_name(one_case))
              finally:
                  TinyFW.JunitReport.update_performance(performance_items)
                  TinyFW.JunitReport.test_case_finish(junit_test_case)

          # close DUT when finish running all cases for one config
          env.close_dut(dut.name)

      # raise exception if any case fails
      if failed_cases:
          Utility.console_log("Failed Cases:", color="red")
          for failed_name in failed_cases:
              Utility.console_log("\t" + failed_name, color="red")
          raise AssertionError("Unit Test Failed")
  class Handler(threading.Thread):
      """
      Thread driving one DUT of a multi-device test case.

      Each handler selects the parent case on its DUT, picks its own child case and then
      relays signals between DUTs through a list shared by all handlers and guarded by ``lock``.
      After ``run`` returns, ``result``, ``output`` and ``fail_name`` hold the outcome.
      """

      WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
      SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
      FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")

      def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
          """
          :param dut: DUT driven by this handler; it is reset right here
          :param sent_signal_list: list of pending signals, shared by all handlers of one case
          :param lock: lock guarding ``sent_signal_list``
          :param parent_case_name: name of the multi-device test case
          :param child_case_index: zero based index of the child case; the menu index is one higher
          :param timeout: timeout used for expects and for waiting on a signal
          """
          self.dut = dut
          self.sent_signal_list = sent_signal_list
          self.lock = lock
          self.parent_case_name = parent_case_name
          self.child_case_name = ""
          self.child_case_index = child_case_index + 1
          self.finish = False
          self.result = False
          self.output = ""
          self.fail_name = None
          self.timeout = timeout
          self.force_stop = threading.Event()  # set by ``stop`` to make ``run`` leave its loop
          reset_dut(self.dut)  # reset the board to make it start from beginning
          threading.Thread.__init__(self, name="{} Handler".format(dut))

      @staticmethod
      def _child_case_pattern(child_case_index):
          """ regex matching the menu entry ``(<index>) "<name>"`` of one child case; group 1 is the name """
          # raw strings keep the backslashes without relying on invalid escape sequences
          return re.compile(r'\(' + str(child_case_index) + r'\)\s"(\w+)"')

      def run(self):
          """
          Select the parent and child case on the DUT, then serve expect callbacks
          until the test finishes, an expect times out or ``stop`` is called.
          """
          self.dut.start_capture_raw_data()

          # the callbacks receive ``data`` from ``expect_any``
          # (presumably the groups of the matched pattern - confirm against the DUT class)

          def get_child_case_name(data):
              self.child_case_name = data[0]
              time.sleep(1)
              self.dut.write(str(self.child_case_index))

          def one_device_case_finish(result):
              """ one test finished, let expect loop break and log result """
              self.finish = True
              self.result = result
              self.output = "[{}]\n\n{}\n".format(self.child_case_name,
                                                  self.dut.stop_capture_raw_data())
              if not result:
                  self.fail_name = self.child_case_name

          def device_wait_action(data):
              """ wait until another device sent the awaited signal, then forward its parameter """
              start_time = time.time()
              expected_signal = data[0].encode('utf-8')
              while True:
                  if time.time() > start_time + self.timeout:
                      Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
                      break
                  with self.lock:
                      pending = next((signal for signal in self.sent_signal_list
                                      if signal["name"] == expected_signal), None)
                      if pending is not None:
                          self.dut.write(pending["parameter"])
                          self.sent_signal_list.remove(pending)
                          break
                  # nothing matching yet: poll again shortly.
                  # sleep outside of the lock so that senders are not blocked meanwhile
                  time.sleep(0.01)

          def device_send_action(data):
              """ publish a signal (and its optional parameter) for the other devices """
              with self.lock:
                  self.sent_signal_list.append({
                      "name": data[0].encode('utf-8'),
                      "parameter": "" if data[2] is None else data[2].encode('utf-8')
                      # no parameter means we only write EOL to DUT
                  })

          def handle_device_test_finish(data):
              """ test finished without reset """
              # in this scenario reset should not happen
              if int(data[1]):
                  # case ignored
                  Utility.console_log("Ignored: " + self.child_case_name, color="orange")
              one_device_case_finish(not int(data[0]))

          try:
              time.sleep(1)
              self.dut.write("\"{}\"".format(self.parent_case_name))
              self.dut.expect("Running " + self.parent_case_name + "...")
          except DUT.ExpectTimeout:
              Utility.console_log("No case detected!", color="orange")

          # the child index is fixed for this handler, so the pattern is compiled only once
          child_case_pattern = self._child_case_pattern(self.child_case_index)

          while not self.finish and not self.force_stop.is_set():
              try:
                  self.dut.expect_any((child_case_pattern, get_child_case_name),
                                      (self.WAIT_SIGNAL_PATTERN, device_wait_action),  # wait signal pattern
                                      (self.SEND_SIGNAL_PATTERN, device_send_action),  # send signal pattern
                                      (self.FINISH_PATTERN, handle_device_test_finish),  # test finish pattern
                                      timeout=self.timeout)
              except DUT.ExpectTimeout:
                  Utility.console_log("Timeout in expect (%s seconds)" % self.timeout, color="orange")
                  one_device_case_finish(False)
                  break

      def stop(self):
          """ ask ``run`` to leave its loop before the next expect """
          self.force_stop.set()
  def get_case_info(one_case):
      """
      Pick the values needed to start a multi-device case.

      :param one_case: case dict with "name" and "child case num" keys
      :return: tuple of (parent case name, number of child cases)
      """
      return one_case["name"], one_case["child case num"]
  def get_dut(duts, env, name, ut_config, app_bin=None):
      """
      Return the DUT registered under ``name``, creating and flashing it on first use.

      :param duts: dict of already created DUTs, keyed by name; new DUTs are added to it
      :param env: test env instance used to create a missing DUT
      :param name: DUT name
      :param ut_config: unit test app config for a newly created DUT
      :param app_bin: optional replacement application binary for a newly created DUT
      :return: the DUT
      """
      if name in duts:
          return duts[name]

      new_dut = env.get_dut(name, app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
      duts[name] = new_dut
      replace_app_bin(new_dut, "unit-test-app", app_bin)
      new_dut.start_app()  # download bin to board
      return new_dut
  def run_one_multiple_devices_case(duts, ut_config, env, one_case, app_bin, junit_test_case):
      """
      Run one multi-device test case, one ``Handler`` thread per child case / DUT.

      :param duts: dict of already created DUTs, keyed by name; missing ones are created
      :param ut_config: unit test app config
      :param env: test env instance
      :param one_case: case dict; "name", "child case num" and "timeout" are read
      :param app_bin: optional replacement application binary for newly created DUTs
      :param junit_test_case: junit test case object that receives the failure output
      :return: True if every handler reported success, else False
      """
      lock = threading.RLock()
      threads = []
      send_signal_list = []
      result = True
      parent_case, case_num = get_case_info(one_case)

      for i in range(case_num):
          dut = get_dut(duts, env, "dut%d" % i, ut_config, app_bin)
          threads.append(Handler(dut, send_signal_list, lock,
                                 parent_case, i, one_case["timeout"]))
      for thread in threads:
          # ``daemon`` attribute instead of the deprecated ``setDaemon()``
          thread.daemon = True
          thread.start()

      output_parts = ["Multiple Device Failed\n"]
      for thread in threads:
          thread.join()
          result = result and thread.result
          output_parts.append(thread.output)
          if not thread.result:
              # one device failed: tell all handlers to stop
              for handler in threads:
                  handler.stop()

      if not result:
          junit_test_case.add_failure_info("".join(output_parts))

      # collect performances from DUTs
      performance_items = []
      for dut_name in duts:
          performance_items.extend(duts[dut_name].get_performance_items())
      TinyFW.JunitReport.update_performance(performance_items)

      return result
  @ttfw_idf.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
  def run_multiple_devices_cases(env, extra_data):
      """
      Run multi-device unit test cases.

      extra_data can be two types of value

      1. as dict:
         e.g.
              {"name":  "gpio master/slave test example",
              "child case num": 2,
              "config": "release",
              "env_tag": "UT_T2_1"}
      2. as list dict:
         e.g.
             [{"name":  "gpio master/slave test example1",
              "child case num": 2,
              "config": "release",
              "env_tag": "UT_T2_1"},
             {"name":  "gpio master/slave test example2",
              "child case num": 2,
              "config": "release",
              "env_tag": "UT_T2_1"}]

      :param env: test env instance
      :param extra_data: case dict or list of case dicts
      :raise AssertionError: after all cases ran, if any of them failed
      """
      failed_cases = []
      case_config = format_test_case_config(extra_data)
      duts = {}

      for ut_config, config_cases in case_config.items():
          Utility.console_log("Running unit test for config: " + ut_config, "O")
          for one_case in config_cases:
              log_test_case("multi-device test", one_case, ut_config)
              case_passed = False
              junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
              try:
                  case_passed = run_one_multiple_devices_case(duts, ut_config, env, one_case,
                                                              one_case.get('app_bin'), junit_test_case)
              except Exception as e:
                  junit_test_case.add_failure_info("Unexpected exception: " + str(e))
              finally:
                  # an unexpected exception leaves ``case_passed`` False, so it is reported as failed too
                  if not case_passed:
                      failed_cases.append(format_case_name(one_case))
                      Utility.console_log("Failed: " + format_case_name(one_case), color="red")
                  else:
                      Utility.console_log("Success: " + format_case_name(one_case), color="green")
                  TinyFW.JunitReport.test_case_finish(junit_test_case)

          # close all DUTs when finish running all cases for one config
          for dut_name in duts:
              env.close_dut(dut_name)
          duts = {}

      if failed_cases:
          Utility.console_log("Failed Cases:", color="red")
          for failed_name in failed_cases:
              Utility.console_log("\t" + failed_name, color="red")
          raise AssertionError("Unit Test Failed")
  def run_one_multiple_stage_case(dut, one_case, junit_test_case):
      """
      Run one multi-stage test case: every stage is selected in turn, with a reboot in between.

      :param dut: DUT the case runs on; it is reset first
      :param one_case: case dict; "name", "child case num", "reset" and "timeout" are read
      :param junit_test_case: junit test case object that receives failure / skip info
      :raise TestCaseFailed: if the case fails, times out, ends in the wrong stage
          or the reset check fails
      """
      reset_dut(dut)

      dut.start_capture_raw_data()

      # resets / exceptions seen so far, over all stages
      observed_resets = []

      def resets_as_expected():
          """ compare the observed resets with the expected ones, log a mismatch """
          expected_resets = one_case["reset"]
          if not expected_resets:
              # we allow omit reset in multi stage cases
              return True
          assert observed_resets  # reboot but no exception/reset logged. should never happen
          matched = len(expected_resets) == len(observed_resets) and all(
              reset_reason_matches(seen, wanted) for seen, wanted in zip(observed_resets, expected_resets))
          if not matched:
              err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
                                                                                    observed_resets)
              Utility.console_log(err_msg, color="orange")
              junit_test_case.add_failure_info(err_msg)
          return matched

      def on_reset_or_exception(data):
          """
          only remember what was seen.
          the collected items are checked once the case finished.
          """
          observed_resets.append(data[0])

      stage_count = one_case["child case num"]
      for test_stage in range(stage_count):
          # select multi stage test case name
          dut.write("\"{}\"".format(one_case["name"]))
          dut.expect("Running " + one_case["name"] + "...")
          # select test function for current stage
          dut.write(str(test_stage + 1))

          # the callbacks (inner functions) have to set this flag,
          # a list is used so they can do it with append
          stage_outcome = []

          in_last_stage = test_stage == stage_count - 1

          # expect callbacks
          def conclude(passed):
              """ one test finished, let expect loop break and log result """
              # a passed test still has to pass the reset check
              passed = passed and resets_as_expected()
              captured = dut.stop_capture_raw_data()
              if not passed:
                  Utility.console_log("Failed: " + format_case_name(one_case), color="red")
                  junit_test_case.add_failure_info(captured)
                  raise TestCaseFailed()
              Utility.console_log("Success: " + format_case_name(one_case), color="green")
              stage_outcome.append("break")

          def on_test_summary(data):
              """ test finished without reset """
              if int(data[1]):
                  # case ignored
                  Utility.console_log("Ignored: " + format_case_name(one_case), color="orange")
                  junit_test_case.add_skipped_info("ignored")
              # only passed in last stage will be regarded as real pass
              if not in_last_stage:
                  Utility.console_log("test finished before enter last stage", color="orange")
                  conclude(False)
              else:
                  conclude(not int(data[0]))

          def on_rebooted(data):
              """ reboot finished. we goto next stage """
              if not in_last_stage:
                  stage_outcome.append("continue")
              else:
                  # already last stage, should never goto next stage
                  Utility.console_log("didn't finish at last stage", color="orange")
                  conclude(False)

          timeout_value = one_case["timeout"]
          while not stage_outcome:
              try:
                  dut.expect_any((RESET_PATTERN, on_reset_or_exception),
                                 (EXCEPTION_PATTERN, on_reset_or_exception),
                                 (ABORT_PATTERN, on_reset_or_exception),
                                 (FINISH_PATTERN, on_test_summary),
                                 (UT_APP_BOOT_UP_DONE, on_rebooted),
                                 timeout=timeout_value)
              except DUT.ExpectTimeout:
                  Utility.console_log("Timeout in expect (%s seconds)" % timeout_value, color="orange")
                  conclude(False)
                  break
          if stage_outcome[0] == "break":
              # test breaks on current stage
              break
  @ttfw_idf.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
  def run_multiple_stage_cases(env, extra_data):
      """
      Run multi-stage unit test cases, one DUT per unit test app config.

      extra_data can be 2 types of value

      1. as dict: mandatory keys: "name" and "child case num", optional keys: "reset" and others
      2. as list of string or dict:
         [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]

      :param env: test env instance
      :param extra_data: the case name or case list or case dictionary
      :return: None
      :raise AssertionError: after all cases ran, if any of them failed
      """
      case_config = format_test_case_config(extra_data)

      # a failed case does not stop the run (unless we hit a scenario we can't handle);
      # failed case names are collected here and reported right before leaving the function
      failed_cases = []

      for ut_config, config_cases in case_config.items():
          Utility.console_log("Running unit test for config: " + ut_config, "O")
          dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
          if config_cases:
              # the app binary override is taken from the first case of the config
              replace_app_bin(dut, "unit-test-app", config_cases[0].get('app_bin'))
          dut.start_app()

          for one_case in config_cases:
              log_test_case("multi-stage test", one_case, ut_config)
              performance_items = []
              junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
              try:
                  run_one_multiple_stage_case(dut, one_case, junit_test_case)
                  performance_items = dut.get_performance_items()
              except TestCaseFailed:
                  failed_cases.append(format_case_name(one_case))
              except Exception as e:
                  junit_test_case.add_failure_info("Unexpected exception: " + str(e))
                  failed_cases.append(format_case_name(one_case))
              finally:
                  TinyFW.JunitReport.update_performance(performance_items)
                  TinyFW.JunitReport.test_case_finish(junit_test_case)

          # close DUT when finish running all cases for one config
          env.close_dut(dut.name)

      # raise exception if any case fails
      if failed_cases:
          Utility.console_log("Failed Cases:", color="red")
          for failed_name in failed_cases:
              Utility.console_log("\t" + failed_name, color="red")
          raise AssertionError("Unit Test Failed")
  def detect_update_unit_test_info(env, extra_data, app_bin):
      """
      Read the test menu from the device and complete the given case dicts in place.

      For every menu entry whose name equals a case name, "type" is set, "timeout" is filled in
      unless already present, and "child case num" is set for entries that have a sub-menu.

      :param env: test env instance
      :param extra_data: list of case dicts; they are updated in place
      :param app_bin: optional replacement application binary
      :raise ValueError: if a given case was not found in the menu of the flashed device
      """
      case_config = format_test_case_config(extra_data)

      for ut_config in case_config:
          dut = env.get_dut("unit-test-app", app_path=UT_APP_PATH, app_config_name=ut_config)
          replace_app_bin(dut, "unit-test-app", app_bin)
          dut.start_app()

          reset_dut(dut)

          # get the list of test cases
          dut.write("")
          dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)

          def find_update_dic(name, _t, _timeout, child_case_num=None):
              """ store the detected info in every given case called ``name`` """
              for case_entry in extra_data:
                  if case_entry['name'] != name:
                      continue
                  case_entry['type'] = _t
                  if 'timeout' not in case_entry:
                      # a timeout given by the caller wins over the one from the menu
                      case_entry['timeout'] = _timeout
                  if child_case_num:
                      case_entry['child case num'] = child_case_num

          try:
              while True:
                  menu_entry = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT)
                  test_case_name = menu_entry[1]
                  tags = menu_entry[2]

                  timeout_tag = re.search(r'\[timeout=(\d+)\]', tags)
                  timeout = int(timeout_tag.group(1)) if timeout_tag else 30

                  if re.search(r'\[multi_stage\]', tags):
                      test_case_type = MULTI_STAGE_ID
                  elif re.search(r'\[multi_device\]', tags):
                      test_case_type = MULTI_DEVICE_ID
                  else:
                      test_case_type = SIMPLE_TEST_ID

                  if test_case_type == SIMPLE_TEST_ID:
                      # simple cases have no sub-menu
                      find_update_dic(test_case_name, test_case_type, timeout)
                      if menu_entry[3] and re.search(END_LIST_STR, menu_entry[3]):
                          break
                      continue

                  # find the last submenu item
                  submenu_entry = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT)
                  find_update_dic(test_case_name, test_case_type, timeout, child_case_num=int(submenu_entry[0]))
                  if submenu_entry[1] and re.search(END_LIST_STR, submenu_entry[1]):
                      break

              # check if the unit test case names are correct, i.e. they could be found in the device
              for case_entry in extra_data:
                  if 'type' not in case_entry:
                      raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(case_entry.get('name')))
          except DUT.ExpectTimeout:
              Utility.console_log("Timeout during getting the test list", color="red")
          finally:
              dut.close()

          # These options are the same for all configs, therefore there is no need to continue
          break
  def parse_test_argument(test, app_bin=None):
      """
      Turn one command line test description into a test case dict.

      The description is a comma separated list of ``<option>:<argument>`` items.
      An item without option, or with the option "name", gives the test case name.
      The arguments of "timeout" and "child case num" are converted to int, all others stay strings.

      :param test: test description from the command line
      :param app_bin: application binary, stored under the "app_bin" key
      :return: test case dict
      :raise ValueError: if a "timeout" or "child case num" argument is not an integer
      """
      test_dict = dict()
      for test_item in test.split(','):
          if len(test_item) == 0:
              continue
          pair = test_item.split(':', 1)
          if len(pair) == 1 or pair[0] == 'name':
              # "<name>" and "name:<name>" both select the case: the name is always the last element
              test_dict['name'] = pair[-1]
          elif pair[0] in ('timeout', 'child case num'):
              test_dict[pair[0]] = int(pair[1])
          else:
              test_dict[pair[0]] = pair[1]
      test_dict['app_bin'] = app_bin
      return test_dict


  if __name__ == '__main__':
      parser = argparse.ArgumentParser()
      parser.add_argument(
          '--repeat', '-r',
          help='Number of repetitions for the test(s). Default is 1.',
          type=int,
          default=1
      )
      parser.add_argument("--env_config_file", "-e",
                          help="test env config file",
                          default=None
                          )
      parser.add_argument("--app_bin", "-b",
                          help="application binary file for flashing the chip",
                          default=None
                          )
      parser.add_argument(
          'test',
          help='Comma separated list of <option>:<argument> where option can be "name" (default), "child case num", '
               '"config", "timeout".',
          nargs='+'
      )
      args = parser.parse_args()
      list_of_dicts = [parse_test_argument(test, args.app_bin) for test in args.test]

      TinyFW.set_default_config(env_config_file=args.env_config_file)

      env_config = TinyFW.get_default_config()
      env_config['app'] = ttfw_idf.UT
      env_config['dut'] = ttfw_idf.IDFDUT
      env_config['test_suite_name'] = 'unit_test_parsing'
      test_env = Env.Env(**env_config)
      # fills in "type", "timeout" and "child case num" of the cases from the menu of the flashed device
      detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)

      for index in range(1, args.repeat + 1):
          if args.repeat > 1:
              Utility.console_log("Repetition {}".format(index), color="green")
          for dic in list_of_dicts:
              t = dic.get('type', SIMPLE_TEST_ID)
              if t == SIMPLE_TEST_ID:
                  run_unit_test_cases(extra_data=dic)
              elif t == MULTI_STAGE_ID:
                  run_multiple_stage_cases(extra_data=dic)
              elif t == MULTI_DEVICE_ID:
                  run_multiple_devices_cases(extra_data=dic)
              else:
                  raise ValueError('Unknown type {} of {}'.format(t, dic.get('name')))