| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- #!/usr/bin/env python3
- import os
- import sys
- import time
- import copy
- import glob
- import serial
- import tempfile
- import json
- import argparse
- from threading import Thread
- import subprocess
- from nsdk_utils import *
# File names whose presence marks a directory as a buildable application.
VALID_MAKEFILE_NAMES = ['Makefile', 'makefile', "GNUMakefile"]


class nsdk_builder(object):
    """Run ``make`` targets for one application directory and collect results."""

    # Prefix of the line in the ``make info`` log that carries KEY=VALUE pairs.
    _INFO_TAG = "Current Configuration:"

    def __init__(self):
        pass

    @staticmethod
    def is_app(appdir):
        """Return True if *appdir* is a directory containing a known makefile."""
        if not os.path.isdir(appdir):
            return False
        appdir = os.path.realpath(appdir)
        return any(os.path.isfile(os.path.join(appdir, mkname))
                   for mkname in VALID_MAKEFILE_NAMES)

    @staticmethod
    def get_objects(appdir):
        """Locate the build artifacts of *appdir*.

        Returns None when *appdir* is not an application, otherwise a dict
        keyed by "elf", "map", "dump", "dasm" and "verilog". Each value is the
        first match reported by ``find_files`` or "" when nothing matched.
        """
        if not nsdk_builder.is_app(appdir):
            return None

        def find_app_object(pattern):
            files = find_files(appdir, pattern)
            return files[0] if len(files) > 0 else ""

        return {kind: find_app_object("*." + kind)
                for kind in ("elf", "map", "dump", "dasm", "verilog")}

    @staticmethod
    def _parse_build_info(lines):
        """Extract KEY=VALUE pairs from the configuration line in *lines*.

        Returns None when no line starts with the configuration tag. When
        several lines match, the last one wins.
        """
        tag = nsdk_builder._INFO_TAG
        build_info = None
        for line in lines:
            line = line.strip()
            if not line.startswith(tag):
                continue
            build_info = dict()
            # Slice the prefix off. str.strip(tag) would treat the tag as a
            # set of characters and also eat matching characters from the
            # first key and the last value.
            for info in line[len(tag):].split():
                splits = info.split("=")
                if len(splits) == 2:
                    build_info[splits[0]] = splits[1]
        return build_info

    def build_target_only(self, appdir, make_options="", target="clean", show_output=True, logfile=None):
        """Run ``make -C appdir make_options target`` through ``run_command``.

        Returns ``(COMMAND_NOTAPP, 0)`` when *appdir* is not an application,
        otherwise the ``(return code, ticks)`` pair from ``run_command``.
        """
        if not self.is_app(appdir):
            return COMMAND_NOTAPP, 0
        build_cmd = "make -C %s %s %s" % (appdir, make_options, target)
        # A silent "info" query is an internal probe, so keep it quiet.
        verbose = not ((show_output == False) and (target == "info"))
        if verbose:
            print("Build application %s, with target: %s" % (appdir, target))
            print("Build command: %s" % (build_cmd))
        ret, ticks = run_command(build_cmd, show_output, logfile=logfile)
        if verbose:
            print("Build command return value: %s" % (ret))
        return ret, ticks

    def get_build_info(self, appdir, make_options=""):
        """Run the ``info`` target and return its configuration as a dict.

        Returns None when the command does not finish with COMMAND_RUNOK or
        when the log has no configuration line.
        """
        # mkstemp creates the file atomically. mktemp only returns a name,
        # which is race-prone and made os.remove fail when make never ran.
        fd, infolog = tempfile.mkstemp()
        os.close(fd)
        try:
            ret, _ = self.build_target_only(appdir, make_options, "info", False, infolog)
            if ret != COMMAND_RUNOK:
                return None
            with open(infolog, "r") as inf:
                return self._parse_build_info(inf)
        finally:
            if os.path.exists(infolog):
                os.remove(infolog)

    def build_target(self, appdir, make_options="", target="clean", show_output=True, logfile=None):
        """Build *target* and return ``(passed, build_status)``.

        Returns ``(False, None)`` when *appdir* is not an application. Exits
        the program with status 1 when the command reports
        COMMAND_INTERRUPTED. Otherwise *build_status* holds the keys "app",
        "status", "status_code", "logs", "time", "objects", "info" and "size".
        """
        if not self.is_app(appdir):
            return False, None
        ret, ticks = self.build_target_only(appdir, make_options, target, show_output, logfile)
        if ret == COMMAND_INTERRUPTED:
            print("%s: Exit program due to CTRL - C pressed" % (sys._getframe().f_code.co_name))
            sys.exit(1)
        cmdsts = (ret == COMMAND_RUNOK)
        build_status = dict()
        build_status["app"] = {"path": appdir,
                               "make_options": make_options,
                               "target": target}
        build_status["status"] = {"build": cmdsts}
        build_status["status_code"] = {"build": ret}
        build_status["logs"] = {"build": logfile}
        build_status["time"] = {"build": round(ticks, 2)}
        build_status["objects"] = nsdk_builder.get_objects(appdir)
        build_status["info"] = self.get_build_info(appdir, make_options)
        build_status["size"] = get_elfsize(build_status["objects"].get("elf", ""))
        return cmdsts, build_status

    def clean_app(self, appdir, make_options="", show_output=True, logfile=None):
        """Run the ``clean`` target; see build_target for the return value."""
        return self.build_target(appdir, make_options, "clean", show_output, logfile)

    def compile_app(self, appdir, make_options="", show_output=True, logfile=None):
        """Run the ``all`` target; see build_target for the return value."""
        return self.build_target(appdir, make_options, "all", show_output, logfile)

    def upload_app(self, appdir, make_options="", show_output=True, logfile=None):
        """Run the ``upload`` target and verify it from the log.

        The upload only counts as passed when the make command succeeded and
        the log contains "Start address". When *logfile* is None a temporary
        log is used and removed afterwards.
        """
        use_templog = logfile is None
        if use_templog:
            # Create the file up front so the cleanup below cannot fail when
            # make is never started (e.g. appdir is not an application).
            fd, uploadlog = tempfile.mkstemp()
            os.close(fd)
        else:
            uploadlog = logfile
        try:
            cmdsts, build_status = self.build_target(appdir, make_options, "upload", show_output, uploadlog)
            if cmdsts:
                with open(uploadlog, 'r') as uf:
                    uploaded = any("Start address" in line for line in uf)
                if not uploaded:  # make succeeded but nothing was uploaded
                    cmdsts = False
        finally:
            if use_templog and os.path.exists(uploadlog):
                os.remove(uploadlog)
        print("Upload application %s status: %s" % (appdir, cmdsts))
        return cmdsts, build_status
class MonitorThread(Thread):
    """Background thread that reads a serial port and checks the output.

    The lines read are matched against the "PASS" and "FAIL" string lists in
    *checks*. The verdict is available from get_result() once the thread has
    finished.
    """

    def __init__(self, port:str, baudrate:str, timeout:int, checks:dict, checktime=None, sdk_check=False, logfile=None, show_output=False):
        """Store the monitor settings.

        port, baudrate -- passed through to ``serial.Serial``.
        timeout        -- seconds to monitor before giving up.
        checks         -- dict with optional "PASS" / "FAIL" lists of strings.
        checktime      -- epoch seconds; with *sdk_check* set, output is
                          ignored until a build-time banner at least this
                          recent is seen. Defaults to the construction time.
        sdk_check      -- wait for the build-time banner before checking.
        logfile        -- optional path that receives the captured log.
        show_output    -- echo the serial output to stdout.
        """
        super().__init__()
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.checks = checks
        # Resolve the default per instance. A ``time.time()`` default in the
        # signature is evaluated once, when the class is defined.
        self.checktime = time.time() if checktime is None else checktime
        self.sdk_check = sdk_check
        self.logfile = logfile
        self.show_output = show_output
        self._exit_req = False
        self.result = None  # stays None until run() has produced a verdict

    def get_result(self):
        """Return the check verdict, or None if run() has not produced one."""
        return getattr(self, "result", None)

    def exit_request(self):
        """Ask the monitor loop to stop after the current read."""
        self._exit_req = True

    @staticmethod
    def _matches_any(string, checks):
        """Return True if *checks* is a list and one of its items is in *string*."""
        if isinstance(checks, list):
            return any(check in string for check in checks)
        return False

    def run(self):
        """Read the serial port until timeout, exit request or 2s after a verdict."""
        NSDK_CHECK_TAG = "Nuclei SDK Build Time:"
        start_time = time.time()
        # Work on a copy so the configured timeout is not overwritten.
        timeout = self.timeout
        log_parts = []
        check_status = False
        check_finished = False
        pass_checks = self.checks.get("PASS", [])
        fail_checks = self.checks.get("FAIL", [])
        print("Read serial log from %s, baudrate %s" %(self.port, self.baudrate))
        print("Checker used: ", self.checks)
        ser = None
        try:
            ser = serial.Serial(self.port, self.baudrate, timeout=5)
            while (time.time() - start_time) < timeout:
                if self._exit_req:
                    break
                # Remove '\r' in serial read line
                sline = ser.readline()
                line = str(try_decode_bytes(sline)).replace('\r', '')
                if self.sdk_check == True:
                    # Still waiting for the banner of the freshly built image.
                    if self.show_output:
                        print("XXX Check " + line, end='')
                    if NSDK_CHECK_TAG not in line:
                        continue
                    timestr = line.split(NSDK_CHECK_TAG)[-1].strip()
                    try:
                        cur_time = time.mktime(time.strptime(timestr, "%b %d %Y, %H:%M:%S"))
                    except ValueError:
                        # Garbled banner: keep waiting instead of aborting.
                        continue
                    if int(cur_time) >= int(self.checktime):
                        self.sdk_check = False
                        log_parts.append(NSDK_CHECK_TAG + " " + timestr + "\n")
                    continue
                log_parts.append(line)
                if self.show_output:
                    print(line, end='')
                if check_finished:
                    continue
                if self._matches_any(line, fail_checks):
                    check_status = False
                    check_finished = True
                # A PASS match on the same line overrides a FAIL match.
                if self._matches_any(line, pass_checks):
                    check_status = True
                    check_finished = True
                if check_finished:
                    # record another 2 seconds by reset start_time and timeout to 2
                    start_time = time.time()
                    timeout = 2
        except serial.serialutil.SerialException:
            # https://stackoverflow.com/questions/21050671/how-to-check-if-device-is-connected-pyserial
            print("serial port %s might not exist or in use" % self.port)
        except Exception as exc:
            print("Some error happens during serial operations: %s" % (exc))
        finally:
            if ser:
                ser.close()
            if self.logfile:
                with open(self.logfile, 'w') as lf:
                    lf.write("".join(log_parts))
            self.result = check_status
        return check_status
def monitor_serial_and_check(port:str, baudrate:str, timeout:int, checks:dict, checktime=None, sdk_check=False, logfile=None, show_output=False):
    """Read a serial port and check its output against PASS/FAIL strings.

    port, baudrate -- passed through to ``serial.Serial``.
    timeout        -- seconds to monitor before giving up.
    checks         -- dict with optional "PASS" / "FAIL" lists of strings.
    checktime      -- epoch seconds; with *sdk_check* set, output is ignored
                      until a build-time banner at least this recent is seen.
                      Defaults to the time of the call.
    sdk_check      -- wait for the build-time banner before checking.
    logfile        -- optional path that receives the captured log.
    show_output    -- echo the serial output to stdout.

    Returns True when a PASS string decided the check, False otherwise
    (FAIL match, timeout or serial error).
    """
    NSDK_CHECK_TAG = "Nuclei SDK Build Time:"
    start_time = time.time()
    if checktime is None:
        # A ``time.time()`` default in the signature is evaluated once, at
        # definition time, so resolve it per call instead.
        checktime = start_time
    log_parts = []
    check_status = False
    check_finished = False
    pass_checks = checks.get("PASS", [])
    fail_checks = checks.get("FAIL", [])

    def test_in_check(string, candidates):
        # Only lists are searched; any other type never matches.
        if isinstance(candidates, list):
            return any(check in string for check in candidates)
        return False

    print("Read serial log from %s, baudrate %s" %(port, baudrate))
    print("Checker used: ", checks)
    ser = None
    try:
        ser = serial.Serial(port, baudrate, timeout=5)
        while (time.time() - start_time) < timeout:
            # Remove '\r' in serial read line
            sline = ser.readline()
            line = str(try_decode_bytes(sline)).replace('\r', '')
            if sdk_check == True:
                # Still waiting for the banner of the freshly built image.
                if show_output:
                    print("XXX Check " + line, end='')
                if NSDK_CHECK_TAG not in line:
                    continue
                timestr = line.split(NSDK_CHECK_TAG)[-1].strip()
                try:
                    cur_time = time.mktime(time.strptime(timestr, "%b %d %Y, %H:%M:%S"))
                except ValueError:
                    # Garbled banner: keep waiting instead of aborting.
                    continue
                if int(cur_time) >= int(checktime):
                    sdk_check = False
                    log_parts.append(NSDK_CHECK_TAG + " " + timestr + "\n")
                continue
            log_parts.append(line)
            if show_output:
                print(line, end='')
            if check_finished:
                continue
            if test_in_check(line, fail_checks):
                check_status = False
                check_finished = True
            # A PASS match on the same line overrides a FAIL match.
            if test_in_check(line, pass_checks):
                check_status = True
                check_finished = True
            if check_finished:
                # record another 2 seconds by reset start_time and timeout to 2
                start_time = time.time()
                timeout = 2
    except serial.serialutil.SerialException:
        # https://stackoverflow.com/questions/21050671/how-to-check-if-device-is-connected-pyserial
        print("serial port %s might not exist or in use" % port)
    except Exception as exc:
        print("Some error happens during serial operations: %s" % (exc))
    finally:
        if ser:
            ser.close()
        if logfile:
            with open(logfile, 'w') as lf:
                lf.write("".join(log_parts))
    return check_status
class nsdk_runner(nsdk_builder):
    """Build, upload and run applications described by configuration dicts."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def find_apps(rootdir):
        """Return the normalized paths of all application dirs under *rootdir*."""
        return [os.path.normpath(subdir)
                for subdir, _, _ in os.walk(rootdir)
                if nsdk_runner.is_app(subdir)]

    def build_target_in_directory(self, rootdir, make_options="", target="",
                                  show_output=True, logdir=None, stoponfail=False):
        """Build *target* for every application found under *rootdir*.

        Returns ``(False, None)`` when no application is found, otherwise
        ``(all_passed, {appdir: build_status})``. When *logdir* is a string a
        "build.log" path per application is obtained from ``get_logfile``.
        With *stoponfail* the loop stops at the first failing application.
        """
        appdirs = self.find_apps(rootdir)
        if len(appdirs) == 0:
            return False, None
        createlog = isinstance(logdir, str)
        if createlog:
            # exist_ok avoids the check-then-create race of isdir + makedirs.
            os.makedirs(logdir, exist_ok=True)
        cmdsts = True
        build_status = dict()
        for appdir in appdirs:
            appdir = appdir.replace("\\", "/")  # Change windows \\ path to /
            applogfile = None
            if createlog:
                applogfile = get_logfile(appdir, rootdir, logdir, "build.log")
            appcmdsts, appsts = self.build_target(appdir, make_options,
                                                  target, show_output, logfile=applogfile)
            build_status[appdir] = appsts
            if appcmdsts == False:
                cmdsts = appcmdsts
                if stoponfail == True:
                    print("Stop build directory due to fail on application %s" %(appdir))
                    return cmdsts, build_status
        return cmdsts, build_status

    def analyze_runlog(self, logfile):
        """Parse *logfile* with ``parse_benchmark_runlog``.

        Returns ``{"type": ..., "value": ...}``; the type is "unknown" with an
        empty value when the parser reports PROGRAM_UNKNOWN.
        """
        result = {"type": "unknown", "value": {}}
        # Use a context manager so the file handle is closed deterministically.
        with open(logfile) as lf:
            result_lines = lf.readlines()
        program_found, result_parsed = parse_benchmark_runlog(result_lines)
        if program_found != PROGRAM_UNKNOWN:
            result = {"type": program_found, "value": result_parsed}
        return result

    def run_app_onhw(self, appdir, runcfg:dict, show_output=True, logfile=None):
        """Upload *appdir* to hardware and check its serial output.

        *runcfg* must provide ``runcfg["misc"]["make_options"]`` and
        ``runcfg["misc"]["build_time"]``; "run_config" and "checks" are
        optional. The serial port is only monitored when
        ``run_config["hardware"]`` contains "serport". Returns True only when
        the upload passed and, if monitored, the serial check passed.
        """
        app_runcfg = runcfg.get("run_config", dict())
        app_runchecks = runcfg.get("checks", dict())
        make_options = runcfg["misc"]["make_options"]
        checktime = runcfg["misc"]["build_time"]
        hwconfig = app_runcfg.get("hardware", None)
        serport = None
        timeout = 60
        baudrate = 115200
        if hwconfig and "serport" in hwconfig:
            serport = hwconfig.get("serport", "/dev/ttyUSB1")
            baudrate = hwconfig.get("baudrate", 115200)
            timeout = hwconfig.get("timeout", 60)
        ser_thread = None
        try:
            if serport:
                # Start listening before the upload so no output is missed.
                ser_thread = MonitorThread(serport, baudrate, timeout, app_runchecks, checktime, True, logfile, show_output)
                ser_thread.start()
            cmdsts, _ = self.upload_app(appdir, make_options, show_output, None)
            status = True
            if ser_thread:
                if cmdsts == False:
                    # Nothing was uploaded, so there is nothing to wait for.
                    ser_thread.exit_request()
                # Join in short slices so CTRL-C is still handled promptly.
                while ser_thread.is_alive():
                    ser_thread.join(1)
                status = ser_thread.get_result()
        except (KeyboardInterrupt, SystemExit):
            print("%s: Exit program due to CTRL - C pressed or SystemExit" % (sys._getframe().f_code.co_name))
            if ser_thread:
                ser_thread.exit_request()
            sys.exit(1)
        # get_result() may be None when the monitor produced no verdict;
        # always hand a real boolean back to the caller.
        return bool(cmdsts and status)

    def build_app_with_config(self, appdir, appconfig:dict, show_output=True, logfile=None):
        """Build *appdir* as described by *appconfig*.

        "build_config" (dict) is turned into ``KEY=VALUE`` make options: keys
        containing a space are skipped and values containing a space are
        double-quoted. "build_target" selects the make target (default "all").
        Returns ``(passed, build_status)``; *build_status* gains a "config"
        entry, or is None when *appdir* is not an application.
        """
        build_config = appconfig.get("build_config", None)
        target = appconfig.get("build_target", "all")
        make_options = ""
        if isinstance(build_config, dict):
            for key, value in build_config.items():
                value = str(value).strip()
                if " " in key:
                    continue
                if " " in value:
                    make_options += " %s=\"%s\""%(key, value)
                else:
                    make_options += " %s=%s"%(key, value)
        appcmdsts, appsts = self.build_target(appdir, make_options, target, show_output, logfile)
        if appsts is None:
            # build_target returns (False, None) for a non-application dir;
            # report it instead of failing on the missing status dict.
            print("Failed to build %s, it is not a valid application directory" % (appdir))
            return appcmdsts, appsts
        buildtime = appsts["time"]["build"]
        print("Build application %s, time cost %s seconds, passed: %s" %(appdir, buildtime, appcmdsts))
        appsts["config"] = appconfig
        return appcmdsts, appsts

    def run_app_with_config(self, appdir, appconfig:dict, show_output=True, buildlog=None, runlog=None):
        """Build *appdir* with target "clean dasm", then run it.

        Note: *appconfig* is modified in place ("build_target" is overwritten).
        Only the "hardware" run target is executed here; for any other target
        the run is reported as failed with status code RUNSTATUS_NOTSTART.
        Returns ``(run_passed, status)``, or the build result when the build
        fails.
        """
        appconfig["build_target"] = "clean dasm"
        # build application; the timestamp is used later to recognise
        # the serial output of this very build
        build_cktime = time.time()
        appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, buildlog)
        if appcmdsts == False:
            print("Failed to build application %s, so we can't run it!" % (appdir))
            return appcmdsts, appsts
        # get run config
        app_runcfg = appconfig.get("run_config", dict())
        app_runtarget = app_runcfg.get("target", "hardware")
        # get run checks
        DEFAULT_CHECKS = { "PASS": [ ], "FAIL": [ "MCAUSE:" ] }
        app_runchecks = appconfig.get("checks", DEFAULT_CHECKS)
        misc_config = {"make_options": appsts["app"]["make_options"], "build_time": build_cktime}
        runcfg = {"run_config": app_runcfg, "checks": app_runchecks, "misc": misc_config}
        print("Run application on %s" % app_runtarget)
        runstarttime = time.time()
        runstatus = False
        appsts["status_code"]["run"] = RUNSTATUS_NOTSTART
        if app_runtarget == "hardware":
            runstatus = self.run_app_onhw(appdir, runcfg, show_output, runlog)
            # If run successfully, then do log analyze
            if runlog and runstatus:
                appsts["result"] = self.analyze_runlog(runlog)
            appsts["logs"]["run"] = runlog
            appsts["status_code"]["run"] = RUNSTATUS_OK if runstatus else RUNSTATUS_FAIL
        runtime = round(time.time() - runstarttime, 2)
        print("Run application %s on %s, time cost %s seconds, passed: %s" %(appdir, app_runtarget, runtime, runstatus))
        appsts["status"]["run"] = runstatus
        appsts["time"]["run"] = runtime
        return runstatus, appsts

    def build_apps_with_config(self, config:dict, show_output=True, logdir=None, stoponfail=False):
        """Build every application in *config*, one configuration per app.

        Expected layout: ``{appdir: the_only_config}``. *logdir* is accepted
        but not used; the log path comes from ``config[appdir]["logs"]["build"]``.
        Returns ``(all_passed, {appdir: build_status})``.
        """
        cmdsts = True
        build_status = dict()
        # Work on a copy so the caller's configuration is never modified.
        apps_config = copy.deepcopy(config)
        for appdir, appconfig in apps_config.items():
            applogfile = appconfig.get("logs", dict()).get("build", None)
            appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, applogfile)
            build_status[appdir] = appsts
            if appcmdsts == False:
                cmdsts = appcmdsts
                if stoponfail == True:
                    print("Stop build apps with config due to fail on application %s" %(appdir))
                    return cmdsts, build_status
        return cmdsts, build_status

    def build_apps_with_configs(self, config:dict, show_output=True, logdir=None, stoponfail=False):
        """Build every application in *config*, several configurations per app.

        Expected layout: ``{appdir: {"configs": {case_name: case_config}}}``;
        applications without a "configs" entry are skipped. *logdir* is
        accepted but not used. Returns
        ``(all_passed, {appdir: {case_name: build_status}})``.
        """
        cmdsts = True
        build_status = dict()
        # Work on a copy so the caller's configuration is never modified.
        apps_config = copy.deepcopy(config)
        for appdir, appconfigs in apps_config.items():
            if "configs" not in appconfigs:
                continue
            build_status[appdir] = dict()
            for cfgname, appconfig in appconfigs["configs"].items():
                applogfile = appconfig.get("logs", dict()).get("build", None)
                appcmdsts, appsts = self.build_app_with_config(appdir, appconfig, show_output, applogfile)
                build_status[appdir][cfgname] = appsts
                if appcmdsts == False:
                    cmdsts = appcmdsts
                    if stoponfail == True:
                        print("Stop build apps with config due to fail on application %s" %(appdir))
                        return cmdsts, build_status
        return cmdsts, build_status

    def run_apps_with_config(self, config:dict, show_output=True, stoponfail=False):
        """Build and run every application in *config*, one configuration per app.

        Expected layout: ``{appdir: the_only_config}``; log paths come from
        the "build" and "run" entries of each config's "logs" dict.
        Returns ``(all_passed, {appdir: status})``.
        """
        cmdsts = True
        build_status = dict()
        # Work on a copy: run_app_with_config modifies the config it is given.
        apps_config = copy.deepcopy(config)
        for appdir, appconfig in apps_config.items():
            applogs = appconfig.get("logs", dict())
            app_buildlogfile = applogs.get("build", None)
            app_runlogfile = applogs.get("run", None)
            appcmdsts, appsts = self.run_app_with_config(appdir, appconfig, show_output, app_buildlogfile, app_runlogfile)
            build_status[appdir] = appsts
            if appcmdsts == False:
                cmdsts = appcmdsts
                if stoponfail == True:
                    print("Stop run apps with config due to fail on application %s" %(appdir))
                    return cmdsts, build_status
        return cmdsts, build_status

    def run_apps_with_configs(self, config:dict, show_output=True, stoponfail=False):
        """Build and run every application in *config*, several configurations per app.

        Expected layout: ``{appdir: {"configs": {case_name: case_config}}}``;
        applications without a "configs" entry are skipped. Returns
        ``(all_passed, {appdir: {case_name: status}})``.
        """
        cmdsts = True
        build_status = dict()
        # Work on a copy: run_app_with_config modifies the config it is given.
        apps_config = copy.deepcopy(config)
        for appdir, appconfigs in apps_config.items():
            if "configs" not in appconfigs:
                continue
            build_status[appdir] = dict()
            for cfgname, appconfig in appconfigs["configs"].items():
                applogs = appconfig.get("logs", dict())
                app_buildlogfile = applogs.get("build", None)
                app_runlogfile = applogs.get("run", None)
                appcmdsts, appsts = self.run_app_with_config(appdir, appconfig, show_output, app_buildlogfile, app_runlogfile)
                build_status[appdir][cfgname] = appsts
                if appcmdsts == False:
                    cmdsts = appcmdsts
                    if stoponfail == True:
                        print("Stop run apps with config due to fail on application %s" %(appdir))
                        return cmdsts, build_status
        return cmdsts, build_status
|