nsdk_builder.py 22 KB


  1. #!/usr/bin/env python3
  2. import os
  3. import sys
  4. import time
  5. import copy
  6. import glob
  7. import serial
  8. import tempfile
  9. import json
  10. import argparse
  11. from threading import Thread
  12. import subprocess
  13. from nsdk_utils import *
  14. VALID_MAKEFILE_NAMES = ['Makefile', 'makefile', "GNUMakefile"]
  15. class nsdk_builder(object):
  16. def __init__(self):
  17. pass
  18. @staticmethod
  19. def is_app(appdir):
  20. if os.path.isdir(appdir) == False:
  21. return False
  22. appdir = os.path.realpath(appdir)
  23. for mkname in VALID_MAKEFILE_NAMES:
  24. mkfile_path = os.path.join(appdir, mkname)
  25. if os.path.isfile(mkfile_path):
  26. return True
  27. return False
  28. @staticmethod
  29. def get_objects(appdir):
  30. if nsdk_builder.is_app(appdir) == False:
  31. return None
  32. def find_app_object(pattern):
  33. files = find_files(appdir, pattern)
  34. return (files[0] if len(files) > 0 else "")
  35. build_objects = dict()
  36. build_objects["elf"] = find_app_object("*.elf")
  37. build_objects["map"] = find_app_object("*.map")
  38. build_objects["dump"] = find_app_object("*.dump")
  39. build_objects["dasm"] = find_app_object("*.dasm")
  40. build_objects["verilog"] = find_app_object("*.verilog")
  41. return build_objects
  42. def build_target_only(self, appdir, make_options="", target="clean", show_output=True, logfile=None):
  43. if self.is_app(appdir) == False:
  44. return COMMAND_NOTAPP, 0
  45. build_cmd = "make -C %s %s %s" % (appdir, make_options, target)
  46. if not ((show_output == False) and (target == "info")):
  47. print("Build application %s, with target: %s" % (appdir, target))
  48. print("Build command: %s" % (build_cmd))
  49. ret, ticks = run_command(build_cmd, show_output, logfile=logfile)
  50. if not ((show_output == False) and (target == "info")):
  51. print("Build command return value: %s" % (ret))
  52. return ret, ticks
  53. def get_build_info(self, appdir, make_options=""):
  54. infolog = tempfile.mktemp()
  55. ret, _ = self.build_target_only(appdir, make_options, "info", False, infolog)
  56. build_info = None
  57. if ret != COMMAND_RUNOK:
  58. os.remove(infolog)
  59. return build_info
  60. with open(infolog, "r") as inf:
  61. for line in inf.readlines():
  62. line = line.strip()
  63. INFO_TAG = "Current Configuration:"
  64. if line.startswith(INFO_TAG):
  65. build_info = dict()
  66. infos = line.strip(INFO_TAG).split()
  67. for info in infos:
  68. splits = info.split("=")
  69. if len(splits) == 2:
  70. build_info[splits[0]] = splits[1]
  71. os.remove(infolog)
  72. return build_info
  73. def build_target(self, appdir, make_options="", target="clean", show_output=True, logfile=None):
  74. if self.is_app(appdir) == False:
  75. return False, None
  76. build_status = dict()
  77. ret, ticks = self.build_target_only(appdir, make_options, target, show_output, logfile)
  78. cmdsts = True
  79. if ret == COMMAND_INTERRUPTED:
  80. print("%s: Exit program due to CTRL - C pressed" % (sys._getframe().f_code.co_name))
  81. sys.exit(1)
  82. elif ret == COMMAND_RUNOK:
  83. cmdsts = True
  84. else:
  85. cmdsts = False
  86. build_status["app"] = { "path": appdir, \
  87. "make_options": make_options, \
  88. "target": target }
  89. build_status["status"] = {"build": cmdsts}
  90. build_status["status_code"] = {"build": ret}
  91. build_status["logs"] = {"build": logfile}
  92. build_status["time"] = {"build": round(ticks, 2)}
  93. build_status["objects"] = nsdk_builder.get_objects(appdir)
  94. build_status["info"] = self.get_build_info(appdir, make_options)
  95. build_status["size"] = get_elfsize(build_status["objects"].get("elf", ""))
  96. return cmdsts, build_status
  97. def clean_app(self, appdir, make_options="", show_output=True, logfile=None):
  98. return self.build_target(appdir, make_options, "clean", show_output, logfile)
  99. def compile_app(self, appdir, make_options="", show_output=True, logfile=None):
  100. return self.build_target(appdir, make_options, "all", show_output, logfile)
  101. def upload_app(self, appdir, make_options="", show_output=True, logfile=None):
  102. if logfile is None:
  103. uploadlog = tempfile.mktemp()
  104. else:
  105. uploadlog = logfile
  106. cmdsts, build_status = self.build_target(appdir, make_options, "upload", show_output, uploadlog)
  107. if cmdsts:
  108. upload_sts = False
  109. with open(uploadlog, 'r') as uf:
  110. for line in uf.readlines():
  111. if "Start address" in line:
  112. upload_sts = True
  113. break
  114. if upload_sts == False: # actually not upload successfully
  115. cmdsts = False
  116. if logfile is None:
  117. os.remove(uploadlog)
  118. print("Upload application %s status: %s" % (appdir, cmdsts))
  119. return cmdsts, build_status
  120. class MonitorThread(Thread):
  121. def __init__(self, port:str, baudrate:str, timeout:int, checks:dict, checktime=time.time(), sdk_check=False, logfile=None, show_output=False):
  122. super().__init__()
  123. self.port = port
  124. self.baudrate = baudrate
  125. self.timeout = timeout
  126. self.checks = checks
  127. self.checktime = checktime
  128. self.sdk_check = sdk_check
  129. self.logfile = logfile
  130. self.show_output = show_output
  131. self._exit_req = False
  132. pass
  133. def get_result(self):
  134. try:
  135. return self.result
  136. except Exception:
  137. return None
  138. def exit_request(self):
  139. self._exit_req = True
  140. pass
  141. def run(self):
  142. start_time = time.time()
  143. serial_log = ""
  144. check_status = False
  145. pass_checks = self.checks.get("PASS", [])
  146. fail_checks = self.checks.get("FAIL", [])
  147. def test_in_check(string, checks):
  148. if type(checks) == list:
  149. for check in checks:
  150. if check in string:
  151. return True
  152. return False
  153. print("Read serial log from %s, baudrate %s" %(self.port, self.baudrate))
  154. NSDK_CHECK_TAG = "Nuclei SDK Build Time:"
  155. print("Checker used: ", self.checks)
  156. check_finished = False
  157. try:
  158. ser = None
  159. ser = serial.Serial(self.port, self.baudrate, timeout=5)
  160. while (time.time() - start_time) < self.timeout:
  161. if self._exit_req:
  162. break
  163. # Remove '\r' in serial read line
  164. sline = ser.readline()
  165. line = str(try_decode_bytes(sline)).replace('\r', '')
  166. if self.sdk_check == True:
  167. if self.show_output:
  168. print("XXX Check " + line, end='')
  169. if NSDK_CHECK_TAG in line:
  170. timestr = line.split(NSDK_CHECK_TAG)[-1].strip()
  171. cur_time = time.mktime(time.strptime(timestr, "%b %d %Y, %H:%M:%S"))
  172. if int(cur_time) >= int(self.checktime):
  173. self.sdk_check = False
  174. line = NSDK_CHECK_TAG + " " + timestr + "\n"
  175. serial_log = serial_log + str(line)
  176. else:
  177. serial_log = serial_log + str(line)
  178. if self.show_output:
  179. print(line, end='')
  180. if check_finished == False:
  181. if test_in_check(line, fail_checks):
  182. check_status = False
  183. check_finished = True
  184. if test_in_check(line, pass_checks):
  185. check_status = True
  186. check_finished = True
  187. if check_finished:
  188. # record another 2 seconds by reset start_time and timeout to 2
  189. start_time = time.time()
  190. self.timeout = 2
  191. except serial.serialutil.SerialException:
  192. # https://stackoverflow.com/questions/21050671/how-to-check-if-device-is-connected-pyserial
  193. print("serial port %s might not exist or in use" % self.port)
  194. except:
  195. print("Some error happens during serial operations")
  196. finally:
  197. if ser:
  198. ser.close()
  199. if self.logfile:
  200. with open(self.logfile, 'w') as lf:
  201. lf.write(serial_log)
  202. self.result = check_status
  203. return check_status
  204. def monitor_serial_and_check(port:str, baudrate:str, timeout:int, checks:dict, checktime=time.time(), sdk_check=False, logfile=None, show_output=False):
  205. start_time = time.time()
  206. serial_log = ""
  207. check_status = False
  208. pass_checks = checks.get("PASS", [])
  209. fail_checks = checks.get("FAIL", [])
  210. def test_in_check(string, checks):
  211. if type(checks) == list:
  212. for check in checks:
  213. if check in string:
  214. return True
  215. return False
  216. print("Read serial log from %s, baudrate %s" %(port, baudrate))
  217. NSDK_CHECK_TAG = "Nuclei SDK Build Time:"
  218. print("Checker used: ", checks)
  219. check_finished = False
  220. try:
  221. ser = None
  222. ser = serial.Serial(port, baudrate, timeout=5)
  223. while (time.time() - start_time) < timeout:
  224. # Remove '\r' in serial read line
  225. sline = ser.readline()
  226. line = str(try_decode_bytes(sline)).replace('\r', '')
  227. if sdk_check == True:
  228. if show_output:
  229. print("XXX Check " + line, end='')
  230. if NSDK_CHECK_TAG in line:
  231. timestr = line.split(NSDK_CHECK_TAG)[-1].strip()
  232. cur_time = time.mktime(time.strptime(timestr, "%b %d %Y, %H:%M:%S"))
  233. if int(cur_time) >= int(checktime):
  234. sdk_check = False
  235. line = NSDK_CHECK_TAG + " " + timestr + "\n"
  236. serial_log = serial_log + str(line)
  237. else:
  238. serial_log = serial_log + str(line)
  239. if show_output:
  240. print(line, end='')
  241. if check_finished == False:
  242. if test_in_check(line, fail_checks):
  243. check_status = False
  244. check_finished = True
  245. if test_in_check(line, pass_checks):
  246. check_status = True
  247. check_finished = True
  248. if check_finished:
  249. # record another 2 seconds by reset start_time and timeout to 2
  250. start_time = time.time()
  251. timeout = 2
  252. except serial.serialutil.SerialException:
  253. # https://stackoverflow.com/questions/21050671/how-to-check-if-device-is-connected-pyserial
  254. print("serial port %s might not exist or in use" % port)
  255. except:
  256. print("Some error happens during serial operations")
  257. finally:
  258. if ser:
  259. ser.close()
  260. if logfile:
  261. with open(logfile, 'w') as lf:
  262. lf.write(serial_log)
  263. return check_status
  264. class nsdk_runner(nsdk_builder):
  265. def __init__(self):
  266. super().__init__()
  267. pass
  268. @staticmethod
  269. def find_apps(rootdir):
  270. subdirectories = [x[0] for x in os.walk(rootdir)]
  271. appdirs = []
  272. for subdir in subdirectories:
  273. if nsdk_runner.is_app(subdir):
  274. appdirs.append(os.path.normpath(subdir))
  275. return appdirs
  276. def build_target_in_directory(self, rootdir, make_options="", target="", \
  277. show_output=True, logdir=None, stoponfail=False):
  278. appdirs = self.find_apps(rootdir)
  279. if len(appdirs) == 0:
  280. return False, None
  281. cmdsts = True
  282. build_status = dict()
  283. createlog = False
  284. if isinstance(logdir, str):
  285. createlog = True
  286. if os.path.isdir(logdir) == False:
  287. os.makedirs(logdir)
  288. for appdir in appdirs:
  289. appdir = appdir.replace("\\", "/") # Change windows \\ path to /
  290. applogfile = None
  291. if createlog:
  292. applogfile = get_logfile(appdir, rootdir, logdir, "build.log")
  293. appcmdsts, appsts = self.build_target(appdir, make_options, \
  294. target, show_output, logfile=applogfile)
  295. build_status[appdir] = appsts
  296. if appcmdsts == False:
  297. cmdsts = appcmdsts
  298. if stoponfail == True:
  299. print("Stop build directory due to fail on application %s" %(appdir))
  300. return cmdsts, build_status
  301. return cmdsts, build_status
  302. def analyze_runlog(self, logfile):
  303. result = {"type": "unknown", "value": {}}
  304. result_lines = open(logfile).readlines()
  305. program_found, result_parsed = parse_benchmark_runlog(result_lines)
  306. if program_found != PROGRAM_UNKNOWN:
  307. result = {"type": program_found, "value": result_parsed}
  308. return result
  309. def run_app_onhw(self, appdir, runcfg:dict(), show_output=True, logfile=None):
  310. app_runcfg = runcfg.get("run_config", dict())
  311. app_runchecks = runcfg.get("checks", dict())
  312. make_options = runcfg["misc"]["make_options"]
  313. checktime = runcfg["misc"]["build_time"]
  314. hwconfig = app_runcfg.get("hardware", None)
  315. serport = None
  316. timeout = 60
  317. baudrate = 115200
  318. if hwconfig and "serport" in hwconfig:
  319. serport = hwconfig.get("serport", "/dev/ttyUSB1")
  320. baudrate = hwconfig.get("baudrate", 115200)
  321. timeout = hwconfig.get("timeout", 60)
  322. ser_thread = None
  323. try:
  324. if serport:
  325. #ser_thread = NThread(monitor_serial_and_check, \
  326. # (serport, baudrate, timeout, app_runchecks, checktime, True, logfile, show_output))
  327. ser_thread = MonitorThread(serport, baudrate, timeout, app_runchecks, checktime, True, logfile, show_output)
  328. ser_thread.start()
  329. cmdsts, _ = self.upload_app(appdir, make_options, show_output, None)
  330. status = True
  331. if ser_thread:
  332. if cmdsts == False:
  333. ser_thread.exit_request()
  334. while ser_thread.is_alive():
  335. ser_thread.join(1)
  336. status = ser_thread.get_result()
  337. del ser_thread
  338. except (KeyboardInterrupt, SystemExit):
  339. print("%s: Exit program due to CTRL - C pressed or SystemExit" % (sys._getframe().f_code.co_name))
  340. if ser_thread:
  341. ser_thread.exit_request()
  342. sys.exit(1)
  343. final_status = cmdsts and status
  344. return final_status
  345. def build_app_with_config(self, appdir, appconfig:dict, show_output=True, logfile=None):
  346. build_config = appconfig.get("build_config", None)
  347. target = appconfig.get("build_target", "all")
  348. make_options = ""
  349. if isinstance(build_config, dict):
  350. for key, value in build_config.items():
  351. value = str(value).strip()
  352. if " " in key:
  353. continue
  354. if " " in value:
  355. make_options += " %s=\"%s\""%(key, value)
  356. else:
  357. make_options += " %s=%s"%(key, value)
  358. appcmdsts, appsts = self.build_target(appdir, make_options, target, show_output, logfile)
  359. buildtime = appsts["time"]["build"]
  360. print("Build application %s, time cost %s seconds, passed: %s" %(appdir, buildtime, appcmdsts))
  361. appsts["config"] = appconfig
  362. return appcmdsts, appsts
  363. def run_app_with_config(self, appdir, appconfig:dict, show_output=True, buildlog=None, runlog=None):
  364. appconfig["build_target"] = "clean dasm"
  365. # build application
  366. build_cktime = time.time()
  367. appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, buildlog)
  368. # run application
  369. if appcmdsts == False:
  370. print("Failed to build application %s, so we can't run it!" % (appdir))
  371. return appcmdsts, appsts
  372. # get run config
  373. app_runcfg = appconfig.get("run_config", dict())
  374. app_runtarget = app_runcfg.get("target", "hardware")
  375. # get run checks
  376. DEFAULT_CHECKS = { "PASS": [ ], "FAIL": [ "MCAUSE:" ] }
  377. app_runchecks = appconfig.get("checks", DEFAULT_CHECKS)
  378. misc_config = {"make_options": appsts["app"]["make_options"], "build_time": build_cktime}
  379. runcfg = {"run_config": app_runcfg, "checks": app_runchecks, "misc": misc_config}
  380. print("Run application on %s" % app_runtarget)
  381. runstarttime = time.time()
  382. runstatus = False
  383. appsts["status_code"]["run"] = RUNSTATUS_NOTSTART
  384. if app_runtarget == "hardware":
  385. runstatus = self.run_app_onhw(appdir, runcfg, show_output, runlog)
  386. # If run successfully, then do log analyze
  387. if runlog and runstatus:
  388. appsts["result"] = self.analyze_runlog(runlog)
  389. appsts["logs"]["run"] = runlog
  390. appsts["status_code"]["run"] = RUNSTATUS_OK if runstatus else RUNSTATUS_FAIL
  391. runtime = round(time.time() - runstarttime, 2)
  392. print("Run application %s on %s, time cost %s seconds, passed: %s" %(appdir, app_runtarget, runtime, runstatus))
  393. appsts["status"]["run"] = runstatus
  394. appsts["time"]["run"] = runtime
  395. return runstatus, appsts
  396. def build_apps_with_config(self, config:dict, show_output=True, logdir=None, stoponfail=False):
  397. # Build all the applications, each application only has one configuration
  398. # "app" : { the_only_config }
  399. cmdsts = True
  400. build_status = dict()
  401. apps_config = copy.deepcopy(config)
  402. for appdir in apps_config:
  403. appconfig = apps_config[appdir]
  404. applogs = appconfig.get("logs", dict())
  405. applogfile = applogs.get("build", None)
  406. appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, applogfile)
  407. build_status[appdir] = appsts
  408. if appcmdsts == False:
  409. cmdsts = appcmdsts
  410. if stoponfail == True:
  411. print("Stop build apps with config due to fail on application %s" %(appdir))
  412. return cmdsts, build_status
  413. return cmdsts, build_status
  414. def build_apps_with_configs(self, config:dict, show_output=True, logdir=None, stoponfail=False):
  415. # Build all the applications, each application has more than one configuration
  416. # "app" : {"configs": {"case1": case1_config}}
  417. cmdsts = True
  418. build_status = dict()
  419. apps_config = copy.deepcopy(config)
  420. for appdir in apps_config:
  421. appconfigs = apps_config[appdir]
  422. if "configs" not in appconfigs:
  423. continue
  424. build_status[appdir] = dict()
  425. app_allconfigs = appconfigs["configs"]
  426. for cfgname in app_allconfigs:
  427. appconfig = app_allconfigs[cfgname] # get configuration for each case for single app
  428. applogs = appconfig.get("logs", dict())
  429. applogfile = applogs.get("build", None)
  430. appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, applogfile)
  431. build_status[appdir][cfgname] = appsts
  432. if appcmdsts == False:
  433. cmdsts = appcmdsts
  434. if stoponfail == True:
  435. print("Stop build apps with config due to fail on application %s" %(appdir))
  436. return cmdsts, build_status
  437. return cmdsts, build_status
  438. def run_apps_with_config(self, config:dict, show_output=True, stoponfail=False):
  439. # Run all the applications, each application only has one configuration
  440. # "app" : { the_only_config }
  441. cmdsts = True
  442. build_status = dict()
  443. apps_config = copy.deepcopy(config)
  444. for appdir in apps_config:
  445. appconfig = apps_config[appdir]
  446. applogs = appconfig.get("logs", dict())
  447. app_buildlogfile = applogs.get("build", None)
  448. app_runlogfile = applogs.get("run", None)
  449. appcmdsts, appsts = self.run_app_with_config(appdir, appconfig, show_output, app_buildlogfile, app_runlogfile)
  450. build_status[appdir] = appsts
  451. if appcmdsts == False:
  452. cmdsts = appcmdsts
  453. if stoponfail == True:
  454. print("Stop run apps with config due to fail on application %s" %(appdir))
  455. return cmdsts, build_status
  456. return cmdsts, build_status
  457. def run_apps_with_configs(self, config:dict, show_output=True, stoponfail=False):
  458. # Run all the applications, each application has more than one configuration
  459. # "app" : {"configs": {"case1": case1_config}}
  460. cmdsts = True
  461. build_status = dict()
  462. apps_config = copy.deepcopy(config)
  463. for appdir in apps_config:
  464. appconfigs = apps_config[appdir]
  465. if "configs" not in appconfigs:
  466. continue
  467. build_status[appdir] = dict()
  468. app_allconfigs = appconfigs["configs"]
  469. for cfgname in app_allconfigs:
  470. appconfig = app_allconfigs[cfgname] # get configuration for each case for single app
  471. applogs = appconfig.get("logs", dict())
  472. app_buildlogfile = applogs.get("build", None)
  473. app_runlogfile = applogs.get("run", None)
  474. appcmdsts, appsts = self.run_app_with_config(appdir, appconfig, show_output, app_buildlogfile, app_runlogfile)
  475. build_status[appdir][cfgname] = appsts
  476. if appcmdsts == False:
  477. cmdsts = appcmdsts
  478. if stoponfail == True:
  479. print("Stop run apps with config due to fail on application %s" %(appdir))
  480. return cmdsts, build_status
  481. return cmdsts, build_status