#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Collect information from various sources into Memory Map DataFrames."""

import bisect
from typing import Callable, Dict, List, Mapping, Optional, Sequence, Tuple

import memdf.collector.bloaty
import memdf.collector.csv
import memdf.collector.elftools
import memdf.collector.readelf
import memdf.collector.su
import memdf.name
import memdf.select
import memdf.util.config
import pandas as pd  # type: ignore
from elftools.elf.constants import SH_FLAGS  # type: ignore
from memdf import (DF, Config, ConfigDescription, DFs, ExtentDF, SectionDF,
                   SymbolDF)
from memdf.collector.util import simplify_source

# Configuration for stripping leading path components from source file names
# (applied in postprocess_symbols() via simplify_source()).
PREFIX_CONFIG: ConfigDescription = {
    'collect.prefix': {
        'help': 'Strip PATH from the beginning of source file names',
        'metavar': 'PATH',
        'default': [],
        'argparse': {
            'alias': ['--prefix', '--strip-prefix'],
            'action': 'append',
        }
    },
}

# Aggregate configuration: option groups, plus the per-backend collector
# configurations merged in, plus the input-method selector.
CONFIG: ConfigDescription = {
    Config.group_def('input'): {
        'title': 'input options',
    },
    Config.group_def('tool'): {
        'title': 'external tool options',
    },
    Config.group_map('collect'): {
        'group': 'input'
    },
    **memdf.collector.bloaty.CONFIG,
    **memdf.collector.csv.CONFIG,
    **memdf.collector.elftools.CONFIG,
    **memdf.collector.readelf.CONFIG,
    'collect.method': {
        'help': 'Method of input processing: one of'
                ' elftools, readelf, bloaty, csv, tsv, su.',
        'metavar': 'METHOD',
        'choices': ['elftools', 'readelf', 'bloaty', 'csv', 'tsv', 'su'],
        'default': 'elftools',
        'argparse': {
            'alias': ['-f'],
        },
    },
    **PREFIX_CONFIG,
}

# ARM mapping symbols (presumably per the ARM ELF ABI's $a/$t/$d markers for
# code/data mode — TODO confirm); used to recognize and strip such symbols
# while recording the mode in an 'arm' column.
ARM_SPECIAL_SYMBOLS = frozenset(["$a", "$t", "$t.x", "$d", "$d.realdata"])


def postprocess_symbols(config: Config, symbols: SymbolDF) -> SymbolDF:
    """Postprocess a symbol table after collecting from one source.

    If the symbol table contains FILE symbols, they will be removed and
    replaced by a 'file' column on other symbols.

    If the symbol table contains ARM mode symbols, they will be removed
    and replaced by an 'arm' column on other symbols.
    """
    files: List[str] = []          # per-row 'file' column values
    arms: List[str] = []           # per-row 'arm' column values
    arm_symbols: Dict[str, bool] = {}  # distinct ARM marker names seen
    current_file = ''
    current_arm = ''
    has_file = False
    # NOTE(review): 'collect.prefix-file' is not declared in PREFIX_CONFIG in
    # this file; presumably defined elsewhere (or defaults falsy) — confirm.
    if config['collect.prefix-file']:
        prefixes = config.get_re('collect.prefix')
    else:
        prefixes = None
    if 'type' in symbols.columns:
        # Single ordered pass: a FILE symbol sets the running current_file for
        # all following rows until the next FILE symbol; likewise an ARM
        # marker symbol sets the running current_arm mode.
        for symbol in symbols.itertuples():
            if symbol.type == 'FILE':
                has_file = True
                current_file = symbol.symbol
                if prefixes:
                    current_file = simplify_source(current_file, prefixes)
            elif symbol.type == 'NOTYPE':
                if symbol.symbol.startswith('$'):
                    # Once any known ARM marker has been seen, accept any
                    # subsequent '$'-prefixed NOTYPE symbol as a marker too.
                    if current_arm or symbol.symbol in ARM_SPECIAL_SYMBOLS:
                        current_arm = symbol.symbol
                        arm_symbols[current_arm] = True
            files.append(current_file)
            arms.append(current_arm)
    if has_file:
        symbols['file'] = files
    if current_arm:
        symbols['arm'] = arms
    # Drop the marker rows themselves now that their information is carried
    # in the 'file'/'arm' columns.
    if has_file:
        symbols = symbols[symbols['type'] != 'FILE']
    if current_arm:
        syms = arm_symbols.keys()
        symbols = symbols[~symbols.symbol.isin(syms)]
    return symbols


def postprocess_file(config: Config, dfs: DFs) -> None:
    """Postprocess tables after collecting from one source."""
    if SymbolDF.name in dfs:
        dfs[SymbolDF.name] = postprocess_symbols(config, dfs[SymbolDF.name])


def fill_holes(config: Config, symbols: SymbolDF, sections: SectionDF) -> DFs:
    """Account for space not used by any symbol, or by multiple symbols.

    Scans the symbol table section by section in address order and
    synthesizes extra symbol rows for three kinds of extents:
    'gap' (unaccounted space between symbols), 'unused' (space delimited by
    configured free-space marker symbols), and 'overlap' (symbols whose
    address ranges intersect).  Returns a DFs containing one ExtentDF per
    kind plus the augmented SymbolDF.
    """
    # These symbols mark the start or end of unused space.
    start_unused = frozenset(config.get('symbol.free.start', []))
    end_unused = frozenset(config.get('symbol.free.end', []))

    # Columns for the synthesized rows; 'cu' and 'input' only if the source
    # symbol table has them.
    extent_columns = ['address', 'size', 'section', 'file']
    need_cu = 'cu' in symbols.columns
    if need_cu:
        extent_columns.append('cu')
    need_input = 'input' in symbols.columns
    if need_input:
        extent_columns.append('input')
    columns = ['symbol', *extent_columns, 'type', 'bind']

    def filler(name, address, size, previous, current) -> List:
        # Build one synthesized symbol row, inheriting section/file/cu/input
        # from the neighboring symbol (preferring the one before the hole).
        row = [
            name,  # symbol
            address,  # address
            size,  # size
            (previous.section if previous else
             current.section if current else memdf.name.UNDEF),  # section
            (previous.file if previous else
             current.file if current else ''),  # file
        ]
        if need_cu:
            row.append(
                previous.cu if previous else current.cu if current else '')
        if need_input:
            row.append(previous.input if previous else current.
                       input if current else '')
        row.append('NOTYPE')  # type
        row.append('LOCAL')  # bind
        return row

    def fill_gap(previous, current,
                 from_address, to_address) -> Tuple[str, List]:
        """Add a row for an unaccounted gap or unused space."""
        size = to_address - from_address
        # Space adjacent to a configured free-space marker (or before the
        # first symbol of a section) counts as 'unused' rather than 'gap'.
        if (previous is None or previous.symbol in start_unused
                or current.symbol in end_unused):
            use = 'unused'
            name = memdf.name.unused(from_address, size)
        else:
            use = 'gap'
            name = memdf.name.gap(from_address, size)
        return (use, filler(name, from_address, size, previous, current))

    def fill_overlap(previous, current,
                     from_address, to_address) -> Tuple[str, List]:
        """Add a row for overlap."""
        # Called with to_address < from_address, so size is negative; the
        # name uses -size (positive extent) while the row keeps the negative
        # size, so overlaps subtract when sizes are summed.
        size = to_address - from_address
        return ('overlap',
                filler(memdf.name.overlap(from_address, -size), from_address,
                       size, previous, current))

    # Find the address range for sections that are configured or allocated.
    config_sections = set()
    for _, s in config.get('region.sections', {}).items():
        config_sections |= set(s)
    section_to_range = {}   # section name -> range(start, end) of addresses
    start_to_section = {}   # section start address -> section name
    section_starts = [0]    # sorted start addresses, for bisect lookup
    for s in sections.itertuples():
        if ((s.section in config_sections)
                or (s.flags & SH_FLAGS.SHF_ALLOC)):
            section_to_range[s.section] = range(s.address,
                                                s.address + s.size)
            start_to_section[s.address] = s.section
            section_starts.append(s.address)
    section_starts.sort()

    new_symbols: Dict[str, List[list]] = {
        'gap': [],
        'unused': [],
        'overlap': []
    }
    section_range = None     # address range of the section being scanned
    previous_symbol = None   # last accepted symbol in the current section
    current_address = 0      # next unaccounted address in the section

    # Scan only real symbols that belong to a tracked section, in address
    # order (sorting groups each section's symbols together by address).
    iterable_symbols = symbols.loc[(symbols.type != 'SECTION')
                                   & (symbols.type != 'FILE')
                                   & symbols.section.isin(section_to_range)]
    iterable_symbols = iterable_symbols.sort_values(by='address')
    for symbol in iterable_symbols.itertuples():
        if not previous_symbol or symbol.section != previous_symbol.section:
            # We sometimes see symbols that have the value of their section
            # end address (so they are not actually within the section) and
            # have the same address as a symbol in the next section.
            # bisect finds the section whose start is at or below the
            # symbol's address; skip the symbol if that section is not the
            # one it claims to belong to.
            symbol_address_section = start_to_section.get(section_starts[
                bisect.bisect_right(section_starts, symbol.address) - 1])
            if symbol_address_section != symbol.section:
                continue
            # Starting or switching sections.
            if previous_symbol and section_range:
                # previous_symbol is the last in its section; account for any
                # trailing space up to the section end.  It is passed as both
                # endpoints since there is no following symbol.
                if current_address < section_range[-1] + 1:
                    use, row = fill_gap(previous_symbol, previous_symbol,
                                        current_address,
                                        section_range[-1] + 1)
                    new_symbols[use].append(row)
            # Start of section.
            previous_symbol = None
            section_range = section_to_range.get(symbol.section)
            if section_range:
                current_address = section_range[0]
        if section_range:
            if current_address < symbol.address:
                # Hole between the previous symbol's end and this symbol.
                use, row = fill_gap(previous_symbol, symbol, current_address,
                                    symbol.address)
                new_symbols[use].append(row)
            elif current_address > symbol.address:
                # This symbol starts before the previous one ended.
                use, row = fill_overlap(previous_symbol, symbol,
                                        current_address, symbol.address)
                new_symbols[use].append(row)
            current_address = symbol.address + symbol.size
        previous_symbol = symbol

    # One SymbolDF per extent kind, plus the augmented full symbol table.
    dfs = {k: SymbolDF(new_symbols[k], columns=columns) for k in new_symbols}
    symbols = pd.concat([symbols, *dfs.values()]).fillna('')
    symbols.sort_values(by='address', inplace=True)
    for k in dfs:
        dfs[k] = ExtentDF(dfs[k][extent_columns])
        dfs[k].attrs['name'] = k
    dfs[SymbolDF.name] = SymbolDF(symbols)
    return dfs


def postprocess_collected(config: Config, dfs: DFs) -> None:
    """Postprocess tables after reading all sources."""
    # Prune tables according to configuration options.  This happens before
    # fill_holes() so that space of any pruned symbols will be accounted for,
    # and to avoid unnecessary work for pruned sections.
    for c in [SymbolDF, SectionDF]:
        if c.name in dfs:
            dfs[c.name] = memdf.select.select_configured(
                config, dfs[c.name], memdf.select.COLLECTED_CHOICES)

    # Account for space not used by any symbol, or by multiple symbols.
    if (SymbolDF.name in dfs and SectionDF.name in dfs
            and config.get('args.fill_holes', True)):
        dfs.update(fill_holes(config, dfs[SymbolDF.name],
                              dfs[SectionDF.name]))

    # Create synthetic columns (e.g. 'region') and prune tables
    # according to their configuration.  This happens after fill_holes()
    # so that synthetic column values will be created for the gap symbols.
    for c in [SymbolDF, SectionDF]:
        if c.name in dfs:
            for column in memdf.select.SYNTHETIC_CHOICES:
                dfs[c.name] = memdf.select.synthesize_column(
                    config, dfs[c.name], column)
                dfs[c.name] = memdf.select.select_configured_column(
                    config, dfs[c.name], column)

    # Tag columns for downstream presentation: any '…symbol' column is a
    # candidate for demangling, any '…address' column for hex display.
    for df in dfs.values():
        if demangle := set((c for c in df.columns if c.endswith('symbol'))):
            df.attrs['demangle'] = demangle
        if hexify := set((c for c in df.columns if c.endswith('address'))):
            df.attrs['hexify'] = hexify


# A file reader takes (config, filename, method) and returns the collected
# DataFrames for that input.
FileReader = Callable[[Config, str, str], DFs]

# Dispatch table from 'collect.method' value to the reader implementing it.
FILE_READERS: Dict[str, FileReader] = {
    'bloaty': memdf.collector.bloaty.read_file,
    'elftools': memdf.collector.elftools.read_file,
    'readelf': memdf.collector.readelf.read_file,
    'csv': memdf.collector.csv.read_file,
    'tsv': memdf.collector.csv.read_file,
    'su': memdf.collector.su.read_dir,
}


def collect_files(config: Config,
                  files: Optional[List[str]] = None,
                  method: Optional[str] = None) -> DFs:
    """Read a filtered memory map from a set of files.

    Reads each input with the reader selected by `method` (defaulting to
    the 'collect.method' configuration), postprocesses each source, then
    concatenates per-table frames across all sources and runs the final
    collected postprocessing.  Inputs default to 'args.inputs'.
    """
    filenames = files if files else config.get('args.inputs', [])
    if method is None:
        method = config.get('collect.method', 'csv')
    frames: Dict[str, List[DF]] = {}
    for filename in filenames:
        # The method name is passed through so shared readers can vary
        # behavior (e.g. csv vs. tsv both use the csv reader).
        dfs: DFs = FILE_READERS[method](config, filename, method)
        postprocess_file(config, dfs)
        for k, frame in dfs.items():
            if k not in frames:
                frames[k] = []
            frames[k].append(frame)
    dfs = {}
    for k, v in frames.items():
        dfs[k] = pd.concat(v, ignore_index=True)
    postprocess_collected(config, dfs)
    return dfs


def parse_args(config_desc: Mapping, argv: Sequence[str]) -> Config:
    """Common argument parsing for collection tools.

    Merges the shared utility and collection configuration with the
    tool-specific `config_desc`, adds the positional FILE arguments, and
    parses `argv` into a Config.
    """
    config = Config().init({
        **memdf.util.config.CONFIG,
        **CONFIG,
        **config_desc
    })
    config.argparse.add_argument('inputs', metavar='FILE', nargs='+')
    return config.parse(argv)