unit_test.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. #!/usr/bin/env python
  2. #
  3. # Copyright 2018 Espressif Systems (Shanghai) PTE LTD
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """
  17. Test script for unit test case.
  18. """
  19. import argparse
  20. import os
  21. import re
  22. import sys
  23. import threading
  24. import time
  25. try:
  26. import TinyFW
  27. except ImportError:
  28. # if we want to run test case outside `tiny-test-fw` folder,
  29. # we need to insert tiny-test-fw path into sys path
  30. test_fw_path = os.getenv("TEST_FW_PATH")
  31. if test_fw_path and test_fw_path not in sys.path:
  32. sys.path.insert(0, test_fw_path)
  33. else:
  34. # or try the copy in IDF
  35. idf_path = os.getenv("IDF_PATH")
  36. tiny_test_path = idf_path + "/tools/tiny-test-fw"
  37. if os.path.exists(tiny_test_path):
  38. sys.path.insert(0, tiny_test_path)
  39. import TinyFW
  40. import IDF
  41. import Utility
  42. import Env
  43. from DUT import ExpectTimeout
  44. from IDF.IDFApp import UT
  45. from TinyFW import TestCaseFailed
  46. from Utility import format_case_id, handle_unexpected_exception
  47. UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
  48. STRIP_CONFIG_PATTERN = re.compile(r'(.+?)(_\d+)?$')
  49. # matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
  50. RESET_PATTERN = re.compile(r"(ets [\w]{3}\s+[\d]{1,2} [\d]{4} [\d]{2}:[\d]{2}:[\d]{2}[^()]*\([\w].*?\))")
  51. EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
  52. ABORT_PATTERN = re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)")
  53. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  54. END_LIST_STR = r'\r?\nEnter test for running'
  55. TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?')
  56. TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))')
  57. SIMPLE_TEST_ID = 0
  58. MULTI_STAGE_ID = 1
  59. MULTI_DEVICE_ID = 2
  60. DEFAULT_TIMEOUT = 20
  61. DUT_STARTUP_CHECK_RETRY_COUNT = 5
  62. TEST_HISTORY_CHECK_TIMEOUT = 1
  63. def reset_reason_matches(reported_str, expected_str):
  64. known_aliases = {
  65. "_RESET": "_RST",
  66. "POWERON_RESET": "POWERON",
  67. "DEEPSLEEP_RESET": "DSLEEP",
  68. }
  69. if expected_str in reported_str:
  70. return True
  71. for token, alias in known_aliases.items():
  72. if token in expected_str:
  73. alt_expected_str = expected_str.replace(token, alias)
  74. if alt_expected_str in reported_str:
  75. return True
  76. return False
  77. def format_test_case_config(test_case_data, target='esp32'):
  78. """
  79. convert the test case data to unified format.
  80. We need to following info to run unit test cases:
  81. 1. unit test app config
  82. 2. test case name
  83. 3. test case reset info
  84. the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
  85. Each test case is a dict with "name" and "reset" as keys. For example::
  86. case_config = {
  87. "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
  88. "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
  89. }
  90. If config is not specified for test case, then
  91. :param test_case_data: string, list, or a dictionary list
  92. :param target: target
  93. :return: formatted data
  94. """
  95. case_config = dict()
  96. def parse_case(one_case_data):
  97. """ parse and format one case """
  98. def process_reset_list(reset_list):
  99. # strip space and remove white space only items
  100. _output = list()
  101. for _r in reset_list:
  102. _data = _r.strip(" ")
  103. if _data:
  104. _output.append(_data)
  105. return _output
  106. _case = dict()
  107. if isinstance(one_case_data, str):
  108. _temp = one_case_data.split(" [reset=")
  109. _case["name"] = _temp[0]
  110. try:
  111. _case["reset"] = process_reset_list(_temp[1][0:-1].split(","))
  112. except IndexError:
  113. _case["reset"] = list()
  114. elif isinstance(one_case_data, dict):
  115. _case = one_case_data.copy()
  116. assert "name" in _case
  117. if "reset" not in _case:
  118. _case["reset"] = list()
  119. else:
  120. if isinstance(_case["reset"], str):
  121. _case["reset"] = process_reset_list(_case["reset"].split(","))
  122. else:
  123. raise TypeError("Not supported type during parsing unit test case")
  124. if "config" not in _case:
  125. _case["config"] = "default"
  126. if 'target' not in _case:
  127. _case['target'] = target
  128. return _case
  129. if not isinstance(test_case_data, list):
  130. test_case_data = [test_case_data]
  131. for case_data in test_case_data:
  132. parsed_case = parse_case(case_data)
  133. try:
  134. case_config[parsed_case["config"]].append(parsed_case)
  135. except KeyError:
  136. case_config[parsed_case["config"]] = [parsed_case]
  137. return case_config
  138. def replace_app_bin(dut, name, new_app_bin):
  139. if new_app_bin is None:
  140. return
  141. search_pattern = '/{}.bin'.format(name)
  142. for i, config in enumerate(dut.download_config):
  143. if config.endswith(search_pattern):
  144. dut.download_config[i] = new_app_bin
  145. Utility.console_log("The replaced application binary is {}".format(new_app_bin), "O")
  146. break
  147. def format_case_name(case):
  148. # we could split cases of same config into multiple binaries as we have limited rom space
  149. # we should regard those configs like `default` and `default_2` as the same config
  150. match = STRIP_CONFIG_PATTERN.match(case['config'])
  151. stripped_config_name = match.group(1)
  152. return format_case_id(case['name'], target=case['target'], config=stripped_config_name)
  153. def reset_dut(dut):
  154. dut.reset()
  155. # esptool ``run`` cmd takes quite long time.
  156. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
  157. # this could cause checking bootup print failed.
  158. # now use input cmd `-` and check test history to check if DUT is bootup.
  159. # we'll retry this step for a few times,
  160. # in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
  161. for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
  162. dut.write("-")
  163. try:
  164. dut.expect("0 Tests 0 Failures 0 Ignored", timeout=TEST_HISTORY_CHECK_TIMEOUT)
  165. break
  166. except ExpectTimeout:
  167. pass
  168. else:
  169. raise AssertionError("Reset {} ({}) failed!".format(dut.name, dut.port))
  170. def log_test_case(description, test_case, ut_config):
  171. Utility.console_log("Running {} '{}' (config {})".format(description, test_case['name'], ut_config),
  172. color='orange')
  173. Utility.console_log('Tags: %s' % ', '.join('%s=%s' % (k, v) for (k, v) in test_case.items()
  174. if k != 'name' and v is not None),
  175. color='orange')
  176. def run_one_normal_case(dut, one_case, junit_test_case):
  177. reset_dut(dut)
  178. dut.start_capture_raw_data()
  179. # run test case
  180. dut.write("\"{}\"".format(one_case["name"]))
  181. dut.expect("Running " + one_case["name"] + "...")
  182. exception_reset_list = []
  183. # we want to set this flag in callbacks (inner functions)
  184. # use list here so we can use append to set this flag
  185. test_finish = list()
  186. # expect callbacks
  187. def one_case_finish(result):
  188. """ one test finished, let expect loop break and log result """
  189. test_finish.append(True)
  190. output = dut.stop_capture_raw_data()
  191. if result:
  192. Utility.console_log("Success: " + one_case["name"], color="green")
  193. else:
  194. Utility.console_log("Failed: " + one_case["name"], color="red")
  195. junit_test_case.add_failure_info(output)
  196. raise TestCaseFailed()
  197. def handle_exception_reset(data):
  198. """
  199. just append data to exception list.
  200. exception list will be checked in ``handle_reset_finish``, once reset finished.
  201. """
  202. exception_reset_list.append(data[0])
  203. def handle_test_finish(data):
  204. """ test finished without reset """
  205. # in this scenario reset should not happen
  206. assert not exception_reset_list
  207. if int(data[1]):
  208. # case ignored
  209. Utility.console_log("Ignored: " + one_case["name"], color="orange")
  210. junit_test_case.add_skipped_info("ignored")
  211. one_case_finish(not int(data[0]))
  212. def handle_reset_finish(data):
  213. """ reset happened and reboot finished """
  214. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  215. result = False
  216. if len(one_case["reset"]) == len(exception_reset_list):
  217. for i, exception in enumerate(exception_reset_list):
  218. if one_case["reset"][i] not in exception:
  219. break
  220. else:
  221. result = True
  222. if not result:
  223. err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
  224. exception_reset_list)
  225. Utility.console_log(err_msg, color="orange")
  226. junit_test_case.add_error_info(err_msg)
  227. one_case_finish(result)
  228. while not test_finish:
  229. try:
  230. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  231. (EXCEPTION_PATTERN, handle_exception_reset),
  232. (ABORT_PATTERN, handle_exception_reset),
  233. (FINISH_PATTERN, handle_test_finish),
  234. (UT_APP_BOOT_UP_DONE, handle_reset_finish),
  235. timeout=one_case["timeout"])
  236. except ExpectTimeout:
  237. Utility.console_log("Timeout in expect", color="orange")
  238. junit_test_case.add_failure_info("timeout")
  239. one_case_finish(False)
  240. break
  241. @IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
  242. def run_unit_test_cases(env, extra_data):
  243. """
  244. extra_data can be three types of value
  245. 1. as string:
  246. 1. "case_name"
  247. 2. "case_name [reset=RESET_REASON]"
  248. 2. as dict:
  249. 1. with key like {"name": "Intr_alloc test, shared ints"}
  250. 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
  251. 3. as list of string or dict:
  252. [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
  253. :param env: test env instance
  254. :param extra_data: the case name or case list or case dictionary
  255. :return: None
  256. """
  257. case_config = format_test_case_config(extra_data)
  258. # we don't want stop on failed case (unless some special scenarios we can't handle)
  259. # this flag is used to log if any of the case failed during executing
  260. # Before exit test function this flag is used to log if the case fails
  261. failed_cases = []
  262. for ut_config in case_config:
  263. Utility.console_log("Running unit test for config: " + ut_config, "O")
  264. dut = env.get_dut("unit-test-app", app_path=ut_config, allow_dut_exception=True)
  265. if len(case_config[ut_config]) > 0:
  266. replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
  267. dut.start_app()
  268. Utility.console_log("Download finished, start running test cases", "O")
  269. for one_case in case_config[ut_config]:
  270. # create junit report test case
  271. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  272. try:
  273. run_one_normal_case(dut, one_case, junit_test_case)
  274. except TestCaseFailed:
  275. failed_cases.append(format_case_name(one_case))
  276. except Exception as e:
  277. handle_unexpected_exception(junit_test_case, e)
  278. failed_cases.append(format_case_name(one_case))
  279. finally:
  280. TinyFW.JunitReport.test_case_finish(junit_test_case)
  281. class Handler(threading.Thread):
  282. WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
  283. SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
  284. FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
  285. def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
  286. self.dut = dut
  287. self.sent_signal_list = sent_signal_list
  288. self.lock = lock
  289. self.parent_case_name = parent_case_name
  290. self.child_case_name = ""
  291. self.child_case_index = child_case_index + 1
  292. self.finish = False
  293. self.result = False
  294. self.output = ""
  295. self.fail_name = None
  296. self.timeout = timeout
  297. self.force_stop = threading.Event() # it show the running status
  298. reset_dut(self.dut) # reset the board to make it start from begining
  299. threading.Thread.__init__(self, name="{} Handler".format(dut))
  300. def run(self):
  301. self.dut.start_capture_raw_data()
  302. def get_child_case_name(data):
  303. self.child_case_name = data[0]
  304. time.sleep(1)
  305. self.dut.write(str(self.child_case_index))
  306. def one_device_case_finish(result):
  307. """ one test finished, let expect loop break and log result """
  308. self.finish = True
  309. self.result = result
  310. self.output = "[{}]\n\n{}\n".format(self.child_case_name,
  311. self.dut.stop_capture_raw_data())
  312. if not result:
  313. self.fail_name = self.child_case_name
  314. def device_wait_action(data):
  315. start_time = time.time()
  316. expected_signal = data[0]
  317. while 1:
  318. if time.time() > start_time + self.timeout:
  319. Utility.console_log("Timeout in device for function: %s" % self.child_case_name, color="orange")
  320. break
  321. with self.lock:
  322. for sent_signal in self.sent_signal_list:
  323. if expected_signal == sent_signal["name"]:
  324. self.dut.write(sent_signal["parameter"])
  325. self.sent_signal_list.remove(sent_signal)
  326. break
  327. else:
  328. time.sleep(0.01)
  329. continue
  330. break
  331. def device_send_action(data):
  332. with self.lock:
  333. self.sent_signal_list.append({
  334. "name": data[0].encode('utf-8'),
  335. "parameter": "" if data[2] is None else data[2].encode('utf-8')
  336. # no parameter means we only write EOL to DUT
  337. })
  338. def handle_device_test_finish(data):
  339. """ test finished without reset """
  340. # in this scenario reset should not happen
  341. if int(data[1]):
  342. # case ignored
  343. Utility.console_log("Ignored: " + self.child_case_name, color="orange")
  344. one_device_case_finish(not int(data[0]))
  345. try:
  346. time.sleep(1)
  347. self.dut.write("\"{}\"".format(self.parent_case_name))
  348. self.dut.expect("Running " + self.parent_case_name + "...")
  349. except ExpectTimeout:
  350. Utility.console_log("No case detected!", color="orange")
  351. while not self.finish and not self.force_stop.isSet():
  352. try:
  353. self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), # noqa: W605 - regex
  354. get_child_case_name),
  355. (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
  356. (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
  357. (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
  358. timeout=self.timeout)
  359. except ExpectTimeout:
  360. Utility.console_log("Timeout in expect", color="orange")
  361. one_device_case_finish(False)
  362. break
  363. def stop(self):
  364. self.force_stop.set()
  365. def get_case_info(one_case):
  366. parent_case = one_case["name"]
  367. child_case_num = one_case["child case num"]
  368. return parent_case, child_case_num
  369. def get_dut(duts, env, name, ut_config, app_bin=None):
  370. if name in duts:
  371. dut = duts[name]
  372. else:
  373. dut = env.get_dut(name, app_path=ut_config, allow_dut_exception=True)
  374. duts[name] = dut
  375. replace_app_bin(dut, "unit-test-app", app_bin)
  376. dut.start_app() # download bin to board
  377. return dut
  378. def run_one_multiple_devices_case(duts, ut_config, env, one_case, app_bin, junit_test_case):
  379. lock = threading.RLock()
  380. threads = []
  381. send_signal_list = []
  382. result = True
  383. parent_case, case_num = get_case_info(one_case)
  384. for i in range(case_num):
  385. dut = get_dut(duts, env, "dut%d" % i, ut_config, app_bin)
  386. threads.append(Handler(dut, send_signal_list, lock,
  387. parent_case, i, one_case["timeout"]))
  388. for thread in threads:
  389. thread.setDaemon(True)
  390. thread.start()
  391. output = "Multiple Device Failed\n"
  392. for thread in threads:
  393. thread.join()
  394. result = result and thread.result
  395. output += thread.output
  396. if not thread.result:
  397. [thd.stop() for thd in threads]
  398. if not result:
  399. junit_test_case.add_failure_info(output)
  400. return result
  401. @IDF.idf_unit_test(env_tag="UT_T2_1", junit_report_by_case=True)
  402. def run_multiple_devices_cases(env, extra_data):
  403. """
  404. extra_data can be two types of value
  405. 1. as dict:
  406. e.g.
  407. {"name": "gpio master/slave test example",
  408. "child case num": 2,
  409. "config": "release",
  410. "env_tag": "UT_T2_1"}
  411. 2. as list dict:
  412. e.g.
  413. [{"name": "gpio master/slave test example1",
  414. "child case num": 2,
  415. "config": "release",
  416. "env_tag": "UT_T2_1"},
  417. {"name": "gpio master/slave test example2",
  418. "child case num": 2,
  419. "config": "release",
  420. "env_tag": "UT_T2_1"}]
  421. """
  422. failed_cases = []
  423. case_config = format_test_case_config(extra_data)
  424. duts = {}
  425. for ut_config in case_config:
  426. Utility.console_log("Running unit test for config: " + ut_config, "O")
  427. for one_case in case_config[ut_config]:
  428. result = False
  429. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  430. try:
  431. result = run_one_multiple_devices_case(duts, ut_config, env, one_case,
  432. one_case.get('app_bin'), junit_test_case)
  433. except TestCaseFailed:
  434. pass # result is False, this is handled by the finally block
  435. except Exception as e:
  436. handle_unexpected_exception(junit_test_case, e)
  437. finally:
  438. if result:
  439. Utility.console_log("Success: " + format_case_name(one_case), color="green")
  440. else:
  441. failed_cases.append(format_case_name(one_case))
  442. Utility.console_log("Failed: " + format_case_name(one_case), color="red")
  443. TinyFW.JunitReport.test_case_finish(junit_test_case)
  444. def run_one_multiple_stage_case(dut, one_case, junit_test_case):
  445. reset_dut(dut)
  446. dut.start_capture_raw_data()
  447. exception_reset_list = []
  448. for test_stage in range(one_case["child case num"]):
  449. # select multi stage test case name
  450. dut.write("\"{}\"".format(one_case["name"]))
  451. dut.expect("Running " + one_case["name"] + "...")
  452. # select test function for current stage
  453. dut.write(str(test_stage + 1))
  454. # we want to set this flag in callbacks (inner functions)
  455. # use list here so we can use append to set this flag
  456. stage_finish = list()
  457. def last_stage():
  458. return test_stage == one_case["child case num"] - 1
  459. def check_reset():
  460. if one_case["reset"]:
  461. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  462. result = False
  463. if len(one_case["reset"]) == len(exception_reset_list):
  464. for i, exception in enumerate(exception_reset_list):
  465. if not reset_reason_matches(exception, one_case["reset"][i]):
  466. break
  467. else:
  468. result = True
  469. if not result:
  470. err_msg = "Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}".format(one_case["reset"],
  471. exception_reset_list)
  472. Utility.console_log(err_msg, color="orange")
  473. junit_test_case.add_failure_info(err_msg)
  474. else:
  475. # we allow omit reset in multi stage cases
  476. result = True
  477. return result
  478. # expect callbacks
  479. def one_case_finish(result):
  480. """ one test finished, let expect loop break and log result """
  481. # handle test finish
  482. result = result and check_reset()
  483. output = dut.stop_capture_raw_data()
  484. if result:
  485. Utility.console_log("Success: " + format_case_name(one_case), color="green")
  486. else:
  487. Utility.console_log("Failed: " + format_case_name(one_case), color="red")
  488. junit_test_case.add_failure_info(output)
  489. raise TestCaseFailed()
  490. stage_finish.append("break")
  491. def handle_exception_reset(data):
  492. """
  493. just append data to exception list.
  494. exception list will be checked in ``handle_reset_finish``, once reset finished.
  495. """
  496. exception_reset_list.append(data[0])
  497. def handle_test_finish(data):
  498. """ test finished without reset """
  499. # in this scenario reset should not happen
  500. if int(data[1]):
  501. # case ignored
  502. Utility.console_log("Ignored: " + format_case_name(one_case), color="orange")
  503. junit_test_case.add_skipped_info("ignored")
  504. # only passed in last stage will be regarded as real pass
  505. if last_stage():
  506. one_case_finish(not int(data[0]))
  507. else:
  508. Utility.console_log("test finished before enter last stage", color="orange")
  509. one_case_finish(False)
  510. def handle_next_stage(data):
  511. """ reboot finished. we goto next stage """
  512. if last_stage():
  513. # already last stage, should never goto next stage
  514. Utility.console_log("didn't finish at last stage", color="orange")
  515. one_case_finish(False)
  516. else:
  517. stage_finish.append("continue")
  518. while not stage_finish:
  519. try:
  520. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  521. (EXCEPTION_PATTERN, handle_exception_reset),
  522. (ABORT_PATTERN, handle_exception_reset),
  523. (FINISH_PATTERN, handle_test_finish),
  524. (UT_APP_BOOT_UP_DONE, handle_next_stage),
  525. timeout=one_case["timeout"])
  526. except ExpectTimeout:
  527. Utility.console_log("Timeout in expect", color="orange")
  528. one_case_finish(False)
  529. break
  530. if stage_finish[0] == "break":
  531. # test breaks on current stage
  532. break
  533. @IDF.idf_unit_test(env_tag="UT_T1_1", junit_report_by_case=True)
  534. def run_multiple_stage_cases(env, extra_data):
  535. """
  536. extra_data can be 2 types of value
  537. 1. as dict: Mandatory keys: "name" and "child case num", optional keys: "reset" and others
  538. 3. as list of string or dict:
  539. [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
  540. :param env: test env instance
  541. :param extra_data: the case name or case list or case dictionary
  542. :return: None
  543. """
  544. case_config = format_test_case_config(extra_data)
  545. # we don't want stop on failed case (unless some special scenarios we can't handle)
  546. # this flag is used to log if any of the case failed during executing
  547. # Before exit test function this flag is used to log if the case fails
  548. failed_cases = []
  549. for ut_config in case_config:
  550. Utility.console_log("Running unit test for config: " + ut_config, "O")
  551. dut = env.get_dut("unit-test-app", app_path=ut_config, allow_dut_exception=True)
  552. if len(case_config[ut_config]) > 0:
  553. replace_app_bin(dut, "unit-test-app", case_config[ut_config][0].get('app_bin'))
  554. dut.start_app()
  555. for one_case in case_config[ut_config]:
  556. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  557. try:
  558. run_one_multiple_stage_case(dut, one_case, junit_test_case)
  559. except TestCaseFailed:
  560. failed_cases.append(format_case_name(one_case))
  561. except Exception as e:
  562. handle_unexpected_exception(junit_test_case, e)
  563. failed_cases.append(format_case_name(one_case))
  564. finally:
  565. TinyFW.JunitReport.test_case_finish(junit_test_case)
  566. def detect_update_unit_test_info(env, extra_data, app_bin):
  567. case_config = format_test_case_config(extra_data)
  568. for ut_config in case_config:
  569. dut = env.get_dut("unit-test-app", app_path=ut_config)
  570. replace_app_bin(dut, "unit-test-app", app_bin)
  571. dut.start_app()
  572. reset_dut(dut)
  573. # get the list of test cases
  574. dut.write("")
  575. dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
  576. def find_update_dic(name, _t, _timeout, child_case_num=None):
  577. for _case_data in extra_data:
  578. if _case_data['name'] == name:
  579. _case_data['type'] = _t
  580. if 'timeout' not in _case_data:
  581. _case_data['timeout'] = _timeout
  582. if child_case_num:
  583. _case_data['child case num'] = child_case_num
  584. try:
  585. while True:
  586. data = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT)
  587. test_case_name = data[1]
  588. m = re.search(r'\[timeout=(\d+)\]', data[2])
  589. if m:
  590. timeout = int(m.group(1))
  591. else:
  592. timeout = 30
  593. m = re.search(r'\[multi_stage\]', data[2])
  594. if m:
  595. test_case_type = MULTI_STAGE_ID
  596. else:
  597. m = re.search(r'\[multi_device\]', data[2])
  598. if m:
  599. test_case_type = MULTI_DEVICE_ID
  600. else:
  601. test_case_type = SIMPLE_TEST_ID
  602. find_update_dic(test_case_name, test_case_type, timeout)
  603. if data[3] and re.search(END_LIST_STR, data[3]):
  604. break
  605. continue
  606. # find the last submenu item
  607. data = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT)
  608. find_update_dic(test_case_name, test_case_type, timeout, child_case_num=int(data[0]))
  609. if data[1] and re.search(END_LIST_STR, data[1]):
  610. break
  611. # check if the unit test case names are correct, i.e. they could be found in the device
  612. for _dic in extra_data:
  613. if 'type' not in _dic:
  614. raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
  615. except ExpectTimeout:
  616. Utility.console_log("Timeout during getting the test list", color="red")
  617. finally:
  618. dut.close()
  619. # These options are the same for all configs, therefore there is no need to continue
  620. break
  621. if __name__ == '__main__':
  622. parser = argparse.ArgumentParser()
  623. parser.add_argument(
  624. '--repeat', '-r',
  625. help='Number of repetitions for the test(s). Default is 1.',
  626. type=int,
  627. default=1
  628. )
  629. parser.add_argument('--env_config_file', '-e',
  630. help='test env config file',
  631. default=None)
  632. parser.add_argument('--app_bin', '-b',
  633. help='application binary file for flashing the chip',
  634. default=None)
  635. parser.add_argument('test',
  636. help='Comma separated list of <option>:<argument> where option can be "name" (default), '
  637. '"child case num", "config", "timeout".',
  638. nargs='+')
  639. args = parser.parse_args()
  640. list_of_dicts = []
  641. for test in args.test:
  642. test_args = test.split(r',')
  643. test_dict = dict()
  644. for test_item in test_args:
  645. if len(test_item) == 0:
  646. continue
  647. pair = test_item.split(r':')
  648. if len(pair) == 1 or pair[0] == 'name':
  649. test_dict['name'] = pair[0]
  650. elif len(pair) == 2:
  651. if pair[0] == 'timeout' or pair[0] == 'child case num':
  652. test_dict[pair[0]] = int(pair[1])
  653. else:
  654. test_dict[pair[0]] = pair[1]
  655. else:
  656. raise ValueError('Error in argument item {} of {}'.format(test_item, test))
  657. test_dict['app_bin'] = args.app_bin
  658. list_of_dicts.append(test_dict)
  659. TinyFW.set_default_config(env_config_file=args.env_config_file)
  660. env_config = TinyFW.get_default_config()
  661. env_config['app'] = UT
  662. env_config['dut'] = IDF.IDFDUT
  663. env_config['test_suite_name'] = 'unit_test_parsing'
  664. test_env = Env.Env(**env_config)
  665. detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
  666. for index in range(1, args.repeat + 1):
  667. if args.repeat > 1:
  668. Utility.console_log("Repetition {}".format(index), color="green")
  669. for dic in list_of_dicts:
  670. t = dic.get('type', SIMPLE_TEST_ID)
  671. if t == SIMPLE_TEST_ID:
  672. run_unit_test_cases(extra_data=dic)
  673. elif t == MULTI_STAGE_ID:
  674. run_multiple_stage_cases(extra_data=dic)
  675. elif t == MULTI_DEVICE_ID:
  676. run_multiple_devices_cases(extra_data=dic)
  677. else:
  678. raise ValueError('Unknown type {} of {}'.format(t, dic.get('name')))