IDFApp.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. # Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """ IDF Test Applications """
  15. import hashlib
  16. import json
  17. import os
  18. import re
  19. import subprocess
  20. import sys
  21. from abc import abstractmethod
  22. from tiny_test_fw import App
  23. from .IDFAssignTest import ComponentUTGroup, ExampleGroup, IDFCaseGroup, TestAppsGroup, UnitTestGroup
  24. try:
  25. import gitlab_api
  26. except ImportError:
  27. gitlab_api = None
  28. try:
  29. from typing import Any, Dict, List, Optional, Tuple, Type # noqa: F401
  30. except ImportError:
  31. pass
  32. def parse_encrypted_flag(args, offs, binary): # type: (Dict, str, str) -> Any
  33. # Find partition entries (e.g. the entries with an offset and a file)
  34. for _, entry in args.items():
  35. # If the current entry is a partition, we have to check whether it is
  36. # the one we are looking for or not
  37. try:
  38. if (entry['offset'], entry['file']) == (offs, binary):
  39. return entry['encrypted'] == 'true'
  40. except (TypeError, KeyError):
  41. # TypeError occurs if the entry is a list, which is possible in JSON
  42. # data structure.
  43. # KeyError occurs if the entry doesn't have "encrypted" field.
  44. continue
  45. # The entry was not found, return None. The caller will have to check
  46. # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
  47. return None
  48. def parse_flash_settings(path, default_encryption=False): # type: (str, bool) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]], Dict, Any]
  49. file_name = os.path.basename(path)
  50. # For compatibility reasons, this list contains all the files to be
  51. # flashed
  52. flash_files = []
  53. # The following list only contains the files that need encryption
  54. encrypt_files = []
  55. if file_name == 'flasher_args.json':
  56. # CMake version using build metadata file
  57. with open(path, 'r') as f:
  58. args = json.load(f)
  59. for (offs, binary) in args['flash_files'].items():
  60. if offs:
  61. flash_files.append((offs, binary))
  62. encrypted = parse_encrypted_flag(args, offs, binary)
  63. # default_encryption should be taken into account if and only if
  64. # encrypted flag is not provided in the JSON file.
  65. if (encrypted is None and default_encryption) or encrypted:
  66. encrypt_files.append((offs, binary))
  67. flash_settings = args['flash_settings']
  68. app_name = os.path.splitext(args['app']['file'])[0]
  69. else:
  70. # GNU Make version uses download.config arguments file
  71. with open(path, 'r') as f:
  72. args = f.readlines()[-1].split(' ')
  73. flash_settings = {}
  74. for idx in range(0, len(args), 2): # process arguments in pairs
  75. if args[idx].startswith('--'):
  76. # strip the -- from the command line argument
  77. flash_settings[args[idx][2:]] = args[idx + 1]
  78. else:
  79. # offs, filename
  80. flash_files.append((args[idx], args[idx + 1]))
  81. # Parameter default_encryption tells us if the files need encryption
  82. if default_encryption:
  83. encrypt_files = flash_files
  84. # we can only guess app name in download.config.
  85. for p in flash_files:
  86. if not os.path.dirname(p[1]) and 'partition' not in p[1]:
  87. # app bin usually in the same dir with download.config and it's not partition table
  88. app_name = os.path.splitext(p[1])[0]
  89. break
  90. else:
  91. app_name = None
  92. return flash_files, encrypt_files, flash_settings, app_name
  93. class Artifacts(object):
  94. def __init__(self, dest_root_path, artifact_index_file, app_path, config_name, target):
  95. # type: (str, str, str, str, str) -> None
  96. assert gitlab_api
  97. # at least one of app_path or config_name is not None. otherwise we can't match artifact
  98. assert app_path or config_name
  99. assert os.path.exists(artifact_index_file)
  100. self.gitlab_inst = gitlab_api.Gitlab(os.getenv('CI_PROJECT_ID'))
  101. self.dest_root_path = dest_root_path
  102. with open(artifact_index_file, 'r') as f:
  103. artifact_index = json.load(f)
  104. self.artifact_info = self._find_artifact(artifact_index, app_path, config_name, target)
  105. @staticmethod
  106. def _find_artifact(artifact_index, app_path, config_name, target): # type: ignore
  107. for artifact_info in artifact_index:
  108. match_result = True
  109. if app_path:
  110. # We use endswith here to avoid issue like:
  111. # examples_protocols_mqtt_ws but return a examples_protocols_mqtt_wss failure
  112. match_result = artifact_info['app_dir'].endswith(app_path)
  113. if config_name:
  114. match_result = match_result and config_name == artifact_info['config']
  115. if target:
  116. match_result = match_result and target == artifact_info['target']
  117. if match_result:
  118. ret = artifact_info
  119. break
  120. else:
  121. ret = None
  122. return ret
  123. def _get_app_base_path(self): # type: () -> Any
  124. if self.artifact_info:
  125. return os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
  126. else:
  127. return None
  128. def _get_flash_arg_file(self, base_path, job_id): # type: (str, str) -> str
  129. if self.artifact_info['build_system'] == 'cmake':
  130. flash_arg_file = os.path.join(base_path, 'flasher_args.json')
  131. else:
  132. flash_arg_file = os.path.join(base_path, 'download.config')
  133. self.gitlab_inst.download_artifact(job_id, [flash_arg_file], self.dest_root_path)
  134. return flash_arg_file
  135. def _download_binary_files(self, base_path, job_id, flash_arg_file): # type: (str, str, str) -> None
  136. # Let's ignore the second value returned (encrypt_files) as these
  137. # files also appear in the first list
  138. flash_files, _, _, app_name = parse_flash_settings(os.path.join(self.dest_root_path, flash_arg_file))
  139. artifact_files = [os.path.join(base_path, p[1]) for p in flash_files]
  140. artifact_files.append(os.path.join(base_path, app_name + '.elf'))
  141. bootloader_path = os.path.join(base_path, 'bootloader', 'bootloader.bin')
  142. if bootloader_path not in artifact_files:
  143. artifact_files.append(bootloader_path)
  144. self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)
  145. def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None
  146. self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')],
  147. self.dest_root_path)
  148. def download_artifacts(self): # type: () -> Any
  149. if not self.artifact_info:
  150. return None
  151. base_path = self._get_app_base_path()
  152. job_id = self.artifact_info['ci_job_id']
  153. # 1. download flash args file
  154. flash_arg_file = self._get_flash_arg_file(base_path, job_id)
  155. # 2. download all binary files
  156. self._download_binary_files(base_path, job_id, flash_arg_file)
  157. # 3. download sdkconfig file
  158. self._download_sdkconfig_file(base_path, job_id)
  159. return base_path
  160. def download_artifact_files(self, file_names): # type: (List[str]) -> Any
  161. if self.artifact_info:
  162. base_path = os.path.join(self.artifact_info['work_dir'], self.artifact_info['build_dir'])
  163. job_id = self.artifact_info['ci_job_id']
  164. # download all binary files
  165. artifact_files = [os.path.join(base_path, fn) for fn in file_names]
  166. self.gitlab_inst.download_artifact(job_id, artifact_files, self.dest_root_path)
  167. # download sdkconfig file
  168. self.gitlab_inst.download_artifact(job_id, [os.path.join(os.path.dirname(base_path), 'sdkconfig')],
  169. self.dest_root_path)
  170. else:
  171. base_path = None
  172. return base_path
  173. class UnitTestArtifacts(Artifacts):
  174. BUILDS_DIR_RE = re.compile(r'^builds/')
  175. def _get_app_base_path(self): # type: () -> Any
  176. if self.artifact_info:
  177. output_dir = self.BUILDS_DIR_RE.sub('output/', self.artifact_info['build_dir'])
  178. return os.path.join(self.artifact_info['app_dir'], output_dir)
  179. else:
  180. return None
  181. def _download_sdkconfig_file(self, base_path, job_id): # type: (str, str) -> None
  182. self.gitlab_inst.download_artifact(job_id, [os.path.join(base_path, 'sdkconfig')], self.dest_root_path)
  183. class IDFApp(App.BaseApp):
  184. """
  185. Implements common esp-idf application behavior.
  186. idf applications should inherent from this class and overwrite method get_binary_path.
  187. """
  188. IDF_DOWNLOAD_CONFIG_FILE = 'download.config'
  189. IDF_FLASH_ARGS_FILE = 'flasher_args.json'
  190. def __init__(self, app_path, config_name=None, target=None, case_group=IDFCaseGroup, artifact_cls=Artifacts): # type: ignore
  191. super(IDFApp, self).__init__(app_path)
  192. self.app_path = app_path # type: (str)
  193. self.config_name = config_name # type: (str)
  194. self.target = target # type: (str)
  195. self.idf_path = self.get_sdk_path() # type: (str)
  196. self.case_group = case_group
  197. self.artifact_cls = artifact_cls
  198. self.binary_path = self.get_binary_path()
  199. self.elf_file = self._get_elf_file_path()
  200. self._elf_file_sha256 = None # type: (Optional[str])
  201. assert os.path.exists(self.binary_path)
  202. if self.IDF_DOWNLOAD_CONFIG_FILE not in os.listdir(self.binary_path):
  203. if self.IDF_FLASH_ARGS_FILE not in os.listdir(self.binary_path):
  204. msg = ('Neither {} nor {} exists. '
  205. "Try to run 'make print_flash_cmd | tail -n 1 > {}/{}' "
  206. "or 'idf.py build' "
  207. 'for resolving the issue.'
  208. '').format(self.IDF_DOWNLOAD_CONFIG_FILE, self.IDF_FLASH_ARGS_FILE,
  209. self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
  210. raise AssertionError(msg)
  211. # In order to keep backward compatibility, flash_files is unchanged.
  212. # However, we now have a new attribute encrypt_files.
  213. self.flash_files, self.encrypt_files, self.flash_settings = self._parse_flash_download_config()
  214. self.partition_table = self._parse_partition_table()
  215. def __str__(self): # type: () -> str
  216. parts = ['app<{}>'.format(self.app_path)]
  217. if self.config_name:
  218. parts.append('config<{}>'.format(self.config_name))
  219. if self.target:
  220. parts.append('target<{}>'.format(self.target))
  221. return ' '.join(parts)
  222. @classmethod
  223. def get_sdk_path(cls): # type: () -> str
  224. idf_path = os.getenv('IDF_PATH')
  225. assert idf_path
  226. assert os.path.exists(idf_path)
  227. return idf_path
  228. def _get_sdkconfig_paths(self): # type: () -> List[str]
  229. """
  230. returns list of possible paths where sdkconfig could be found
  231. Note: could be overwritten by a derived class to provide other locations or order
  232. """
  233. return [os.path.join(self.binary_path, 'sdkconfig'), os.path.join(self.binary_path, '..', 'sdkconfig')]
  234. def get_sdkconfig(self): # type: () -> Dict
  235. """
  236. reads sdkconfig and returns a dictionary with all configured variables
  237. :raise: AssertionError: if sdkconfig file does not exist in defined paths
  238. """
  239. d = {}
  240. sdkconfig_file = None
  241. for i in self._get_sdkconfig_paths():
  242. if os.path.exists(i):
  243. sdkconfig_file = i
  244. break
  245. assert sdkconfig_file is not None
  246. with open(sdkconfig_file) as f:
  247. for line in f:
  248. configs = line.split('=')
  249. if len(configs) == 2:
  250. d[configs[0]] = configs[1].rstrip()
  251. return d
  252. def get_sdkconfig_config_value(self, config_key): # type: (str) -> Any
  253. sdkconfig_dict = self.get_sdkconfig()
  254. value = None
  255. if (config_key in sdkconfig_dict):
  256. value = sdkconfig_dict[config_key]
  257. return value
  258. @abstractmethod
  259. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  260. pass
  261. def get_binary_path(self): # type: () -> str
  262. path = self._try_get_binary_from_local_fs()
  263. if path:
  264. return path
  265. artifacts = self.artifact_cls(self.idf_path,
  266. self.case_group.get_artifact_index_file(),
  267. self.app_path, self.config_name, self.target)
  268. if isinstance(self, LoadableElfTestApp):
  269. assert self.app_files
  270. path = artifacts.download_artifact_files(self.app_files)
  271. else:
  272. path = artifacts.download_artifacts()
  273. if path:
  274. return os.path.join(self.idf_path, path)
  275. else:
  276. raise OSError('Failed to get binary for {}'.format(self))
  277. def _get_elf_file_path(self): # type: () -> str
  278. ret = ''
  279. file_names = os.listdir(self.binary_path)
  280. for fn in file_names:
  281. if os.path.splitext(fn)[1] == '.elf':
  282. ret = os.path.join(self.binary_path, fn)
  283. return ret
  284. def _int_offs_abs_paths(self, files_list): # type: (List[tuple[str, str]]) -> List[Tuple[int, str]]
  285. return [(int(offs, 0),
  286. os.path.join(self.binary_path, file_path.strip()))
  287. for (offs, file_path) in files_list]
  288. def _parse_flash_download_config(self): # type: () -> Tuple[List[tuple[int, str]], List[tuple[int, str]], Dict]
  289. """
  290. Parse flash download config from build metadata files
  291. Sets self.flash_files, self.flash_settings
  292. (Called from constructor)
  293. Returns (flash_files, encrypt_files, flash_settings)
  294. """
  295. if self.IDF_FLASH_ARGS_FILE in os.listdir(self.binary_path):
  296. # CMake version using build metadata file
  297. path = os.path.join(self.binary_path, self.IDF_FLASH_ARGS_FILE)
  298. else:
  299. # GNU Make version uses download.config arguments file
  300. path = os.path.join(self.binary_path, self.IDF_DOWNLOAD_CONFIG_FILE)
  301. # If the JSON doesn't find the encrypted flag for our files, provide
  302. # a default encrpytion flag: the macro
  303. # CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT
  304. sdkconfig_dict = self.get_sdkconfig()
  305. default_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
  306. flash_files, encrypt_files, flash_settings, _ = parse_flash_settings(path, default_encryption)
  307. # Flash setting "encrypt" only and only if all the files to flash
  308. # must be encrypted. Else, this parameter should be False.
  309. # All files must be encrypted is both file lists are the same
  310. flash_settings['encrypt'] = sorted(flash_files) == sorted(encrypt_files)
  311. return self._int_offs_abs_paths(flash_files), self._int_offs_abs_paths(encrypt_files), flash_settings
  312. def _parse_partition_table(self): # type: ignore
  313. """
  314. Parse partition table contents based on app binaries
  315. Returns partition_table data
  316. (Called from constructor)
  317. """
  318. partition_tool = os.path.join(self.idf_path,
  319. 'components',
  320. 'partition_table',
  321. 'gen_esp32part.py')
  322. assert os.path.exists(partition_tool)
  323. errors = []
  324. # self.flash_files is sorted based on offset in order to have a consistent result with different versions of
  325. # Python
  326. for (_, path) in sorted(self.flash_files, key=lambda elem: elem[0]):
  327. if 'partition' in os.path.split(path)[1]:
  328. partition_file = os.path.join(self.binary_path, path)
  329. process = subprocess.Popen([sys.executable, partition_tool, partition_file],
  330. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  331. (raw_data, raw_error) = process.communicate()
  332. if isinstance(raw_error, bytes):
  333. raw_error = raw_error.decode()
  334. if 'Traceback' in raw_error:
  335. # Some exception occurred. It is possible that we've tried the wrong binary file.
  336. errors.append((path, raw_error))
  337. continue
  338. if isinstance(raw_data, bytes):
  339. raw_data = raw_data.decode()
  340. break
  341. else:
  342. traceback_msg = os.linesep.join(['{} {}:{}{}'.format(partition_tool,
  343. p,
  344. os.linesep,
  345. msg) for p, msg in errors])
  346. raise ValueError('No partition table found for IDF binary path: {}{}{}'.format(self.binary_path,
  347. os.linesep,
  348. traceback_msg))
  349. partition_table = dict()
  350. for line in raw_data.splitlines():
  351. if line[0] != '#':
  352. try:
  353. _name, _type, _subtype, _offset, _size, _flags = line.split(',')
  354. if _size[-1] == 'K':
  355. _size = int(_size[:-1]) * 1024
  356. elif _size[-1] == 'M':
  357. _size = int(_size[:-1]) * 1024 * 1024
  358. else:
  359. _size = int(_size)
  360. _offset = int(_offset, 0)
  361. except ValueError:
  362. continue
  363. partition_table[_name] = {
  364. 'type': _type,
  365. 'subtype': _subtype,
  366. 'offset': _offset,
  367. 'size': _size,
  368. 'flags': _flags
  369. }
  370. return partition_table
  371. def get_elf_sha256(self): # type: () -> Optional[str]
  372. if self._elf_file_sha256:
  373. return self._elf_file_sha256
  374. sha256 = hashlib.sha256()
  375. with open(self.elf_file, 'rb') as f:
  376. sha256.update(f.read())
  377. self._elf_file_sha256 = sha256.hexdigest()
  378. return self._elf_file_sha256
  379. class Example(IDFApp):
  380. def __init__(self, app_path, config_name='default', target='esp32', case_group=ExampleGroup, artifacts_cls=Artifacts):
  381. # type: (str, str, str, Type[ExampleGroup], Type[Artifacts]) -> None
  382. if not config_name:
  383. config_name = 'default'
  384. if not target:
  385. target = 'esp32'
  386. super(Example, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  387. def _get_sdkconfig_paths(self): # type: () -> List[str]
  388. """
  389. overrides the parent method to provide exact path of sdkconfig for example tests
  390. """
  391. return [os.path.join(self.binary_path, '..', 'sdkconfig')]
  392. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  393. # build folder of example path
  394. path = os.path.join(self.idf_path, self.app_path, 'build')
  395. if os.path.exists(path):
  396. return path
  397. # Search for CI build folders.
  398. # Path format: $IDF_PATH/build_examples/app_path_with_underscores/config/target
  399. # (see tools/ci/build_examples.sh)
  400. # For example: $IDF_PATH/build_examples/examples_get-started_blink/default/esp32
  401. app_path_underscored = self.app_path.replace(os.path.sep, '_')
  402. example_path = os.path.join(self.idf_path, self.case_group.LOCAL_BUILD_DIR)
  403. for dirpath in os.listdir(example_path):
  404. if os.path.basename(dirpath) == app_path_underscored:
  405. path = os.path.join(example_path, dirpath, self.config_name, self.target, 'build')
  406. if os.path.exists(path):
  407. return path
  408. else:
  409. return None
  410. return None
  411. class UT(IDFApp):
  412. def __init__(self, app_path, config_name='default', target='esp32', case_group=UnitTestGroup, artifacts_cls=UnitTestArtifacts):
  413. # type: (str, str, str, Type[UnitTestGroup], Type[UnitTestArtifacts]) -> None
  414. if not config_name:
  415. config_name = 'default'
  416. if not target:
  417. target = 'esp32'
  418. super(UT, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  419. def _try_get_binary_from_local_fs(self): # type: () -> Optional[str]
  420. path = os.path.join(self.idf_path, self.app_path, 'build')
  421. if os.path.exists(path):
  422. return path
  423. # first try to get from build folder of unit-test-app
  424. path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'build')
  425. if os.path.exists(path):
  426. # found, use bin in build path
  427. return path
  428. # ``build_unit_test.sh`` will copy binary to output folder
  429. path = os.path.join(self.idf_path, 'tools', 'unit-test-app', 'output', self.target, self.config_name)
  430. if os.path.exists(path):
  431. return path
  432. return None
  433. class TestApp(Example):
  434. def __init__(self, app_path, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
  435. # type: (str, str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
  436. super(TestApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  437. class ComponentUTApp(TestApp):
  438. def __init__(self, app_path, config_name='default', target='esp32', case_group=ComponentUTGroup, artifacts_cls=Artifacts):
  439. # type: (str, str, str, Type[ComponentUTGroup], Type[Artifacts]) -> None
  440. super(ComponentUTApp, self).__init__(app_path, config_name, target, case_group, artifacts_cls)
  441. class LoadableElfTestApp(TestApp):
  442. def __init__(self, app_path, app_files, config_name='default', target='esp32', case_group=TestAppsGroup, artifacts_cls=Artifacts):
  443. # type: (str, List[str], str, str, Type[TestAppsGroup], Type[Artifacts]) -> None
  444. # add arg `app_files` for loadable elf test_app.
  445. # Such examples only build elf files, so it doesn't generate flasher_args.json.
  446. # So we can't get app files from config file. Test case should pass it to application.
  447. super(IDFApp, self).__init__(app_path)
  448. self.app_path = app_path
  449. self.app_files = app_files
  450. self.config_name = config_name or 'default'
  451. self.target = target or 'esp32'
  452. self.idf_path = self.get_sdk_path()
  453. self.case_group = case_group
  454. self.artifact_cls = artifacts_cls
  455. self.binary_path = self.get_binary_path()
  456. self.elf_file = self._get_elf_file_path()
  457. assert os.path.exists(self.binary_path)
  458. class SSC(IDFApp):
  459. def get_binary_path(self): # type: () -> str
  460. # TODO: to implement SSC get binary path
  461. return self.app_path
  462. class AT(IDFApp):
  463. def get_binary_path(self): # type: () -> str
  464. # TODO: to implement AT get binary path
  465. return self.app_path