- #!/usr/bin/env python3
- import os
- import sys
- SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
- requirement_file = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "requirements.txt"))
- try:
- import time
- import datetime
- import random
- import shutil
- import signal
- import psutil
- import re
- import copy
- import serial
- import serial.tools.list_ports
- import tempfile
- import collections
- from collections import OrderedDict
- from threading import Thread
- import subprocess
- import asyncio
- import glob
- import json
- import yaml
- import importlib.util
- if sys.platform != "win32":
- import fcntl
- import stat
- except Exception as exc:
- print("Import Error: %s" % (exc))
- print("Please install requried packages using: pip3 install -r %s" % (requirement_file))
- sys.exit(1)
- try:
- from collections.abc import Mapping
- except ImportError: # Python 2.7 compatibility
- from collections import Mapping
- SDK_GLOBAL_VARIABLES = {
- "sdk_checktag": "Nuclei SDK Build Time:",
- "sdk_check": True,
- "sdk_banner_tmout": 15,
- "sdk_copy_objects": "elf,map",
- "sdk_copy_objects_flag": False,
- "sdk_ttyerr_maxcnt": 3,
- "sdk_fpgaprog_maxcnt": 3,
- "sdk_gdberr_maxcnt": 10,
- "sdk_uploaderr_maxcnt": 10,
- "sdk_bannertmout_maxcnt": 100,
- "sdk_verb_buildmsg": True,
- "sdk_copy_failobj": True
- }
- INVAILD_SERNO = "xxxxx"
- BANNER_TMOUT = "banner_timeout"
- TTY_OP_ERR = "tty_operate_error"
- TTY_UNKNOWN_ERR = "tty_unknown_error"
- FILE_LOCK_NAME = "fpga_program.lock"
- DATE_FORMATE = "%Y-%m-%d %H:%M:%S"
- def get_tmpdir():
- tempdir = tempfile.gettempdir()
- if sys.platform == "win32":
- wintempdir = "C:\\Users\\Public\\Temp"
- if os.path.isdir(wintempdir) == False:
- os.makedirs(wintempdir)
- tempdir = wintempdir
- return tempdir
- # get ci url information
- def get_ci_info():
- cijoburl = os.environ.get("CI_JOB_URL")
- cipipelineurl = os.environ.get("CI_PIPELINE_URL")
- if cijoburl and cipipelineurl:
- return {"joburl": cijoburl, "pipelineurl": cipipelineurl}
- else:
- return {}
- def get_global_variables():
- return SDK_GLOBAL_VARIABLES
- def get_sdk_checktag():
- checktag = os.environ.get("SDK_CHECKTAG")
- if checktag is None:
- checktag = SDK_GLOBAL_VARIABLES.get("sdk_checktag")
- return checktag
- def get_sdk_copyobjects():
- cpobjs = os.environ.get("SDK_COPY_OBJECTS")
- if cpobjs is None:
- cpobjs = SDK_GLOBAL_VARIABLES.get("sdk_copy_objects")
- return cpobjs
- def get_env_flag(envar, deft=None):
- flag = os.environ.get(envar)
- if flag is None:
- return deft
- return flag.lower() in ('true', '1', 't')
- def get_sdk_check():
- check = get_env_flag("SDK_CHECK")
- if check is None:
- check = SDK_GLOBAL_VARIABLES.get("sdk_check")
- return check
- def get_sdk_verb_buildmsg():
- check = get_env_flag("SDK_VERB_BUILDMSG")
- if check is None:
- check = SDK_GLOBAL_VARIABLES.get("sdk_verb_buildmsg")
- return check
- def get_sdk_copyobjects_flag():
- cpflag = get_env_flag("SDK_COPY_OBJECTS_FLAG")
- if cpflag is None:
- cpflag = SDK_GLOBAL_VARIABLES.get("sdk_copy_objects_flag")
- return cpflag
- def get_sdk_need_copyobjects(appconfig):
- try:
- needed = appconfig.get("copy_objects")
- except:
- needed = False
- if needed != True:
- # use global flag
- needed = get_sdk_copyobjects_flag()
- return needed
- def get_sdk_copy_failobj():
- cpflag = get_env_flag("SDK_COPY_FAILOBJ")
- if cpflag is None:
- cpflag = SDK_GLOBAL_VARIABLES.get("sdk_copy_failobj")
- return cpflag
- def get_sdk_banner_tmout():
- tmout = os.environ.get("SDK_BANNER_TMOUT")
- if tmout is not None:
- tmout = int(tmout)
- else:
- tmout = SDK_GLOBAL_VARIABLES.get("sdk_banner_tmout")
- return tmout
- # some cases may need to run longer than the default timeout set in app.json
- def get_sdk_run_tmout():
- tmout = os.environ.get("SDK_RUN_TMOUT")
- if tmout is not None:
- tmout = int(tmout)
- return tmout
- def get_sdk_fpga_prog_tmout():
- tmout = os.environ.get("FPGA_PROG_TMOUT")
- return tmout
- def get_sdk_ttyerr_maxcnt():
- num = os.environ.get("SDK_TTYERR_MAXCNT")
- if num is not None:
- num = int(num)
- else:
- num = SDK_GLOBAL_VARIABLES.get("sdk_ttyerr_maxcnt")
- return num
- def get_sdk_fpgaprog_maxcnt():
- num = os.environ.get("SDK_FPGAPROG_MAXCNT")
- if num is not None:
- num = int(num)
- else:
- num = SDK_GLOBAL_VARIABLES.get("sdk_fpgaprog_maxcnt")
- return num
- def get_sdk_gdberr_maxcnt():
- num = os.environ.get("SDK_GDBERR_MAXCNT")
- if num is not None:
- num = int(num)
- else:
- num = SDK_GLOBAL_VARIABLES.get("sdk_gdberr_maxcnt")
- return num
- def get_sdk_bannertmout_maxcnt():
- num = os.environ.get("SDK_BANNERTMOUT_MAXCNT")
- if num is not None:
- num = int(num)
- else:
- num = SDK_GLOBAL_VARIABLES.get("sdk_bannertmout_maxcnt")
- return num
- def get_sdk_uploaderr_maxcnt():
- num = os.environ.get("SDK_UPLOADERR_MAXCNT")
- if num is not None:
- num = int(num)
- else:
- num = SDK_GLOBAL_VARIABLES.get("sdk_uploaderr_maxcnt")
- return num
- def parse_riscv_arch(arch_str):
- """Parse RISC-V architecture string to standardized format"""
- if not arch_str:
- return None
- arch_str = arch_str.lower()
- if not arch_str.startswith('rv32') and not arch_str.startswith('rv64'):
- return None
- features = {
- 'xlen': arch_str[:4],
- 'base': '',
- 'exts': set()
- }
- # Parse standard ISA string
- std_isa = arch_str[4:].split('_')[0]
- for c in std_isa:
- if c in 'iemafdcbpkv':
- # don't add b k p into base architecture
- if c == 'b':
- # for nuclei, the b extension expands to zba/zbb/zbc/zbs
- features['exts'].add('zba')
- features['exts'].add('zbb')
- features['exts'].add('zbc')
- features['exts'].add('zbs')
- elif c == 'k':
- # for nuclei, the k extension expands to the scalar crypto (zk*) extensions below
- features['exts'].add('zk') # zk -> zkn zkr zkt
- features['exts'].add('zks') # zks -> zbkb-sc zbkc-sc zbkx-sc zksed zksh
- features['exts'].add('zkn') # zkn -> zbkb-sc zbkc-sc zbkx-sc zkne zknd zknh
- features['exts'].add('zkr')
- features['exts'].add('zkt')
- features['exts'].add('zkne')
- features['exts'].add('zknd')
- features['exts'].add('zknh')
- features['exts'].add('zksed')
- features['exts'].add('zksh')
- features['exts'].add('zbkb-sc')
- features['exts'].add('zbkc-sc')
- features['exts'].add('zbkx-sc')
- elif c == 'v':
- features['exts'].add('zve64d')
- features['exts'].add('zvl128b')
- features['base'] += 'v'
- elif c == 'p':
- features['exts'].add('xxldsp')
- else:
- features['base'] += c
- # when the base architecture has the i extension, the e extension is implied
- if 'i' in features['base']:
- features['base'] += 'e'
- # Parse extensions
- if '_' in arch_str:
- exts = arch_str.split('_')[1:]
- for ext in exts:
- ext = ext.strip()
- if ext == "":
- continue
- if ext in ('zvl128', 'zvl256', 'zvl512', 'zvl1024'):
- ext = ext + 'b'
- elif ext in ('zvb', 'zvk', 'zc'):
- ext = ext + '*'
- elif ext in ('dsp',): # ('dsp') without the comma is a plain string, not a tuple
- ext = 'xxl' + ext
- elif ext in ('dspn1', 'dspn2', 'dspn3'):
- ext = 'xxl' + ext + 'x'
- features['exts'].add(ext)
- # For nuclei, zc* can also be configured as the c extension via the mmisc_ctl CSR ZCMT_ZCMP_EN bit
- if 'zc*' in features['exts']:
- features['base'] += 'c'
- # For nuclei cpu, zifencei and zicsr are implied
- features['exts'].add('zicsr')
- features['exts'].add('zifencei')
- # zve64d imply zve64f, zve64f imply zve64x and zve32f
- # zve64x imply zve32x, zve32f imply zve32x
- if 'zve64d' in features['exts']:
- features['exts'].add('zve64f')
- if 'zve64f' in features['exts']:
- features['exts'].add('zve32f')
- features['exts'].add('zve64x')
- if 'zve64x' in features['exts']:
- features['exts'].add('zve32x')
- if 'zve32f' in features['exts']:
- features['exts'].add('zve32x')
- if 'xxldspn3x' in features['exts']:
- features['exts'].add('xxldspn2x')
- if 'xxldspn2x' in features['exts']:
- features['exts'].add('xxldspn1x')
- if 'xxldspn1x' in features['exts']:
- features['exts'].add('xxldsp')
- if 'zvl1024b' in features['exts']:
- features['exts'].add('zvl512b')
- if 'zvl512b' in features['exts']:
- features['exts'].add('zvl256b')
- if 'zvl256b' in features['exts']:
- features['exts'].add('zvl128b')
- if 'zve64d' in features['exts'] and 'zvl128b' in features['exts']:
- features['base'] += 'v'
- return features
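- # Illustrative sketch of the returned structure (not executed; the ordering of
- # the 'exts' set is arbitrary). For example:
- # >>> parse_riscv_arch("rv64imafdc_zba_zbb")
- # {'xlen': 'rv64', 'base': 'imafdce',
- #  'exts': {'zba', 'zbb', 'zicsr', 'zifencei'}}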
- def get_nuclei_sdk_root():
- sdk_root = os.environ.get("NUCLEI_SDK_ROOT")
- if not sdk_root:
- sdk_root = os.path.abspath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
- return sdk_root
- def parse_makefile_core():
- sdk_root = get_nuclei_sdk_root()
- makefile_core = os.path.join(sdk_root, "Build", "Makefile.core")
- core_archs = {}
- if not os.path.exists(makefile_core):
- return core_archs
- with open(makefile_core, 'r') as f:
- for line in f:
- line = line.strip()
- if not line or line.startswith('#'):
- continue
- if '_CORE_ARCH_ABI' in line:
- parts = line.split('=')
- if len(parts) == 2:
- core_name = parts[0].split('_CORE_ARCH_ABI')[0].lower()
- arch_parts = parts[1].strip().split()
- if len(arch_parts) >= 2:
- core_archs[core_name] = arch_parts[0]
- return core_archs
- def check_arch_compatibility(core_arch, arch_ext, supported_arch):
- """Check if core architecture with extensions is compatible with supported architecture"""
- if not supported_arch:
- return True
- supported = parse_riscv_arch(supported_arch)
- if not supported:
- return True
- # Combine core_arch with arch_ext
- full_arch = core_arch
- if arch_ext:
- full_arch += arch_ext
- current = parse_riscv_arch(full_arch)
- if not current:
- return False
- # Check XLEN compatibility
- if current['xlen'] != supported['xlen']:
- return False
- # Check base ISA compatibility
- for c in current['base']:
- if c not in supported['base']:
- return False
- # Check extensions compatibility
- # current['exts'] contains concrete extensions (no * suffix)
- # supported['exts'] may contain wildcard entries (with a * suffix)
- # Extension matching should handle wildcards (*) in supported extensions
- for ext in current['exts']:
- found_match = False
- for supported_ext in supported['exts']:
- if supported_ext.endswith('*'):
- # Handle wildcard matching
- if ext.startswith(supported_ext[:-1]):
- found_match = True
- break
- elif ext == supported_ext:
- # Handle exact matching
- found_match = True
- break
- if not found_match:
- return False
- return True
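- # Illustrative usage (not executed). With the rules above, a core arch plus
- # extra extensions passes only if every derived extension is covered by the
- # supported arch string:
- # >>> check_arch_compatibility("rv32imac", "_zba", "rv32imacb")
- # True
- # >>> check_arch_compatibility("rv32imac", "_zve32f", "rv32imac")
- # False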
- def filter_app_config(appconfig):
- """
- Filter application configurations based on architecture and extension compatibility.
- This function examines the build configuration of an application and determines if it should
- be filtered out based on architecture support and extension compatibility.
- Parameters:
- appconfig (dict): A dictionary containing application configuration.
- Expected to have a 'build_config' key with CORE, ARCH_EXT details.
- Returns:
- tuple: A pair of (bool, str) where:
- - bool: True if the configuration should be filtered out, False otherwise
- - str: A message explaining why the configuration was filtered (empty if not filtered)
- Environment Variables Used:
- - SDK_SUPPORT_ARCH: Supported architecture specifications
- - SDK_IGNORED_EXTS: Underscore-separated list of extensions to ignore
- Example:
- >>> config = {
- ... "build_config": {
- ... "CORE": "n307",
- ... "ARCH_EXT": "p_zfh"
- ... }
- ... }
- >>> filter_app_config(config)
- (False, "")
- Notes:
- - The function handles both single-letter and multi-letter extensions
- - Architecture extensions can be specified with or without leading underscore
- - Returns (False, "") if any required configuration is missing or in case of errors
- """
- if not isinstance(appconfig, dict):
- return False, ""
- try:
- build_config = appconfig.get("build_config", None)
- if build_config is None or len(build_config) == 0:
- return False, ""
- # Check SDK_SUPPORT_ARCH compatibility
- core = build_config.get("CORE", "").lower()
- arch_ext = build_config.get("ARCH_EXT", "")
- supported_arch = os.environ.get("SDK_SUPPORT_ARCH")
- if core and supported_arch:
- core_archs = parse_makefile_core()
- if core in core_archs:
- core_arch = core_archs[core]
- if not check_arch_compatibility(core_arch, arch_ext, supported_arch):
- return True, f"Core {core} with extensions {arch_ext} not supported by {supported_arch}"
- # Continue with existing extension filtering
- archext = build_config.get("ARCH_EXT", None)
- if archext is None or archext.strip() == "":
- return False, ""
- first_part = None
- rest_part = None
- if archext.startswith("_") == False:
- if "_" in archext:
- first_part, rest_part = archext.split("_", 1)
- else:
- if archext.startswith("z"):
- rest_part = archext
- else:
- first_part = archext
- else:
- rest_part = archext
- ignored_exts = os.environ.get("SDK_IGNORED_EXTS")
- if ignored_exts is None:
- return False, ""
- unique_exts = list(
- OrderedDict.fromkeys(part.strip() for part in ignored_exts.split('_'))
- )
- if len(unique_exts) == 1 and unique_exts[0] == "":
- return False, ""
- for ext in unique_exts:
- if len(ext) == 0:
- continue
- if len(ext) == 1:
- # handle single letter
- if first_part and ext in first_part:
- return True, "Filtered by %s extension" %(ext)
- else:
- # handle multi letter
- if rest_part and ext in rest_part:
- return True, "Filtered by %s extension" % (ext)
- except:
- pass
- return False, ""
- class NThread(Thread):
- def __init__(self, func, args):
- super(NThread, self).__init__()
- self.func = func
- self.args = args
- def run(self):
- self.result = self.func(*self.args)
- def get_result(self):
- try:
- return self.result
- except Exception:
- return None
- YAML_OK=0
- YAML_NOFILE=1
- YAML_INVAILD=2
- def load_yaml(file):
- if isinstance(file, str) == False or os.path.isfile(file) == False:
- return YAML_NOFILE, None
- try:
- with open(file, 'r') as yf:
- data = yaml.load(yf, Loader=yaml.FullLoader)
- return YAML_OK, data
- except:
- print("Error: %s is an invalid yaml file!" % (file))
- return YAML_INVAILD, None
- def save_yaml(file, data):
- if isinstance(file, str) == False:
- return False
- try:
- with open(file, "w") as cf:
- yaml.dump(data, cf, indent=4)
- return True
- except:
- print("Error: Data can't be serialized to yaml file!")
- return False
- def get_specific_key_value(dictdata:dict, key):
- if not dictdata:
- print("Error: dictdata doesn't exist!")
- return None
- value = dictdata.get(key, None)
- if not value:
- print("Error, key %s has no value!" % (key))
- return None
- return value
- JSON_OK=0
- JSON_NOFILE=1
- JSON_INVAILD=2
- def load_json(file):
- if isinstance(file, str) == False or os.path.isfile(file) == False:
- return JSON_NOFILE, None
- try:
- with open(file, 'r') as jf:
- data = json.load(jf)
- return JSON_OK, data
- except:
- print("Error: %s is an invalid json file!" % (file))
- return JSON_INVAILD, None
- def save_json(file, data):
- if isinstance(file, str) == False:
- return False
- try:
- with open(file, "w") as cf:
- json.dump(data, cf, indent=4)
- return True
- except:
- print("Error: Data can't be serialized to json file!")
- return False
- def save_csv(file, csvlines, display=True):
- if isinstance(csvlines, list) == False:
- return False
- # Flush stdout buffer
- sys.stdout.flush()
- try:
- with open(file, "w") as cf:
- for line in csvlines:
- csvline = line + "\n"
- cf.write(csvline)
- cf.flush()
- if display:
- try:
- # sometimes a BlockingIOError: [Errno 11] "write could not complete without blocking" is raised here,
- # possibly related to https://bugs.python.org/issue40634 since this tool uses asyncio
- sys.stdout.flush()
- print("CSV, %s" % line)
- except:
- pass
- return True
- except:
- print("Error: Data can't be saved to file!")
- return False
- # Return a list of possible serial ports
- def find_possible_serports():
- comports = serial.tools.list_ports.comports()
- serports = [ port.device for port in comports ]
- return serports
- def find_serport_by_no(serno):
- comports = serial.tools.list_ports.comports()
- serport = None
- for port in comports:
- cur_serno = port.serial_number
- cur_dev = port.device
- cur_loc = port.location
- if cur_serno is None:
- continue
- if sys.platform == "win32":
- if (serno + 'B') == cur_serno:
- serport = cur_dev
- break
- else:
- if serno != cur_serno:
- continue
- # serial is the second device of the composite device
- if cur_loc.endswith(".1"):
- serport = cur_dev
- break
- # serport found
- return serport
- def find_most_possible_serport():
- serports = find_possible_serports()
- if len(serports) > 0:
- # sort the ports
- serports.sort()
- # get the biggest port
- # for /dev/ttyUSB0, /dev/ttyUSB1, get /dev/ttyUSB1
- # for COM16, COM17, get COM17
- return serports[-1]
- else:
- return None
- # get from https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
- def dict_merge(dct, merge_dct):
- """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
- updating only top-level keys, dict_merge recurses down into dicts nested
- to an arbitrary depth, updating keys. The ``merge_dct`` is merged into
- ``dct``.
- :param dct: dict onto which the merge is executed
- :param merge_dct: dct merged into dct
- :return: None
- """
- for k, v in merge_dct.items():
- if (k in dct and isinstance(dct[k], dict)
- and isinstance(merge_dct[k], Mapping)):
- dict_merge(dct[k], merge_dct[k])
- else:
- dct[k] = merge_dct[k]
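- # Illustrative usage (not executed): nested keys are merged instead of being
- # replaced wholesale, e.g.
- # >>> dct = {"run_config": {"target": "hardware", "hardware": {"baudrate": 115200}}}
- # >>> dict_merge(dct, {"run_config": {"hardware": {"serport": "/dev/ttyUSB1"}}})
- # >>> dct
- # {'run_config': {'target': 'hardware',
- #                 'hardware': {'baudrate': 115200, 'serport': '/dev/ttyUSB1'}}}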
- def get_make_csv(app, config):
- make_options = " "
- SUPPORT_KEYS = ["SOC", "BOARD", "CORE", "DOWNLOAD", "VARIANT", \
- "BENCH_UNIT", "BENCH_FLAGS", "ARCH_EXT", "STDCLIB", "SILENT", "V"]
- csv_print = "CSV, APP=%s" % (app)
- if isinstance(config, dict):
- for key in config:
- if key not in SUPPORT_KEYS:
- continue
- option = "%s=%s"%(key, config[key])
- make_options = " %s %s " % (make_options, option)
- csv_print = "%s, %s" % (csv_print, option)
- return make_options, csv_print
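- # Illustrative behaviour (not executed): only keys listed in SUPPORT_KEYS are
- # kept, e.g. config {"SOC": "evalsoc", "CORE": "n300", "FOO": "bar"} for app
- # "helloworld" yields make options containing "SOC=evalsoc" and "CORE=n300"
- # (FOO is dropped) and the line "CSV, APP=helloworld, SOC=evalsoc, CORE=n300".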
- def try_decode_bytes(bytes):
- ENCODING_LIST = ['utf-8', 'gbk', 'gb18030']
- destr = ""
- for encoding in ENCODING_LIST:
- try:
- destr = bytes.decode(encoding)
- break
- except:
- continue
- return destr
- def kill_async_subprocess(proc):
- startticks = time.time()
- if proc is not None:
- try:
- kill_sig = signal.SIGTERM
- if sys.platform != "win32":
- kill_sig = signal.SIGKILL
- print("Try to Kill process id %d now" %(proc.pid))
- parent_proc = psutil.Process(proc.pid)
- try:
- # This might cause PermissionError: [Errno 1] Operation not permitted: '/proc/1/stat' issue
- child_procs = parent_proc.children(recursive=True)
- for child_proc in child_procs:
- print("Kill child process %s, pid %d" %(child_proc.name(), child_proc.pid))
- try:
- os.kill(child_proc.pid, kill_sig) # kill child process
- except:
- continue
- except Exception as exc:
- print("Warning: kill child process failed with %s" %(exc))
- if parent_proc.is_running():
- print("Kill parent process %s, pid %d" %(parent_proc.name(), parent_proc.pid))
- if sys.platform != "win32":
- try:
- os.killpg(parent_proc.pid, kill_sig) # kill parent process
- except:
- os.kill(parent_proc.pid, kill_sig) # kill parent process
- else:
- os.kill(parent_proc.pid, kill_sig) # kill parent process
- # kill using process.kill again
- if parent_proc.is_running():
- proc.kill()
- except psutil.NoSuchProcess:
- pass
- except Exception as exc:
- print("Warning: kill process failed with %s" %(exc))
- # show time cost for kill process
- print("kill process used %.2f seconds" %((time.time() - startticks)))
- sys.stdout.flush()
- pass
- def kill_subprocess(proc):
- try:
- if proc.poll() is None: # process is still running
- kill_async_subprocess(proc)
- except:
- pass
- pass
- def import_module(module_name, file_path):
- if file_path is None or os.path.isfile(file_path) == False:
- return None
- try:
- spec = importlib.util.spec_from_file_location(module_name, file_path)
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- except:
- module = None
- return module
- def import_function(func_name, file_path):
- module_name = "tempmodule_%s" % (random.randint(0, 10000))
- tmpmodule = import_module(module_name, file_path)
- if tmpmodule is None:
- return None
- if func_name not in dir(tmpmodule):
- return None
- return getattr(tmpmodule, func_name)
- COMMAND_RUNOK=0
- COMMAND_INVALID=1
- COMMAND_FAIL=2
- COMMAND_INTERRUPTED=3
- COMMAND_EXCEPTION=4
- COMMAND_NOTAPP=5
- COMMAND_TIMEOUT=6
- COMMAND_TIMEOUT_READ=7
- RUNSTATUS_OK=0
- RUNSTATUS_FAIL=1
- RUNSTATUS_NOTSTART=2
- def run_command(command, show_output=True, logfile=None, append=False):
- logfh = None
- ret = COMMAND_RUNOK
- cmd_elapsed_ticks = 0
- if isinstance(command, str) == False:
- return COMMAND_INVALID, cmd_elapsed_ticks
- startticks = time.time()
- process = None
- try:
- if isinstance(logfile, str):
- if append:
- logfh = open(logfile, "ab")
- else:
- logfh = open(logfile, "wb")
- if logfh:
- # record command run in log file
- logfh.write(("Execute Command %s\n" % (command)).encode())
- process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, \
- stderr=subprocess.STDOUT)
- while True:
- line = process.stdout.readline()
- if (not line) and process.poll() is not None:
- break
- if show_output:
- print(try_decode_bytes(line), end="")
- if logfh:
- logfh.write(line)
- time.sleep(0.01)
- process.communicate(timeout=30) # communicate()'s first positional argument is input, not timeout
- if process.returncode != 0:
- ret = COMMAND_FAIL
- except (KeyboardInterrupt):
- print("Key CTRL-C pressed, command executing stopped!")
- ret = COMMAND_INTERRUPTED
- except subprocess.TimeoutExpired:
- ret = COMMAND_TIMEOUT
- except Exception as exc:
- print("Unexpected exception happened: %s" %(str(exc)))
- ret = COMMAND_EXCEPTION
- finally:
- kill_subprocess(process)
- if process:
- del process
- if logfh:
- logfh.close()
- cmd_elapsed_ticks = time.time() - startticks
- return ret, cmd_elapsed_ticks
- async def run_cmd_and_check_async(command, timeout:int, checks:dict, checktime=time.time(), sdk_check=False, logfile=None, show_output=False, banner_timeout=3):
- logfh = None
- ret = COMMAND_FAIL
- cmd_elapsed_ticks = 0
- if isinstance(command, str) == False:
- return COMMAND_INVALID, cmd_elapsed_ticks
- startticks = time.time()
- process = None
- check_status = False
- pass_checks = checks.get("PASS", [])
- fail_checks = checks.get("FAIL", [])
- def test_in_check(string, checks):
- if type(checks) == list:
- for check in checks:
- if check in string:
- return True
- return False
- NSDK_CHECK_TAG = get_sdk_checktag()
- if get_sdk_verb_buildmsg():
- print("Checker used: ", checks)
- print("SDK Checker Tag \"%s\", checker enable %s" % (NSDK_CHECK_TAG, sdk_check))
- print("SDK run timeout %s, banner timeout %s" % (timeout, banner_timeout))
- check_finished = False
- start_time = time.time()
- serial_log = ""
- nsdk_check_timeout = banner_timeout
- sdk_checkstarttime = time.time()
- try:
- if isinstance(logfile, str):
- logfh = open(logfile, "wb")
- if sys.platform != "win32":
- # prefix the command with "exec" so the shell is replaced by the real command;
- # otherwise a /bin/sh -c wrapper process is created, and killing it only kills
- # the sh process, not the actual command process we want to kill
- process = await asyncio.create_subprocess_shell("exec " + command, \
- stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
- else:
- process = await asyncio.create_subprocess_shell(command, \
- stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
- while (time.time() - start_time) < timeout:
- try:
- linebytes = await asyncio.wait_for(process.stdout.readline(), 1)
- except asyncio.TimeoutError:
- if sdk_check == True:
- linebytes = None
- else:
- continue
- except KeyboardInterrupt:
- print("Key CTRL-C pressed, command executing stopped!")
- break
- except:
- break
- if linebytes:
- line = str(try_decode_bytes(linebytes)).replace('\r', '')
- else:
- line = ""
- if sdk_check == True:
- if (time.time() - sdk_checkstarttime) > nsdk_check_timeout:
- print("No SDK banner found in %s s, quit now!" % (nsdk_check_timeout))
- ret = COMMAND_TIMEOUT
- check_status = False
- break
- if line == "":
- continue
- if show_output:
- print("XXX Check " + line, end='')
- if NSDK_CHECK_TAG in line:
- timestr = line.split(NSDK_CHECK_TAG)[-1].strip()
- cur_time = time.mktime(time.strptime(timestr, "%b %d %Y, %H:%M:%S"))
- if int(cur_time) >= int(checktime):
- sdk_check = False
- line = NSDK_CHECK_TAG + " " + timestr + "\n"
- serial_log = serial_log + str(line)
- else:
- serial_log = serial_log + str(line)
- if show_output:
- print(line, end='')
- if check_finished == False:
- if test_in_check(line, fail_checks):
- check_status = False
- check_finished = True
- if test_in_check(line, pass_checks):
- check_status = True
- check_finished = True
- if check_finished:
- ret = COMMAND_RUNOK
- # capture roughly 1 more second of output by resetting start_time and shrinking timeout to 1
- start_time = time.time()
- timeout = 1
- if logfh and linebytes:
- logfh.write(linebytes)
- await asyncio.sleep(0.01) # don't block the event loop inside this coroutine
- except (KeyboardInterrupt):
- print("Key CTRL-C pressed, command executing stopped!")
- ret = COMMAND_INTERRUPTED
- except Exception as exc:
- print("Unexpected exception happened: %s" %(str(exc)))
- ret = COMMAND_EXCEPTION
- finally:
- # kill this process
- kill_async_subprocess(process)
- if logfh:
- logfh.close()
- cmd_elapsed_ticks = time.time() - startticks
- return check_status, cmd_elapsed_ticks
- def run_cmd_and_check(command, timeout:int, checks:dict, checktime=time.time(), sdk_check=False, logfile=None, show_output=False, banner_timeout=30):
- loop = asyncio.get_event_loop()
- try:
- ret, cmd_elapsed_ticks = loop.run_until_complete( \
- run_cmd_and_check_async(command, timeout, checks, checktime, sdk_check, logfile, show_output, banner_timeout))
- except KeyboardInterrupt:
- print("Key CTRL-C pressed, command executing stopped!")
- ret, cmd_elapsed_ticks = False, 0
- finally:
- if sys.platform != "win32":
- os.system("stty echo 2> /dev/null")
- return ret, cmd_elapsed_ticks
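- # Hedged usage sketch (not executed; the command shown is hypothetical). The
- # checks dict uses "PASS"/"FAIL" lists of substrings matched against each
- # output line:
- # >>> checks = {"PASS": ["SDK test passed"], "FAIL": ["test failed", "MEPC"]}
- # >>> sts, secs = run_cmd_and_check("make run", 60, checks, sdk_check=False)
- # sts is True only when a PASS string is seen before any FAIL string.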
- def find_files(fndir, pattern, recursive=False):
- fndir = os.path.normpath(fndir)
- files = glob.glob(os.path.join(fndir, pattern), recursive=recursive)
- return files
- def get_logfile(appdir, startdir, logdir, logname):
- relpath = os.path.relpath(appdir, startdir)
- _, startdir_basename = os.path.splitdrive(startdir)
- applogdir = os.path.join(os.path.relpath(logdir + os.sep + startdir_basename), relpath)
- applog = os.path.relpath(os.path.join(applogdir, logname))
- applogdir = os.path.dirname(applog)
- if os.path.isdir(applogdir) == False:
- os.makedirs(applogdir)
- return applog
- def strtofloat(value):
- fval = 0.0
- try:
- match = re.search(r'[+-]?\d*\.?\d+([Ee][+-]?\d+)?', value.strip())
- if match:
- fval = float(match.group())
- except:
- pass
- return fval
- def check_tool_version(ver_cmd, ver_check):
- vercmd_log = tempfile.mktemp()
- ret, _ = run_command(ver_cmd, show_output=False, logfile=vercmd_log)
- check_sts = False
- verstr = None
- if ret == COMMAND_RUNOK:
- with open(vercmd_log, 'r', errors='ignore') as vlf:
- for line in vlf.readlines():
- if ver_check in line:
- verstr = line.strip()
- check_sts = True
- break
- os.remove(vercmd_log)
- return check_sts, verstr
- def get_elfsize(elf):
- sizeinfo = {"text": -1, "data": -1, "bss": -1, "total": -1}
- if os.path.isfile(elf) == False:
- return sizeinfo
- for sizetool in [ "riscv-nuclei-elf-size", "riscv64-unknown-elf-size", "size" ]:
- sizecmd = "%s %s" % (sizetool, elf)
- sizelog = tempfile.mktemp()
- ret, _ = run_command(sizecmd, show_output=False, logfile=sizelog)
- if ret == COMMAND_RUNOK:
- with open(sizelog, "r", errors='ignore') as sf:
- lines = sf.readlines()
- datas = lines[-1].strip().split()
- sizeinfo["text"] = int(datas[0])
- sizeinfo["data"] = int(datas[1])
- sizeinfo["bss"] = int(datas[2])
- sizeinfo["total"] = int(datas[3])
- os.remove(sizelog)
- break
- else:
- os.remove(sizelog)
- return sizeinfo
- def merge_config_with_makeopts(config, make_options):
- opt_splits=make_options.strip().split()
- passed_buildcfg = dict()
- for opt in opt_splits:
- if "=" in opt:
- values = opt.split("=")
- # Make new build config
- if (len(values) == 2):
- passed_buildcfg[values[0]] = values[1]
- build_cfg = config.get("build_config", None)
- if build_cfg is None:
- config["build_config"] = passed_buildcfg
- else:
- # update build_config with the values parsed from make_options
- config["build_config"].update(passed_buildcfg)
- return config
- # merge config dict and args dict
- # args will overwrite config
- def merge_config_with_args(config, args_dict):
- if isinstance(config, dict) == False:
- return None
- if isinstance(args_dict, dict) == False:
- return config
- serport = args_dict.get("serport", None)
- baudrate = args_dict.get("baudrate", None)
- make_options = args_dict.get("make_options", None)
- parallel = args_dict.get("parallel", None)
- build_target = args_dict.get("build_target", None)
- run_target = args_dict.get("run_target", None)
- timeout = args_dict.get("timeout", None)
- ncycm = args_dict.get("ncycm", None)
- if isinstance(config, dict) == False:
- return None
- new_config = copy.deepcopy(config)
- if serport or baudrate or run_target:
- run_cfg = new_config.get("run_config", None)
- if run_cfg is None:
- new_config["run_config"] = {"hardware":{}}
- elif "hardware" not in run_cfg:
- new_config["run_config"]["hardware"] = {}
- if serport:
- new_config["run_config"]["hardware"]["serport"] = str(serport)
- if baudrate:
- new_config["run_config"]["hardware"]["serport"] = int(baudrate)
- if run_target:
- new_config["run_config"]["target"] = str(run_target)
- run_target = new_config["run_config"].get("target", "hardware")
- if run_target not in new_config["run_config"]:
- new_config["run_config"][run_target] = dict()
- if ncycm:
- if "ncycm" not in new_config["run_config"]:
- new_config["run_config"]["ncycm"] = dict()
- new_config["run_config"]["ncycm"]["ncycm"] = os.path.abspath(ncycm)
- if timeout: # set timeout
- try:
- timeout = int(timeout)
- except:
- timeout = 60
- new_config["run_config"][run_target]["timeout"] = timeout
- if build_target is not None:
- new_config["build_target"] = build_target
- if parallel is not None:
- new_config["parallel"] = parallel
- if make_options:
- new_config = merge_config_with_makeopts(new_config, make_options)
- return new_config
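- # Illustrative usage (not executed): command-line args extend/overwrite the
- # app config, e.g.
- # >>> merge_config_with_args({"build_config": {"SOC": "evalsoc"}},
- # ...                        {"serport": "/dev/ttyUSB1", "timeout": "120"})
- # {'build_config': {'SOC': 'evalsoc'},
- #  'run_config': {'hardware': {'serport': '/dev/ttyUSB1', 'timeout': 120}}}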
- # merge two configs: one is appcfg, the other is hwcfg
- # hwcfg will overwrite configuration in appcfg
- def merge_two_config(appcfg, hwcfg):
- if isinstance(appcfg, dict) == True and isinstance(hwcfg, dict) == False:
- return appcfg
- if isinstance(appcfg, dict) == False and isinstance(hwcfg, dict) == True:
- return hwcfg
- merged_appcfg = copy.deepcopy(appcfg)
- dict_merge(merged_appcfg, hwcfg)
- return merged_appcfg
- def set_global_variables(config):
- global SDK_GLOBAL_VARIABLES
- if isinstance(config, dict) == False:
- return False
- if "global_variables" in config:
- dict_merge(SDK_GLOBAL_VARIABLES, config["global_variables"])
- print("Using global variables: %s" % SDK_GLOBAL_VARIABLES)
- return True
- def get_app_runresult(apprst):
- if not isinstance(apprst, dict):
- return "unknown", "-"
- if "type" not in apprst:
- return "unknown", "-"
- rsttype = apprst["type"]
- rstvaluedict = apprst.get("value", dict())
- if rstvaluedict and len(rstvaluedict) < 3:
- rstval = ""
- for key in rstvaluedict:
- rstval += "%s : %s;" %(key, rstvaluedict[key])
- rstval = rstval.rstrip(';')
- else:
- rstval = "-"
- return rsttype, rstval
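- # Illustrative usage (not executed): a run result dict is flattened into a
- # (type, value-string) pair for csv reporting, e.g.
- # >>> get_app_runresult({"type": "barebench", "value": {"DMIPS/MHz": 1.27}})
- # ('barebench', 'DMIPS/MHz : 1.27')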
- def save_execute_csv(result, csvfile):
- if isinstance(result, dict) == False:
- return False
- csvlines = ["App, buildstatus, runstatus, buildtime, runtime, type, value, total, text, data, bss"]
- for app in result:
- size = result[app]["size"]
- app_status = result[app]["status"]
- app_time = result[app]["time"]
- apprsttype, apprstval = get_app_runresult(result[app].get("result", dict()))
- csvline ="%s, %s, %s, %s, %s, %s, %s, %d, %d, %d, %d" % (app, app_status["build"], \
- app_status.get("run", False), app_time.get("build", "-"), app_time.get("run", "-"), \
- apprsttype, apprstval, size["total"], size["text"], size["data"], size["bss"])
- csvlines.append(csvline)
- display = get_sdk_verb_buildmsg()
- save_csv(csvfile, csvlines, display)
- return True
- def save_bench_csv(result, csvfile):
- if isinstance(result, dict) == False:
- return False
- csvlines = ["App, case, buildstatus, runstatus, buildtime, runtime, type, value, total, text, data, bss"]
- for app in result:
- appresult = result[app]
- for case in appresult:
- size = appresult[case]["size"]
- app_status = appresult[case]["status"]
- app_time = appresult[case]["time"]
- apprsttype, apprstval = get_app_runresult(appresult[case].get("result", dict()))
- csvline = "%s, %s, %s, %s, %s, %s, %s, %s, %d, %d, %d, %d" % (app, case, app_status["build"], \
- app_status.get("run", False), app_time.get("build", "-"), app_time.get("run", "-"), \
- apprsttype, apprstval, size["total"], size["text"], size["data"], size["bss"])
- csvlines.append(csvline)
- # save csv file
- display = get_sdk_verb_buildmsg()
- save_csv(csvfile, csvlines, display)
- return True
- def find_local_appconfig(appdir, localcfgs):
- if isinstance(appdir, str) and isinstance(localcfgs, dict):
- if appdir in localcfgs:
- return appdir
- else:
- foundcfg = None
- for localcfg in localcfgs:
- localcfgtp = localcfg.strip('/')
- striped_dir = appdir.split(localcfgtp, 1)
- if len(striped_dir) == 2:
- striped_dir = striped_dir[1]
- else:
- striped_dir = appdir
- if striped_dir != appdir:
- if striped_dir.startswith('/'):
- if foundcfg is None:
- foundcfg = localcfg
- else:
- if len(foundcfg) < len(localcfg):
- foundcfg = localcfg
- return foundcfg
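- # Illustrative usage (not executed): the longest local config whose path is a
- # prefix of appdir wins, e.g.
- # >>> cfgs = {"application/baremetal": {}, "application/baremetal/benchmark": {}}
- # >>> find_local_appconfig("application/baremetal/benchmark/dhrystone", cfgs)
- # 'application/baremetal/benchmark'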
- else:
- return None
- def fix_evalsoc_verilog_ncycm(verilog):
- if os.path.isfile(verilog) == False:
- return ""
- vfct = ""
- with open(verilog, "r", errors='ignore') as vf:
- for line in vf.readlines():
- line = line.replace("@80", "@00").replace("@90", "@08")
- vfct += line
- verilog_new = verilog + ".ncycm"
- with open(verilog_new, "w") as vf:
- vf.write(vfct)
- return verilog_new
- PROGRAM_UNKNOWN="unknown"
- PROGRAM_BAREBENCH="barebench"
- PROGRAM_COREMARK="coremark"
- PROGRAM_DHRYSTONE="dhrystone"
- PROGRAM_WHETSTONE="whetstone"
- def parse_benchmark_compatiable(lines):
- result = None
- program_type = PROGRAM_UNKNOWN
- subtype = PROGRAM_UNKNOWN
- try:
- for line in lines:
- # Coremark
- if "CoreMark" in line:
- program_type = PROGRAM_BAREBENCH
- subtype = PROGRAM_COREMARK
- if "Iterations*1000000/total_ticks" in line:
- value = line.split("=")[1].strip().split()[0]
- result = dict()
- result["CoreMark/MHz"] = strtofloat(value)
- # Dhrystone
- if "Dhrystone" in line:
- program_type = PROGRAM_BAREBENCH
- subtype = PROGRAM_DHRYSTONE
- if "1000000/(User_Cycle/Number_Of_Runs)" in line:
- value = line.split("=")[1].strip().split()[0]
- result = dict()
- result["DMIPS/MHz"] = strtofloat(value)
- # Whetstone
- if "Whetstone" in line:
- program_type = PROGRAM_BAREBENCH
- subtype = PROGRAM_WHETSTONE
- if "MWIPS/MHz" in line:
- value = line.split("MWIPS/MHz")[-1].strip().split()[0]
- result = dict()
- result["MWIPS/MHz"] = strtofloat(value)
- except:
- return program_type, subtype, result
- return program_type, subtype, result
- def parse_benchmark_baremetal(lines):
- result = None
- program_type = PROGRAM_UNKNOWN
- subtype = PROGRAM_UNKNOWN
- try:
- unit = "unknown"
- for line in lines:
- stripline = line.strip()
- if "csv," in stripline.lower():
- csv_values = stripline.split(',')
- if len(csv_values) >= 3:
- key = csv_values[1].strip()
- value = csv_values[-1].strip()
- if key.lower() == "benchmark":
- program_type = PROGRAM_BAREBENCH
- unit = value
- else:
- subtype = key.lower()
- result = dict()
- result[unit] = strtofloat(value)
- break
- except:
- return program_type, subtype, result
- return program_type, subtype, result
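- # Hedged sketch of the expected log format (the exact lines are an assumption
- # for illustration): given run-log lines like
- #   CSV, BENCHMARK, DMIPS/MHz
- #   CSV, dhrystone, 1.27
- # the first line records the unit and the second one yields
- # ('barebench', 'dhrystone', {'DMIPS/MHz': 1.27}).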
- def parse_benchmark_baremetal_csv(lines):
- result = None
- program_type = PROGRAM_UNKNOWN
- try:
- result = dict()
- for line in lines:
- stripline = line.strip()
- if "csv," in stripline.lower():
- csv_values = stripline.split(',')
- if len(csv_values) >= 3:
- key = csv_values[1].strip()
- value = csv_values[-1].strip()
- if "BENCH" not in key.upper():
- result[key] = value
- except:
- return program_type, result
- return program_type, result
- def find_index(key, arr):
- try:
- index = arr.index(key)
- except:
- index = -1
- return index
- def parse_benchmark_runlog(lines, lgf=""):
- if isinstance(lines, list) == False:
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- if len(lines) == 0:
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- subtype = ""
- if lgf.strip() == "": # old style
- program_type, subtype, result = parse_benchmark_compatiable(lines)
- else:
- lgf = lgf.replace("\\", "/")
- appnormdirs = os.path.dirname(os.path.normpath(lgf)).replace('\\', '/').split('/')
- if "baremetal/benchmark" in lgf:
- # baremetal benchmark
- program_type, subtype, result = parse_benchmark_baremetal(lines)
- if program_type == PROGRAM_UNKNOWN:
- # fallback to previous parser
- program_type, subtype, result = parse_benchmark_compatiable(lines)
- elif "baremetal/demo_dsp" in lgf:
- program_type, result = parse_benchmark_baremetal_csv(lines)
- program_type = "demo_dsp"
- elif "DSP/Examples/RISCV" in lgf:
- program_type, result = parse_benchmark_baremetal_csv(lines)
- program_type = "nmsis_dsp_example"
- index = find_index("RISCV", appnormdirs)
- if index >= 0:
- subtype = appnormdirs[index + 1]
- elif "DSP/Test" in lgf:
- program_type, result = parse_benchmark_baremetal_csv(lines)
- program_type = "nmsis_dsp_tests"
- index = find_index("Test", appnormdirs)
- if index >= 0:
- subtype = appnormdirs[index + 1]
- elif "NN/Examples/RISCV" in lgf:
- program_type, result = parse_benchmark_baremetal_csv(lines)
- program_type = "nmsis_nn_example"
- index = find_index("RISCV", appnormdirs)
- if index >= 0:
- subtype = appnormdirs[index + 1]
- elif "NN/Tests" in lgf:
- program_type, result = parse_benchmark_baremetal_csv(lines)
- if "full" in appnormdirs:
- program_type = "nmsis_nn_test_full"
- subtype = "full"
- else:
- program_type = "nmsis_nn_test_percase"
- index = find_index("percase", appnormdirs)
- if index >= 0:
- subtype = appnormdirs[index + 1]
- else:
- program_type, subtype, result = parse_benchmark_compatiable(lines)
- return program_type, subtype, result
- def parse_benchmark_use_pyscript(lines, lgf, pyscript):
- if isinstance(lines, list) == False:
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- if len(lines) == 0:
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- # the function must be named parse_benchmark
- # its arguments and return values follow parse_benchmark_runlog
- parsefunc = import_function("parse_benchmark", pyscript)
- if parsefunc is None:
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- try:
- program_type, subtype, result = parsefunc(lines, lgf)
- return program_type, subtype, result
- except Exception as exc:
- print("ERROR: Parse using %s script error: %s" %(pyscript, exc))
- return PROGRAM_UNKNOWN, PROGRAM_UNKNOWN, None
- def check_tool_exist(tool):
- exist = False
- if sys.platform == 'win32':
- if os.system("where %s" % (tool)) == 0:
- exist = True
- else:
- if os.system("which %s" % (tool)) == 0:
- exist = True
- return exist
- def find_vivado_cmd():
- for vivado_cmd in ("vivado", "vivado_lab"):
- if sys.platform == 'win32':
- if os.system("where %s" % (vivado_cmd)) == 0:
- return vivado_cmd
- else:
- if os.system("which %s" % (vivado_cmd)) == 0:
- return vivado_cmd
- return None
- def datetime_now():
- return datetime.datetime.now().strftime(DATE_FORMATE)
- def program_fpga(bit, target):
- if os.path.isfile(bit) == False:
- print("Can't find bitstream in %s" % (bit))
- return False
- print("Try to program fpga bitstream %s to target board %s" % (bit, target))
- sys.stdout.flush()
- FILE_LOCK = os.path.join(get_tmpdir(), FILE_LOCK_NAME)
- # TODO: use portable filelock for win32
- with open(FILE_LOCK, 'w+') as filelock:
- if sys.platform != "win32":
- print("%s, Wait another board's programing fpga to finished" %(datetime_now()))
- fcntl.flock(filelock, fcntl.LOCK_EX)
- # set permissions to 666 so that other users can access this lock file without a permission exception
- if os.stat(FILE_LOCK).st_uid == os.getuid():
- os.chmod(FILE_LOCK, stat.S_IWGRP | stat.S_IRGRP | stat.S_IWUSR | stat.S_IRUSR | stat.S_IWOTH | stat.S_IROTH)
- print("%s, Has acquired the chance to do fpga programing!" %(datetime_now()))
- vivado_cmd = find_vivado_cmd()
- # check vivado is found or not
- if vivado_cmd == None:
- print("vivado is not found in PATH, please check!")
- return False
- tcl = os.path.join(os.path.dirname(os.path.realpath(__file__)), "program_bit.tcl")
- target = "*%s" % (target)
- progcmd = "%s -mode batch -nolog -nojournal -source %s -tclargs %s %s" % (vivado_cmd, tcl, bit, target)
- tmout = get_sdk_fpga_prog_tmout()
- if sys.platform != 'win32' and tmout is not None and tmout.strip() != "":
- print("Timeout %s do fpga program" % (tmout))
- progcmd = "timeout --foreground -s SIGKILL %s %s" % (tmout, progcmd)
- print("Do fpga program using command: %s" % (progcmd))
- sys.stdout.flush()
- ret = os.system(progcmd)
- sys.stdout.flush()
- if ret != 0:
- print("Program fpga bit failed, error code %d" % ret)
- return False
- print("Program fpga bit successfully")
- return True
- def find_fpgas():
- vivado_cmd = find_vivado_cmd()
- if vivado_cmd == None:
- print("vivado is not found in PATH, please check!")
- return dict()
- tcl = os.path.join(os.path.dirname(os.path.realpath(__file__)), "find_devices.tcl")
- sys.stdout.flush()
- tmp_log = tempfile.mktemp()
- os.system("%s -mode batch -nolog -nojournal -source %s -notrace > %s" % (vivado_cmd, tcl, tmp_log))
- sys.stdout.flush()
- fpgadevices = dict()
- with open(tmp_log, "r", errors='ignore') as tf:
- for line in tf.readlines():
- line = line.strip()
- if line.startswith("CSV,") == False:
- continue
- splits = line.split(",")
- if len(splits) != 3:
- continue
- fpga_serial = "/".join(splits[1].split("/")[2:])
- fpgadevices[fpga_serial] = splits[2].strip()
- return fpgadevices
- def check_serial_port(serport):
- if serport in find_possible_serports():
- return True
- return False
- def modify_openocd_cfg(cfg, ftdi_serial):
- cfg_bk = cfg + ".backup"
- if (os.path.isfile(cfg)) == False:
- return False
- if os.path.isfile(cfg_bk) == True:
- print("Restore openocd cfg %s" %(cfg))
- shutil.copyfile(cfg_bk, cfg)
- else:
- print("Backup openocd cfg %s" %(cfg))
- shutil.copyfile(cfg, cfg_bk)
- found = False
- contents = []
- index = 0
- with open(cfg, 'r', errors='ignore') as cf:
- contents = cf.readlines()
- for line in contents:
- if line.strip().startswith("transport select"):
- found = True
- break
- index += 1
- if found == False:
- return False
- if sys.platform == 'win32':
- ftdi_serial = "%sA" % (ftdi_serial)
- contents.insert(index, "ftdi_serial %s\ntcl_port disabled\ntelnet_port disabled\n" %(ftdi_serial))
- with open(cfg, 'w') as cf:
- contents = "".join(contents)
- cf.write(contents)
- return True
- GL_CPUCFGs = os.path.join(SCRIPT_DIR, "configs", "cpu")
- def gen_runcfg(cpucfg, runcfg, buildconfig=dict()):
- _, cpucfgdict = load_json(cpucfg)
- _, runcfgdict = load_json(runcfg)
- if cpucfgdict is None:
- return { "build_configs": { "default": {} } }
- if runcfgdict is None:
- return cpucfgdict
- matrixcfgs = runcfgdict.get("matrix", None)
- expectedcfg = runcfgdict.get("expected", dict())
- expectedscfg = runcfgdict.get("expecteds", dict())
- finalruncfg = copy.deepcopy(cpucfgdict)
- # merge buildconfig
- finalruncfg["build_config"] = merge_two_config(finalruncfg.get("build_config", dict()), buildconfig)
- finalruncfg["expected"] = merge_two_config(finalruncfg.get("expected", dict()), expectedcfg)
- finalruncfg["expecteds"] = merge_two_config(finalruncfg.get("expecteds", dict()), expectedscfg)
- if matrixcfgs is None:
- return finalruncfg
- bcfgs = cpucfgdict.get("build_configs", dict())
- newbcfgs = dict()
- for bkey in bcfgs:
- for key in matrixcfgs:
- cfgkey = "%s-%s" % (bkey, key)
- newbcfgs[cfgkey] = merge_two_config(bcfgs[bkey], matrixcfgs[key])
- if len(newbcfgs) > 1:
- finalruncfg["build_configs"] = newbcfgs
- else:
- finalruncfg["build_configs"] = bcfgs
- return finalruncfg
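- # Hedged sketch (not executed; the json contents are hypothetical): if the cpu
- # config defines build_configs {"n300": {...}} and the run config defines
- # matrix {"o2": {...}, "o3": {...}}, the merged result exposes build_configs
- # keyed "n300-o2" and "n300-o3", each being the deep merge of both entries.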
- def gen_coreruncfg(core, runcfg, choice="mini", buildconfig=dict(), casedir=None):
- cpucfgsloc = os.path.join(GL_CPUCFGs, choice)
- if casedir is not None:
- tmp = os.path.join(casedir, choice)
- if os.path.isdir(tmp) == True:
- cpucfgsloc = os.path.realpath(tmp)
- print("Use cpu configs in location %s directory" % (cpucfgsloc))
- cpucfg = os.path.join(cpucfgsloc, "%s.json" % (core))
- return gen_runcfg(cpucfg, runcfg, buildconfig)
- def gen_coreruncfg_custom(core, runcfg, customcfgdir, buildconfig=dict()):
- cpucfg = os.path.join(customcfgdir, "%s.json" % (core))
- return gen_runcfg(cpucfg, runcfg, buildconfig)
- def gen_runyaml(core, locs, fpga_serial, ftdi_serial, cycm, fpgabit, boardtype, ocdcfg, appcfg, hwcfg):
- runyaml = { "runcfg": {"runner": "fpga"},
- "fpga_runners": { core: {
- "board_type": boardtype, "fpga_serial": fpga_serial,
- "ftdi_serial": ftdi_serial, "serial_port": ""}
- },
- "ncycm_runners": { core: {
- "model": cycm if cycm else "" }
- },
- "configs": { core: {
- "fpga": boardtype, "bitstream": fpgabit,
- "ncycm": core, "openocd_cfg": ocdcfg,
- "appcfg": appcfg, "hwcfg": hwcfg }
- },
- "environment": {
- "fpgaloc": locs.get("fpgaloc", ""),
- "ncycmloc": locs.get("ncycmloc", ""),
- "cfgloc": locs.get("cfgloc", "")
- }
- }
- if cycm is not None:
- runyaml["runcfg"]["runner"] = "ncycm"
- return runyaml