IDFApp.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. """ IDF Test Applications """
  4. import hashlib
  5. import json
  6. import os
  7. import re
  8. import subprocess
  9. import sys
  10. from abc import abstractmethod
  11. from tiny_test_fw import App
  12. from .IDFAssignTest import ComponentUTGroup, ExampleGroup, IDFCaseGroup, TestAppsGroup, UnitTestGroup
  13. try:
  14. import gitlab_api
  15. except ImportError:
  16. gitlab_api = None
  17. try:
  18. from typing import Any, Dict, List, Optional, Tuple, Type # noqa: F401
  19. except ImportError:
  20. pass
  21. def parse_encrypted_flag(args, offs, binary): # type: (Dict, str, str) -> Any
  22. # Find partition entries (e.g. the entries with an offset and a file)
  23. for _, entry in args.items():
  24. # If the current entry is a partition, we have to check whether it is
  25. # the one we are looking for or not
  26. try:
  27. if (entry['offset'], entry['file']) == (offs, binary):
  28. return entry['encrypted'] == 'true'
  29. except (TypeError, KeyError):
  30. # TypeError occurs if the entry is a list, which is possible in JSON
  31. # data structure.
  32. # KeyError occurs if the entry doesn't have "encrypted" field.
  33. continue
  34. # The entry was not found, return None. The caller will have to check
  35. # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
  36. return None
  37. def parse_flash_settings(path, default_encryption=False): # type: (str, bool) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], Dict, Any]
  38. file_name = os.path.basename(path)
  39. # For compatibility reasons, this list contains all the files to be
  40. # flashed
  41. flash_files = []
  42. # The following list only contains the files that need encryption
  43. encrypt_files = []
  44. if file_name == 'flasher_args.json':
  45. # CMake version using build metadata file
  46. with open(path, 'r') as f:
  47. args = json.load(f)
  48. for (offs, binary) in args['flash_files'].items():
  49. if offs:
  50. flash_files.append((offs, binary))
  51. encrypted = parse_encrypted_flag(args, offs, binary)
  52. # default_encryption should be taken into account if and only if
  53. # encrypted flag is not provided in the JSON file.
  54. if (encrypted is None and default_encryption) or encrypted:
  55. encrypt_files.append((offs, binary))
  56. flash_settings = args['flash_settings']
  57. app_name = os.path.splitext(args['app']['file'])[0]
  58. else:
  59. # GNU Make version uses download.config arguments file
  60. with open(path, 'r') as f:
  61. args = f.readlines()[-1].split(' ')
  62. flash_settings = {}
  63. for idx in range(0, len(args), 2): # process arguments in pairs
  64. if args[idx].startswith('--'):
  65. # strip the -- from the command line argument
  66. flash_settings[args[idx][2:]] = args[idx + 1]
  67. else:
  68. # offs, filename
  69. flash_files.append((args[idx], args[idx + 1]))
  70. # Parameter default_encryption tells us if the files need encryption
  71. if default_encryption:
  72. encrypt_files = flash_files
  73. # we can only guess app name in download.config.
  74. for p in flash_files:
  75. if not os.path.dirname(p[1]) and 'partition' not in p[1]:
  76. # app bin usually in the same dir with download.config and it's not partition table
  77. app_name = os.path.splitext(p[1])[0]
  78. break
  79. else:
  80. app_name = None
  81. return flash_files, encrypt_files, flash_settings, app_name
  82. class Artifacts(object):
  83. def __init__(self, dest_root_path, artifact_index_file, app_path, config_name, target):
  84. # type: (str, str, str, str, str) -> None
  85. assert gitlab_api
  86. # at least one of app_path or config_name is not None. otherwise we can't match artifact
  87. assert app_path or config_name
  88. assert os.path.exists(artifact_index_file)
  89. self.gitlab_inst = gitlab_api.Gitlab(os.getenv('CI_PROJECT_ID'))
  90. self.dest_root_path = dest_root_path
  91. with open(artifact_index_file, 'r') as f:
  92. artifact_index = json.load(f)
  93. self.artifact_info = self._find_artifact(artifact_index, app_path, config_name, target)
  94. @staticmethod
  95. def _find_artifact(artifact_index, app_path, config_name, target): # type: ignore
  96. for artifact_info in artifact_index:
  97. match_result = True
  98. if app_path:
  99. # We use endswith here to avoid issue like:
  100. # examples_protocols_mqtt_ws but return a examples_protocols_mqtt_wss failure
  101. match_result = artifact_info['app_dir'].endswith(app_path)
  102. if config_name:
  103. match_result = match_result and config_name == artifact_info['config']
  104. if target:
  105. match_result = match_result and target == artifact_info['target']
  106. if match_result:
  107. ret = artifact_info
  108. break
  109. else:
  110. ret = None
  111. return ret
  112. def _get_app_base_path(self): # type: () -> Any
  113. if self.artifact_info:
  114. return os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
  115. else:
  116. return None
  117. def _get_flash_arg_file(self, base_path, job_id): # type: (str, str) -> str
  118. if self.artifact_info['build_system'] == 'cmake':
  119. flash_arg_file = os.path.join(base_path, 'flasher_args.json')
  120. else:
  121. flash_arg_file = os.path.join(base_path, 'download.config')
  122. self.gitlab_inst.download_artifact(job_id, [flash_arg_file], self.dest_root_path)
  123. return flash_arg_file
  124. def _download_binary_files(self, base_path, job_id, flash_arg_file): # type: (str, str, str) -> None
  125. # Let's ignore the second value returned (encrypt_files) as these
  126. # files also appear in the first list
  127. flash_files, _, _, app_name = parse_flash_settings(os.path.join(self.dest_root_path, flash_arg_file))
  128. artifact_files = [os.path.join(base_path, p[1]) for p in flash_files]
  129. artifact_files.append(os.path.join(base_path, app_name + '.elf'))
  130. bootloader_path = os.path.join(base_path, 'bootloader', 'bootloader.bin')
  131. if bootloader_path not in artifact_files:
  132. artifact_files.append(bootloader_path)
  133. self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)
  134. def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None
  135. self.gitlab_inst.download_artifact(job_id, [os.path.join(base_path, 'sdkconfig')],
  136. self.dest_root_path)
  137. def download_artifacts(self): # type: () -> Any
  138. if not self.artifact_info:
  139. return None
  140. base_path = self._get_app_base_path()
  141. job_id = self.artifact_info['ci_job_id']
  142. # 1. download flash args file
  143. flash_arg_file = self._get_flash_arg_file(base_path, job_id)
  144. # 2. download all binary files
  145. self._download_binary_files(base_path, job_id, flash_arg_file)
  146. # 3. download sdkconfig file
  147. self._download_sdkconfig_file(base_path, job_id)
  148. return base_path
  149. def download_artifact_files(self, file_names): # type: (List[str]) -> Any
  150. if self.artifact_info:
  151. base_path = os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
  152. job_id = self.artifact_info['ci_job_id']
  153. # download all binary files
  154. artifact_files = [os.path.join(base_path, fn) for fn in file_names]
  155. self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)
  156. # download sdkconfig file
  157. self.gitlab_inst.download_artifact(job_id, [os.path.join(base_path, 'sdkconfig')],
  158. self.dest_root_path)
  159. else:
  160. base_path = None
  161. return base_path
  162. class UnitTestArtifacts(Artifacts):
  163. BUILDS_DIR_RE = re.compile(r'^builds/')
  164. def _get_app_base_path(self): # type: () -> Any
  165. if self.artifact_info:
  166. output_dir = self.BUILDS_DIR_RE.sub('output/', self.artifact_info['build_dir'])
  167. return os.path.join(self.artifact_info['app_dir'], output_dir)
  168. else:
  169. return None
  170. def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None
  171. self.gitlab_inst.download_artifact(job_id, [os.path.join(base_path, 'sdkconfig')], self.dest_root_path)
  172. class IDFApp(App.BaseApp):
  173. """
  174. Implements common esp-idf application behavior.
  175. idf applications should inherent from this class and overwrite method get_binary_path.
  176. """
  177. IDF_DOWNLOAD_CONFIG_FILE = 'download.config'
  178. IDF_FLASH_ARGS_FILE = 'flasher_args.json'
  179. def __init__(self, app_path, config_name=None, target=None, case_group=IDFCaseGroup, artifact_cls=Artifacts): # type: ignore
  180. super(IDFApp, self).__init__(app_path)
  181. self.app_path = app_path # type: (str)
  182. self.config_name = config_name # type: (str)
  183. self.target = target # type: (str)
  184. self.idf_path = self.get_sdk_path() # type: (str)
  185. self.case_group = case_group
  186. self.artifact_cls = artifact_cls
  187. self.binary_path = self.get_binary_path()
  188. self.elf_file = self._get_elf_file_path()
  189. self._elf_file_sha256 = None # type: (Optional[str])
  190. assert os.path.exists(self.binary_path)
  191. if self.IDF_DOWNLOAD_CONFIG_FILE not in os.listdir(self.binary_path):
  192. if self.IDF_FLASH_ARGS_FILE not in os.listdir(self.binary_path):
  193. msg = ('Neither {} nor {} exists. '
  194. "Try to run 'make print_flash_cmd | tail -n 1 > {}/{}' "
  195. "or 'idf.py build' "
  196. 'for resolving the issue.'
  197. '').format(self.IDF_DOWNLOAD_CONFIG_FILE, self.IDF_FLASH_ARGS_FILE,
  198. self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
  199. raise AssertionError(msg)
  200. # In order to keep backward compatibility, flash_files is unchanged.
  201. # However, we now have a new attribute encrypt_files.
  202. self.flash_files, self.encrypt_files, self.flash_settings = self._parse_flash_download_config()
  203. self.partition_table = self._parse_partition_table()
  204. def __str__(self): # type: () -> str
  205. parts = ['app<{}>'.format(self.app_path)]
  206. if self.config_name:
  207. parts.append('config<{}>'.format(self.config_name))
  208. if self.target:
  209. parts.append('target<{}>'.format(self.target))
  210. return ' '.join(parts)
  211. @classmethod
  212. def get_sdk_path(cls): # type: () -> str
  213. idf_path = os.getenv('IDF_PATH')
  214. assert idf_path
  215. assert os.path.exists(idf_path)
  216. return idf_path
  217. def _get_sdkconfig_paths(self): # type: () -> List[str]
  218. """
  219. returns list of possible paths where sdkconfig could be found
  220. Note: could be overwritten by a derived class to provide other locations or order
  221. """
  222. return [os.path.join(self.binary_path, 'sdkconfig'), os.path.join(self.binary_path, '..', 'sdkconfig')]
  223. def get_sdkconfig(self): # type: () -> Dict
  224. """
  225. reads sdkconfig and returns a dictionary with all configured variables
  226. :raise: AssertionError: if sdkconfig file does not exist in defined paths
  227. """
  228. d = {}
  229. sdkconfig_file = None
  230. for i in self._get_sdkconfig_paths():
  231. if os.path.exists(i):
  232. sdkconfig_file = i
  233. break
  234. assert sdkconfig_file is not None
  235. with open(sdkconfig_file) as f:
  236. for line in f:
  237. configs = line.split('=')
  238. if len(configs) == 2:
  239. d[configs[0]] = configs[1].rstrip()
  240. return d
  241. def get_sdkconfig_config_value(self, config_key): # type: (str) -> Any
  242. sdkconfig_dict = self.get_sdkconfig()
  243. value = None
  244. if (config_key in sdkconfig_dict):
  245. value = sdkconfig_dict[config_key]
  246. return value
  247. @abstractmethod
  248. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  249. pass
  250. def get_binary_path(self): # type: () -> str
  251. path = self._try_get_binary_from_local_fs()
  252. if path:
  253. return path
  254. artifacts = self.artifact_cls(self.idf_path,
  255. self.case_group.get_artifact_index_file(),
  256. self.app_path, self.config_name, self.target)
  257. if isinstance(self, LoadableElfTestApp):
  258. assert self.app_files
  259. path = artifacts.download_artifact_files(self.app_files)
  260. else:
  261. path = artifacts.download_artifacts()
  262. if path:
  263. return os.path.join(self.idf_path, path)
  264. else:
  265. raise OSError('Failed to get binary for {}'.format(self))
  266. def _get_elf_file_path(self): # type: () -> str
  267. ret = ''
  268. file_names = os.listdir(self.binary_path)
  269. for fn in file_names:
  270. if os.path.splitext(fn)[1] == '.elf':
  271. ret = os.path.join(self.binary_path, fn)
  272. return ret
  273. def _int_offs_abs_paths(self, files_list): # type: (List[tuple[str, str]]) -> List[Tuple[int, str]]
  274. return [(int(offs, 0),
  275. os.path.join(self.binary_path, file_path.strip()))
  276. for (offs, file_path) in files_list]
  277. def _parse_flash_download_config(self): # type: () -> Tuple[List[tuple[int, str]], List[tuple[int, str]], Dict]
  278. """
  279. Parse flash download config from build metadata files
  280. Sets self.flash_files, self.flash_settings
  281. (Called from constructor)
  282. Returns (flash_files, encrypt_files, flash_settings)
  283. """
  284. if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
  285. # CMake version using build metadata file
  286. path = os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE)
  287. else:
  288. # GNU Make version uses download.config arguments file
  289. path = os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
  290. # If the JSON doesn't find the encrypted flag for our files, provide
  291. # a default encrpytion flag: the macro
  292. # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT
  293. sdkconfig_dict = self.get_sdkconfig()
  294. default_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
  295. flash_files, encrypt_files, flash_settings, _ = parse_flash_settings(path, default_encryption)
  296. # Flash setting "encrypt" only and only if all the files to flash
  297. # must be encrypted. Else, this parameter should be False.
  298. # All files must be encrypted is both file lists are the same
  299. flash_settings['encrypt'] = sorted(flash_files) == sorted(encrypt_files)
  300. return self._int_offs_abs_paths(flash_files), self._int_offs_abs_paths(encrypt_files), flash_settings
  301. def _parse_partition_table(self): # type: ignore
  302. """
  303. Parse partition table contents based on app binaries
  304. Returns partition_table data
  305. (Called from constructor)
  306. """
  307. partition_tool = os.path.join(self.idf_path,
  308. 'components',
  309. 'partition_table',
  310. 'gen_esp32part.py')
  311. assert os.path.exists(partition_tool)
  312. errors = []
  313. # self.flash_files is sorted based on offset in order to have a consistent result with different versions of
  314. # Python
  315. for (_, path) in sorted(self.flash_files, key=lambda elem: elem[0]):
  316. if 'partition' in os.path.split(path)[1]:
  317. partition_file = os.path.join(self.binary_path, path)
  318. process = subprocess.Popen([sys.executable, partition_tool, partition_file],
  319. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  320. (raw_data, raw_error) = process.communicate()
  321. if isinstance(raw_error, bytes):
  322. raw_error = raw_error.decode()
  323. if 'Traceback' in raw_error:
  324. # Some exception occurred. It is possible that we've tried the wrong binary file.
  325. errors.append((path, raw_error))
  326. continue
  327. if isinstance(raw_data, bytes):
  328. raw_data = raw_data.decode()
  329. break
  330. else:
  331. traceback_msg = os.linesep.join(['{} {}:{}{}'.format(partition_tool,
  332. p,
  333. os.linesep,
  334. msg) for p, msg in errors])
  335. raise ValueError('No partition table found for IDF binary path: {}{}{}'.format(self.binary_path,
  336. os.linesep,
  337. traceback_msg))
  338. partition_table = dict()
  339. for line in raw_data.splitlines():
  340. if line[0] != '#':
  341. try:
  342. _name, _type, _subtype, _offset, _size, _flags = line.split(',')
  343. if _size[-1] == 'K':
  344. _size = int(_size[:-1]) * 1024
  345. elif _size[-1] == 'M':
  346. _size = int(_size[:-1]) * 1024 * 1024
  347. else:
  348. _size = int(_size)
  349. _offset = int(_offset, 0)
  350. except ValueError:
  351. continue
  352. partition_table[_name] = {
  353. 'type': _type,
  354. 'subtype': _subtype,
  355. 'offset': _offset,
  356. 'size': _size,
  357. 'flags': _flags
  358. }
  359. return partition_table
  360. def get_elf_sha256(self): # type: () -> Optional[str]
  361. if self._elf_file_sha256:
  362. return self._elf_file_sha256
  363. sha256 = hashlib.sha256()
  364. with open(self.elf_file, 'rb') as f:
  365. sha256.update(f.read())
  366. self._elf_file_sha256 = sha256.hexdigest()
  367. return self._elf_file_sha256
  368. class Example(IDFApp):
  369. def __init__(self, app_path, config_name='default', target='esp32', case_group=ExampleGroup, artifacts_cls=Artifacts):
  370. # type: (str, str, str, Type[ExampleGroup], Type[Artifacts]) -> None
  371. if not config_name:
  372. config_name = 'default'
  373. if not target:
  374. target = 'esp32'
  375. super(Example, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  376. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  377. # build folder of example path
  378. path = os.path.join(self.idf_path, self.app_path, 'build')
  379. if os.path.exists(path):
  380. return path
  381. # new style build dir
  382. path = os.path.join(self.idf_path, self.app_path, f'build_{self.target}_{self.config_name}')
  383. if os.path.exists(path):
  384. return path
  385. # Search for CI build folders.
  386. # Path format: $IDF_PATH/<app_dir>/build_<target>_<config>
  387. build_dir = f'build_{self.target}_{self.config_name}'
  388. example_path = os.path.join(self.idf_path, self.app_path, build_dir)
  389. if os.path.exists(example_path):
  390. return path
  391. return None
  392. class UT(IDFApp):
  393. def __init__(self, app_path, config_name='default', target='esp32', case_group=UnitTestGroup, artifacts_cls=UnitTestArtifacts):
  394. # type: (str, str, str, Type[UnitTestGroup], Type[UnitTestArtifacts]) -> None
  395. if not config_name:
  396. config_name = 'default'
  397. if not target:
  398. target = 'esp32'
  399. super(UT, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  400. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  401. path = os.path.join(self.idf_path, self.app_path, 'build')
  402. if os.path.exists(path):
  403. return path
  404. # first try to get from build folder of unit-test-app
  405. path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'build')
  406. if os.path.exists(path):
  407. # found, use bin in build path
  408. return path
  409. # ``build_unit_test.sh`` will copy binary to output folder
  410. path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'output', self.target, self.config_name)
  411. if os.path.exists(path):
  412. return path
  413. return None
  414. class TestApp(Example):
  415. def __init__(self, app_path, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
  416. # type: (str, str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
  417. super(TestApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  418. class ComponentUTApp(TestApp):
  419. def __init__(self, app_path, config_name='default', target='esp32', case_group=ComponentUTGroup, artifacts_cls=Artifacts):
  420. # type: (str, str, str, Type[ComponentUTGroup], Type[Artifacts]) -> None
  421. super(ComponentUTApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  422. class LoadableElfTestApp(TestApp):
  423. def __init__(self, app_path, app_files, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
  424. # type: (str, List[str], str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
  425. # add arg `app_files` for loadable elf test_app.
  426. # Such examples only build elf files, so it doesn't generate flasher_args.json.
  427. # So we can't get app files from config file. Test case should pass it to application.
  428. super(IDFApp, self).__init__(app_path)
  429. self.app_path = app_path
  430. self.app_files = app_files
  431. self.config_name = config_name or 'default'
  432. self.target = target or 'esp32'
  433. self.idf_path = self.get_sdk_path()
  434. self.case_group = case_group
  435. self.artifact_cls = artifacts_cls
  436. self.binary_path = self.get_binary_path()
  437. self.elf_file = self._get_elf_file_path()
  438. assert os.path.exists(self.binary_path)
  439. class SSC(IDFApp):
  440. def get_binary_path(self): # type: () -> str
  441. # TODO: to implement SSC get binary path
  442. return self.app_path
  443. class AT(IDFApp):
  444. def get_binary_path(self): # type: () -> str
  445. # TODO: to implement AT get binary path
  446. return self.app_path