unit_test.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812
  1. #!/usr/bin/env python
  2. #
  3. # SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
  4. # SPDX-License-Identifier: Apache-2.0
  5. """
  6. Test script for unit test case.
  7. """
  8. import argparse
  9. import re
  10. import threading
  11. import time
  12. import ttfw_idf
  13. from tiny_test_fw import DUT, Env, TinyFW, Utility
  14. from tiny_test_fw.TinyFW import TestCaseFailed
  15. from tiny_test_fw.Utility import format_case_id, handle_unexpected_exception
# prompt text the expect loops below wait for to detect that the unit test app has (re)booted
UT_APP_BOOT_UP_DONE = 'Press ENTER to see the list of tests.'
# splits a config name into its base name (group 1) and an optional numeric suffix such as "_2"
STRIP_CONFIG_PATTERN = re.compile(r'(.+?)(_\d+)?$')
# matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
RESET_PATTERN = re.compile(r'(rst:0x[0-9a-fA-F]*\s\([\w].*?\),boot:0x[0-9a-fA-F]*\s\([\w].*?\))')
# matches e.g.: "Guru Meditation Error: Core  0 panic'ed (LoadProhibited)"
EXCEPTION_PATTERN = re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))")
# matches e.g.: "abort() was called at PC 0x400d1234 on core 0"
ABORT_PATTERN = re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)')
# matches "assert failed: " and everything after it on the same line
ASSERT_PATTERN = re.compile(r'(assert failed: .*)')
# summary line of a single test run: first group is the failure count, second the ignored count
# (one digit each)
FINISH_PATTERN = re.compile(r'1 Tests (\d) Failures (\d) Ignored')
# text that follows the last entry of the test menu
END_LIST_STR = r'\r?\nEnter test for running'
# one test menu entry: index, quoted name and the remaining text of the line (tags);
# the optional last group only matches when the end-of-list text follows the entry
TEST_PATTERN = re.compile(r'\((\d+)\)\s+"([^"]+)" ([^\r\n]+)\r?\n(' + END_LIST_STR + r')?')
# one sub-menu entry (index + quoted name) that is followed either by "(" or by the end-of-list text
TEST_SUBMENU_PATTERN = re.compile(r'\s+\((\d+)\)\s+"[^"]+"\r?\n(?=(?=\()|(' + END_LIST_STR + r'))')
# passed as app_path when the unit test app / DUT is requested
UT_APP_PATH = 'tools/unit-test-app'
# values stored under the 'type' key of a case by detect_update_unit_test_info
SIMPLE_TEST_ID = 0
MULTI_STAGE_ID = 1
MULTI_DEVICE_ID = 2
# timeout for the expect calls that read the test menu (presumably seconds - confirm against DUT.expect)
DEFAULT_TIMEOUT = 20
# passed to time.sleep() right after dut.reset() in reset_dut, i.e. seconds
DUT_DELAY_AFTER_RESET = 2
# number of attempts reset_dut makes to get an answer from the DUT
DUT_STARTUP_CHECK_RETRY_COUNT = 5
# expect timeout used for each of those attempts
TEST_HISTORY_CHECK_TIMEOUT = 2
  35. def reset_reason_matches(reported_str, expected_str):
  36. known_aliases = {
  37. '_RESET': '_RST',
  38. 'POWERON_RESET': 'POWERON',
  39. 'DEEPSLEEP_RESET': 'DSLEEP',
  40. 'SW_CPU_RESET': 'SW_CPU',
  41. }
  42. if expected_str in reported_str:
  43. return True
  44. for token, alias in known_aliases.items():
  45. if token in expected_str:
  46. alt_expected_str = expected_str.replace(token, alias)
  47. if alt_expected_str in reported_str:
  48. return True
  49. return False
  50. def format_test_case_config(test_case_data, target='esp32'):
  51. """
  52. convert the test case data to unified format.
  53. We need to following info to run unit test cases:
  54. 1. unit test app config
  55. 2. test case name
  56. 3. test case reset info
  57. the formatted case config is a dict, with ut app config as keys. The value is a list of test cases.
  58. Each test case is a dict with "name" and "reset" as keys. For example::
  59. case_config = {
  60. "default": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, {...}],
  61. "psram": [{"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}],
  62. }
  63. If config is not specified for test case, then
  64. :param test_case_data: string, list, or a dictionary list
  65. :param target: target
  66. :return: formatted data
  67. """
  68. case_config = dict()
  69. def parse_case(one_case_data):
  70. """ parse and format one case """
  71. def process_reset_list(reset_list):
  72. # strip space and remove white space only items
  73. _output = list()
  74. for _r in reset_list:
  75. _data = _r.strip(' ')
  76. if _data:
  77. _output.append(_data)
  78. return _output
  79. _case = dict()
  80. if isinstance(one_case_data, str):
  81. _temp = one_case_data.split(' [reset=')
  82. _case['name'] = _temp[0]
  83. try:
  84. _case['reset'] = process_reset_list(_temp[1][0:-1].split(','))
  85. except IndexError:
  86. _case['reset'] = list()
  87. elif isinstance(one_case_data, dict):
  88. _case = one_case_data.copy()
  89. assert 'name' in _case
  90. if 'reset' not in _case:
  91. _case['reset'] = list()
  92. else:
  93. if isinstance(_case['reset'], str):
  94. _case['reset'] = process_reset_list(_case['reset'].split(','))
  95. else:
  96. raise TypeError('Not supported type during parsing unit test case')
  97. if 'config' not in _case:
  98. _case['config'] = 'default'
  99. if 'target' not in _case:
  100. _case['target'] = target
  101. return _case
  102. if not isinstance(test_case_data, list):
  103. test_case_data = [test_case_data]
  104. for case_data in test_case_data:
  105. parsed_case = parse_case(case_data)
  106. try:
  107. case_config[parsed_case['config']].append(parsed_case)
  108. except KeyError:
  109. case_config[parsed_case['config']] = [parsed_case]
  110. return case_config
  111. def replace_app_bin(dut, name, new_app_bin):
  112. if new_app_bin is None:
  113. return
  114. search_pattern = '/{}.bin'.format(name)
  115. for i, config in enumerate(dut.download_config):
  116. if config.endswith(search_pattern):
  117. dut.download_config[i] = new_app_bin
  118. Utility.console_log('The replaced application binary is {}'.format(new_app_bin), 'O')
  119. break
  120. def format_case_name(case):
  121. # we could split cases of same config into multiple binaries as we have limited rom space
  122. # we should regard those configs like `default` and `default_2` as the same config
  123. match = STRIP_CONFIG_PATTERN.match(case['config'])
  124. stripped_config_name = match.group(1)
  125. return format_case_id(case['name'], target=case['target'], config=stripped_config_name)
  126. def reset_dut(dut):
  127. dut.reset()
  128. # esptool ``run`` cmd takes quite long time.
  129. # before reset finish, serial port is closed. therefore DUT could already bootup before serial port opened.
  130. # this could cause checking bootup print failed.
  131. # now use input cmd `-` and check test history to check if DUT is bootup.
  132. # we'll retry this step for a few times,
  133. # in case `dut.reset` returns during DUT bootup (when DUT can't process any command).
  134. #
  135. # during bootup, DUT might only receive part of the first `-` command.
  136. # If it only receive `\n`, then it will print all cases. It could take more than 5 seconds, reset check will fail.
  137. # To solve this problem, we will add a delay between reset and input `-` command. And we'll also enlarge expect timeout.
  138. time.sleep(DUT_DELAY_AFTER_RESET)
  139. for _ in range(DUT_STARTUP_CHECK_RETRY_COUNT):
  140. dut.write('-')
  141. try:
  142. dut.expect('0 Tests 0 Failures 0 Ignored', timeout=TEST_HISTORY_CHECK_TIMEOUT)
  143. break
  144. except DUT.ExpectTimeout:
  145. pass
  146. else:
  147. raise AssertionError('Reset {} ({}) failed!'.format(dut.name, dut.port))
  148. def log_test_case(description, test_case, ut_config):
  149. Utility.console_log("Running {} '{}' (config {})".format(description, test_case['name'], ut_config),
  150. color='orange')
  151. Utility.console_log('Tags: %s' % ', '.join('%s=%s' % (k, v) for (k, v) in test_case.items()
  152. if k != 'name' and v is not None),
  153. color='orange')
  154. def run_one_normal_case(dut, one_case, junit_test_case):
  155. reset_dut(dut)
  156. dut.start_capture_raw_data()
  157. # run test case
  158. dut.write("\"{}\"".format(one_case['name']))
  159. dut.expect('Running ' + one_case['name'] + '...')
  160. exception_reset_list = []
  161. # we want to set this flag in callbacks (inner functions)
  162. # use list here so we can use append to set this flag
  163. test_finish = list()
  164. # expect callbacks
  165. def one_case_finish(result):
  166. """ one test finished, let expect loop break and log result """
  167. test_finish.append(True)
  168. output = dut.stop_capture_raw_data()
  169. if result:
  170. Utility.console_log('Success: ' + format_case_name(one_case), color='green')
  171. else:
  172. Utility.console_log('Failed: ' + format_case_name(one_case), color='red')
  173. junit_test_case.add_failure_info(output)
  174. raise TestCaseFailed(format_case_name(one_case))
  175. def handle_exception_reset(data):
  176. """
  177. just append data to exception list.
  178. exception list will be checked in ``handle_reset_finish``, once reset finished.
  179. """
  180. exception_reset_list.append(data[0])
  181. def handle_test_finish(data):
  182. """ test finished without reset """
  183. # in this scenario reset should not happen
  184. assert not exception_reset_list
  185. if int(data[1]):
  186. # case ignored
  187. Utility.console_log('Ignored: ' + format_case_name(one_case), color='orange')
  188. junit_test_case.add_skipped_info('ignored')
  189. one_case_finish(not int(data[0]))
  190. def handle_reset_finish(data):
  191. """ reset happened and reboot finished """
  192. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  193. result = False
  194. if len(one_case['reset']) == len(exception_reset_list):
  195. for i, exception in enumerate(exception_reset_list):
  196. if not reset_reason_matches(exception, one_case['reset'][i]):
  197. break
  198. else:
  199. result = True
  200. if not result:
  201. err_msg = 'Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}'.format(one_case['reset'],
  202. exception_reset_list)
  203. Utility.console_log(err_msg, color='orange')
  204. junit_test_case.add_failure_info(err_msg)
  205. one_case_finish(result)
  206. while not test_finish:
  207. try:
  208. timeout_value = one_case['timeout']
  209. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  210. (EXCEPTION_PATTERN, handle_exception_reset),
  211. (ABORT_PATTERN, handle_exception_reset),
  212. (ASSERT_PATTERN, handle_exception_reset),
  213. (FINISH_PATTERN, handle_test_finish),
  214. (UT_APP_BOOT_UP_DONE, handle_reset_finish),
  215. timeout=timeout_value)
  216. except DUT.ExpectTimeout:
  217. Utility.console_log('Timeout in expect (%s seconds)' % timeout_value, color='orange')
  218. junit_test_case.add_failure_info('timeout')
  219. one_case_finish(False)
  220. break
  221. @ttfw_idf.idf_unit_test(env_tag='UT_T1_1', junit_report_by_case=True)
  222. def run_unit_test_cases(env, extra_data):
  223. """
  224. extra_data can be three types of value
  225. 1. as string:
  226. 1. "case_name"
  227. 2. "case_name [reset=RESET_REASON]"
  228. 2. as dict:
  229. 1. with key like {"name": "Intr_alloc test, shared ints"}
  230. 2. with key like {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET", "config": "psram"}
  231. 3. as list of string or dict:
  232. [case1, case2, case3, {"name": "restart from PRO CPU", "reset": "SW_CPU_RESET"}, ...]
  233. :param env: test env instance
  234. :param extra_data: the case name or case list or case dictionary
  235. :return: None
  236. """
  237. case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)
  238. # we don't want stop on failed case (unless some special scenarios we can't handle)
  239. # this flag is used to log if any of the case failed during executing
  240. # Before exit test function this flag is used to log if the case fails
  241. failed_cases = []
  242. for ut_config in case_config:
  243. Utility.console_log('Running unit test for config: ' + ut_config, 'O')
  244. # Get the console baudrate from the sdkconfig
  245. _app = ttfw_idf.UT(app_path=UT_APP_PATH, config_name=ut_config, target=env.default_dut_cls.TARGET)
  246. baud = _app.get_sdkconfig_config_value('CONFIG_ESP_CONSOLE_UART_BAUDRATE')
  247. if baud is None:
  248. baud = 115200
  249. Utility.console_log('Can\'t find console baudrate in sdkconfig, use 115200 as default')
  250. else:
  251. baud = int(baud, 10) if isinstance(baud, str) else baud
  252. Utility.console_log('Console baudrate is {}'.format(baud))
  253. # Get the DUT with specified baudrate
  254. dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config,
  255. allow_dut_exception=True, baudrate=baud)
  256. if len(case_config[ut_config]) > 0:
  257. replace_app_bin(dut, 'unit-test-app', case_config[ut_config][0].get('app_bin'))
  258. dut.start_app()
  259. Utility.console_log('Download finished, start running test cases', 'O')
  260. for one_case in case_config[ut_config]:
  261. log_test_case('test case', one_case, ut_config)
  262. performance_items = []
  263. # create junit report test case
  264. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  265. try:
  266. run_one_normal_case(dut, one_case, junit_test_case)
  267. performance_items = dut.get_performance_items()
  268. except TestCaseFailed:
  269. failed_cases.append(format_case_name(one_case))
  270. except Exception as e:
  271. handle_unexpected_exception(junit_test_case, e)
  272. failed_cases.append(format_case_name(one_case))
  273. finally:
  274. TinyFW.JunitReport.update_performance(performance_items)
  275. TinyFW.JunitReport.test_case_finish(junit_test_case)
  276. # close DUT when finish running all cases for one config
  277. env.close_dut(dut.name)
  278. class Handler(threading.Thread):
  279. WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
  280. SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
  281. FINISH_PATTERN = re.compile(r'1 Tests (\d) Failures (\d) Ignored')
  282. def __init__(self, dut, sent_signal_list, lock, parent_case_name, child_case_index, timeout):
  283. self.dut = dut
  284. self.sent_signal_list = sent_signal_list
  285. self.lock = lock
  286. self.parent_case_name = parent_case_name
  287. self.child_case_name = ''
  288. self.child_case_index = child_case_index + 1
  289. self.finish = False
  290. self.result = False
  291. self.output = ''
  292. self.fail_name = None
  293. self.timeout = timeout
  294. self.force_stop = threading.Event() # it show the running status
  295. reset_dut(self.dut) # reset the board to make it start from begining
  296. threading.Thread.__init__(self, name='{} Handler'.format(dut))
  297. def run(self):
  298. self.dut.start_capture_raw_data()
  299. def get_child_case_name(data):
  300. self.child_case_name = data[0]
  301. time.sleep(1)
  302. self.dut.write(str(self.child_case_index))
  303. def one_device_case_finish(result):
  304. """ one test finished, let expect loop break and log result """
  305. self.finish = True
  306. self.result = result
  307. self.output = '[{}]\n\n{}\n'.format(self.child_case_name,
  308. self.dut.stop_capture_raw_data())
  309. if not result:
  310. self.fail_name = self.child_case_name
  311. def device_wait_action(data):
  312. start_time = time.time()
  313. expected_signal = data[0].encode('utf-8')
  314. while 1:
  315. if time.time() > start_time + self.timeout:
  316. Utility.console_log('Timeout in device for function: %s' % self.child_case_name, color='orange')
  317. break
  318. with self.lock:
  319. for sent_signal in self.sent_signal_list:
  320. if expected_signal == sent_signal['name']:
  321. self.dut.write(sent_signal['parameter'])
  322. self.sent_signal_list.remove(sent_signal)
  323. break
  324. else:
  325. time.sleep(0.01)
  326. continue
  327. break
  328. def device_send_action(data):
  329. with self.lock:
  330. self.sent_signal_list.append({
  331. 'name': data[0].encode('utf-8'),
  332. 'parameter': '' if data[2] is None else data[2].encode('utf-8')
  333. # no parameter means we only write EOL to DUT
  334. })
  335. def handle_device_test_finish(data):
  336. """ test finished without reset """
  337. # in this scenario reset should not happen
  338. if int(data[1]):
  339. # case ignored
  340. Utility.console_log('Ignored: ' + self.child_case_name, color='orange')
  341. one_device_case_finish(not int(data[0]))
  342. try:
  343. time.sleep(1)
  344. self.dut.write("\"{}\"".format(self.parent_case_name))
  345. self.dut.expect('Running ' + self.parent_case_name + '...')
  346. except DUT.ExpectTimeout:
  347. Utility.console_log('No case detected!', color='orange')
  348. while not self.finish and not self.force_stop.isSet():
  349. try:
  350. self.dut.expect_any((re.compile('\(' + str(self.child_case_index) + '\)\s"(\w+)"'), # noqa: W605 - regex
  351. get_child_case_name),
  352. (self.WAIT_SIGNAL_PATTERN, device_wait_action), # wait signal pattern
  353. (self.SEND_SIGNAL_PATTERN, device_send_action), # send signal pattern
  354. (self.FINISH_PATTERN, handle_device_test_finish), # test finish pattern
  355. timeout=self.timeout)
  356. except DUT.ExpectTimeout:
  357. Utility.console_log('Timeout in expect (%s seconds)' % self.timeout, color='orange')
  358. one_device_case_finish(False)
  359. break
  360. def stop(self):
  361. self.force_stop.set()
  362. def get_case_info(one_case):
  363. parent_case = one_case['name']
  364. child_case_num = one_case['child case num']
  365. return parent_case, child_case_num
  366. def get_dut(duts, env, name, ut_config, app_bin=None):
  367. if name in duts:
  368. dut = duts[name]
  369. else:
  370. dut = env.get_dut(name, app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
  371. duts[name] = dut
  372. replace_app_bin(dut, 'unit-test-app', app_bin)
  373. dut.start_app() # download bin to board
  374. return dut
  375. def run_one_multiple_devices_case(duts, ut_config, env, one_case, app_bin, junit_test_case):
  376. lock = threading.RLock()
  377. threads = []
  378. send_signal_list = []
  379. result = True
  380. parent_case, case_num = get_case_info(one_case)
  381. for i in range(case_num):
  382. dut = get_dut(duts, env, 'dut%d' % i, ut_config, app_bin)
  383. threads.append(Handler(dut, send_signal_list, lock,
  384. parent_case, i, one_case['timeout']))
  385. for thread in threads:
  386. thread.setDaemon(True)
  387. thread.start()
  388. output = 'Multiple Device Failed\n'
  389. for thread in threads:
  390. thread.join()
  391. result = result and thread.result
  392. output += thread.output
  393. if not thread.result:
  394. [thd.stop() for thd in threads]
  395. if not result:
  396. junit_test_case.add_failure_info(output)
  397. # collect performances from DUTs
  398. performance_items = []
  399. for dut_name in duts:
  400. performance_items.extend(duts[dut_name].get_performance_items())
  401. TinyFW.JunitReport.update_performance(performance_items)
  402. return result
  403. @ttfw_idf.idf_unit_test(env_tag='UT_T2_1', junit_report_by_case=True)
  404. def run_multiple_devices_cases(env, extra_data):
  405. """
  406. extra_data can be two types of value
  407. 1. as dict:
  408. e.g.
  409. {"name": "gpio master/slave test example",
  410. "child case num": 2,
  411. "config": "release",
  412. "env_tag": "UT_T2_1"}
  413. 2. as list dict:
  414. e.g.
  415. [{"name": "gpio master/slave test example1",
  416. "child case num": 2,
  417. "config": "release",
  418. "env_tag": "UT_T2_1"},
  419. {"name": "gpio master/slave test example2",
  420. "child case num": 2,
  421. "config": "release",
  422. "env_tag": "UT_T2_1"}]
  423. """
  424. failed_cases = []
  425. case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)
  426. duts = {}
  427. for ut_config in case_config:
  428. Utility.console_log('Running unit test for config: ' + ut_config, 'O')
  429. for one_case in case_config[ut_config]:
  430. log_test_case('multi-device test', one_case, ut_config, )
  431. result = False
  432. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  433. try:
  434. result = run_one_multiple_devices_case(duts, ut_config, env, one_case,
  435. one_case.get('app_bin'), junit_test_case)
  436. except TestCaseFailed:
  437. pass # result is False, this is handled by the finally block
  438. except Exception as e:
  439. handle_unexpected_exception(junit_test_case, e)
  440. finally:
  441. if result:
  442. Utility.console_log('Success: ' + format_case_name(one_case), color='green')
  443. else:
  444. failed_cases.append(format_case_name(one_case))
  445. Utility.console_log('Failed: ' + format_case_name(one_case), color='red')
  446. TinyFW.JunitReport.test_case_finish(junit_test_case)
  447. # close all DUTs when finish running all cases for one config
  448. for dut in duts:
  449. env.close_dut(dut)
  450. duts = {}
  451. def run_one_multiple_stage_case(dut, one_case, junit_test_case):
  452. reset_dut(dut)
  453. dut.start_capture_raw_data()
  454. exception_reset_list = []
  455. for test_stage in range(one_case['child case num']):
  456. # select multi stage test case name
  457. dut.write("\"{}\"".format(one_case['name']))
  458. dut.expect('Running ' + one_case['name'] + '...')
  459. # select test function for current stage
  460. dut.write(str(test_stage + 1))
  461. # we want to set this flag in callbacks (inner functions)
  462. # use list here so we can use append to set this flag
  463. stage_finish = list()
  464. def last_stage():
  465. return test_stage == one_case['child case num'] - 1
  466. def check_reset():
  467. if one_case['reset']:
  468. assert exception_reset_list # reboot but no exception/reset logged. should never happen
  469. result = False
  470. if len(one_case['reset']) == len(exception_reset_list):
  471. for i, exception in enumerate(exception_reset_list):
  472. if not reset_reason_matches(exception, one_case['reset'][i]):
  473. break
  474. else:
  475. result = True
  476. if not result:
  477. err_msg = 'Reset Check Failed: \r\n\tExpected: {}\r\n\tGet: {}'.format(one_case['reset'],
  478. exception_reset_list)
  479. Utility.console_log(err_msg, color='orange')
  480. junit_test_case.add_failure_info(err_msg)
  481. else:
  482. # we allow omit reset in multi stage cases
  483. result = True
  484. return result
  485. # expect callbacks
  486. def one_case_finish(result):
  487. """ one test finished, let expect loop break and log result """
  488. # handle test finish
  489. result = result and check_reset()
  490. output = dut.stop_capture_raw_data()
  491. if result:
  492. Utility.console_log('Success: ' + format_case_name(one_case), color='green')
  493. else:
  494. Utility.console_log('Failed: ' + format_case_name(one_case), color='red')
  495. junit_test_case.add_failure_info(output)
  496. raise TestCaseFailed(format_case_name(one_case))
  497. stage_finish.append('break')
  498. def handle_exception_reset(data):
  499. """
  500. just append data to exception list.
  501. exception list will be checked in ``handle_reset_finish``, once reset finished.
  502. """
  503. exception_reset_list.append(data[0])
  504. def handle_test_finish(data):
  505. """ test finished without reset """
  506. # in this scenario reset should not happen
  507. if int(data[1]):
  508. # case ignored
  509. Utility.console_log('Ignored: ' + format_case_name(one_case), color='orange')
  510. junit_test_case.add_skipped_info('ignored')
  511. # only passed in last stage will be regarded as real pass
  512. if last_stage():
  513. one_case_finish(not int(data[0]))
  514. else:
  515. Utility.console_log('test finished before enter last stage', color='orange')
  516. one_case_finish(False)
  517. def handle_next_stage(data):
  518. """ reboot finished. we goto next stage """
  519. if last_stage():
  520. # already last stage, should never goto next stage
  521. Utility.console_log("didn't finish at last stage", color='orange')
  522. one_case_finish(False)
  523. else:
  524. stage_finish.append('continue')
  525. while not stage_finish:
  526. try:
  527. timeout_value = one_case['timeout']
  528. dut.expect_any((RESET_PATTERN, handle_exception_reset),
  529. (EXCEPTION_PATTERN, handle_exception_reset),
  530. (ABORT_PATTERN, handle_exception_reset),
  531. (ASSERT_PATTERN, handle_exception_reset),
  532. (FINISH_PATTERN, handle_test_finish),
  533. (UT_APP_BOOT_UP_DONE, handle_next_stage),
  534. timeout=timeout_value)
  535. except DUT.ExpectTimeout:
  536. Utility.console_log('Timeout in expect (%s seconds)' % timeout_value, color='orange')
  537. one_case_finish(False)
  538. break
  539. if stage_finish[0] == 'break':
  540. # test breaks on current stage
  541. break
  542. @ttfw_idf.idf_unit_test(env_tag='UT_T1_1', junit_report_by_case=True)
  543. def run_multiple_stage_cases(env, extra_data):
  544. """
  545. extra_data can be 2 types of value
  546. 1. as dict: Mandatory keys: "name" and "child case num", optional keys: "reset" and others
  547. 3. as list of string or dict:
  548. [case1, case2, case3, {"name": "restart from PRO CPU", "child case num": 2}, ...]
  549. :param env: test env instance
  550. :param extra_data: the case name or case list or case dictionary
  551. :return: None
  552. """
  553. case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)
  554. # we don't want stop on failed case (unless some special scenarios we can't handle)
  555. # this flag is used to log if any of the case failed during executing
  556. # Before exit test function this flag is used to log if the case fails
  557. failed_cases = []
  558. for ut_config in case_config:
  559. Utility.console_log('Running unit test for config: ' + ut_config, 'O')
  560. dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config, allow_dut_exception=True)
  561. if len(case_config[ut_config]) > 0:
  562. replace_app_bin(dut, 'unit-test-app', case_config[ut_config][0].get('app_bin'))
  563. dut.start_app()
  564. for one_case in case_config[ut_config]:
  565. log_test_case('multi-stage test', one_case, ut_config)
  566. performance_items = []
  567. junit_test_case = TinyFW.JunitReport.create_test_case(format_case_name(one_case))
  568. try:
  569. run_one_multiple_stage_case(dut, one_case, junit_test_case)
  570. performance_items = dut.get_performance_items()
  571. except TestCaseFailed:
  572. failed_cases.append(format_case_name(one_case))
  573. except Exception as e:
  574. handle_unexpected_exception(junit_test_case, e)
  575. failed_cases.append(format_case_name(one_case))
  576. finally:
  577. TinyFW.JunitReport.update_performance(performance_items)
  578. TinyFW.JunitReport.test_case_finish(junit_test_case)
  579. # close DUT when finish running all cases for one config
  580. env.close_dut(dut.name)
  581. def detect_update_unit_test_info(env, extra_data, app_bin):
  582. case_config = format_test_case_config(extra_data, env.default_dut_cls.TARGET)
  583. for ut_config in case_config:
  584. dut = env.get_dut('unit-test-app', app_path=UT_APP_PATH, app_config_name=ut_config)
  585. replace_app_bin(dut, 'unit-test-app', app_bin)
  586. dut.start_app()
  587. reset_dut(dut)
  588. # get the list of test cases
  589. dut.write('')
  590. dut.expect("Here's the test menu, pick your combo:", timeout=DEFAULT_TIMEOUT)
  591. def find_update_dic(name, _t, _timeout, child_case_num=None):
  592. for _case_data in extra_data:
  593. if _case_data['name'] == name:
  594. _case_data['type'] = _t
  595. if 'timeout' not in _case_data:
  596. _case_data['timeout'] = _timeout
  597. if child_case_num:
  598. _case_data['child case num'] = child_case_num
  599. try:
  600. while True:
  601. data = dut.expect(TEST_PATTERN, timeout=DEFAULT_TIMEOUT)
  602. test_case_name = data[1]
  603. m = re.search(r'\[timeout=(\d+)\]', data[2])
  604. if m:
  605. timeout = int(m.group(1))
  606. else:
  607. timeout = 30
  608. m = re.search(r'\[multi_stage\]', data[2])
  609. if m:
  610. test_case_type = MULTI_STAGE_ID
  611. else:
  612. m = re.search(r'\[multi_device\]', data[2])
  613. if m:
  614. test_case_type = MULTI_DEVICE_ID
  615. else:
  616. test_case_type = SIMPLE_TEST_ID
  617. find_update_dic(test_case_name, test_case_type, timeout)
  618. if data[3] and re.search(END_LIST_STR, data[3]):
  619. break
  620. continue
  621. # find the last submenu item
  622. data = dut.expect(TEST_SUBMENU_PATTERN, timeout=DEFAULT_TIMEOUT)
  623. find_update_dic(test_case_name, test_case_type, timeout, child_case_num=int(data[0]))
  624. if data[1] and re.search(END_LIST_STR, data[1]):
  625. break
  626. # check if the unit test case names are correct, i.e. they could be found in the device
  627. for _dic in extra_data:
  628. if 'type' not in _dic:
  629. raise ValueError("Unit test \"{}\" doesn't exist in the flashed device!".format(_dic.get('name')))
  630. except DUT.ExpectTimeout:
  631. Utility.console_log('Timeout during getting the test list', color='red')
  632. finally:
  633. dut.close()
  634. # These options are the same for all configs, therefore there is no need to continue
  635. break
  636. if __name__ == '__main__':
  637. parser = argparse.ArgumentParser()
  638. parser.add_argument(
  639. '--repeat', '-r',
  640. help='Number of repetitions for the test(s). Default is 1.',
  641. type=int,
  642. default=1
  643. )
  644. parser.add_argument('--env_config_file', '-e',
  645. help='test env config file',
  646. default=None)
  647. parser.add_argument('--app_bin', '-b',
  648. help='application binary file for flashing the chip',
  649. default=None)
  650. parser.add_argument('test',
  651. help='Comma separated list of <option>:<argument> where option can be "name" (default), '
  652. '"child case num", "config", "timeout".',
  653. nargs='+')
  654. args = parser.parse_args()
  655. list_of_dicts = []
  656. for test in args.test:
  657. test_args = test.split(r',')
  658. test_dict = dict()
  659. for test_item in test_args:
  660. if len(test_item) == 0:
  661. continue
  662. pair = test_item.split(r':', 1)
  663. if len(pair) == 1 or pair[0] == 'name':
  664. test_dict['name'] = pair[0]
  665. elif len(pair) == 2:
  666. if pair[0] == 'timeout' or pair[0] == 'child case num':
  667. test_dict[pair[0]] = int(pair[1])
  668. else:
  669. test_dict[pair[0]] = pair[1]
  670. else:
  671. raise ValueError('Error in argument item {} of {}'.format(test_item, test))
  672. test_dict['app_bin'] = args.app_bin
  673. list_of_dicts.append(test_dict)
  674. TinyFW.set_default_config(env_config_file=args.env_config_file)
  675. env_config = TinyFW.get_default_config()
  676. env_config['app'] = ttfw_idf.UT
  677. env_config['dut'] = ttfw_idf.IDFDUT
  678. env_config['test_suite_name'] = 'unit_test_parsing'
  679. test_env = Env.Env(**env_config)
  680. detect_update_unit_test_info(test_env, extra_data=list_of_dicts, app_bin=args.app_bin)
  681. for index in range(1, args.repeat + 1):
  682. if args.repeat > 1:
  683. Utility.console_log('Repetition {}'.format(index), color='green')
  684. for dic in list_of_dicts:
  685. t = dic.get('type', SIMPLE_TEST_ID)
  686. if t == SIMPLE_TEST_ID:
  687. run_unit_test_cases(extra_data=dic)
  688. elif t == MULTI_STAGE_ID:
  689. run_multiple_stage_cases(extra_data=dic)
  690. elif t == MULTI_DEVICE_ID:
  691. run_multiple_devices_cases(extra_data=dic)
  692. else:
  693. raise ValueError('Unknown type {} of {}'.format(t, dic.get('name')))