#
# Copyright (c) 2021 Project CHIP Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Data frame selection utilities."""

from typing import Mapping, Optional

import memdf.name
import memdf.util.config
import memdf.util.pretty
import numpy as np  # type: ignore
from memdf import DF, Config, ConfigDescription


def split_size(config: Config, key: str) -> None:
    """Split a name:size configuration value.

    When a program supports a size threshold for selection or summary,
    this can be specified for a particular item with a suffix on the
    configuration, e.g. `--section=.text:16K`.

    Given a configuration key `col.select` referring to such a list of
    arguments, this function strips any sizes from those arguments
    and stores them as a name:size dictionary in `col.limit`.
    """
    src = key.split('.')
    dst = src[:-1] + ['limit']
    # Every ':' is a separator; only the first two pieces (name, size) are
    # used, so anything after a second ':' is ignored.
    splits = [s.split(':') for s in config.getl(src, [])]
    config.putl(src, [x[0] for x in splits])
    config.putl(dst, {
        x[0]: memdf.util.config.parse_size(x[1])
        for x in splits if len(x) > 1
    })


def get_limit(config: Config, column: str, name: str) -> int:
    """Return the size limit configured for `name` in `column`.

    Looks up `[column, 'limit', name]`; when that is not set, falls back to
    the `report.limit` configuration value, and finally to 0.
    """
    return config.getl([column, 'limit', name], config.get('report.limit', 0))


def postprocess_selections(config: Config, key: str, info: Mapping) -> None:
    """Resolve select/ignore command options.

    `key` must have the form `<choice>.select`. Any `name:size` suffixes are
    first split off by `split_size()`. Then, unless `<choice>.ignore-all` is
    set, every entry of `<choice>.default` that is not listed in
    `<choice>.ignore` is added to the selections. The result is stored back
    under `key` as a frozenset.

    `info` is not used here; it is part of the signature under which this
    function is registered as a 'postprocess' hook.
    """
    split_size(config, key)
    choice, select = key.split('.')
    assert select == 'select'
    selections = set(config.get(key))
    if not config.getl([choice, 'ignore-all'], False):
        if defaults := config.getl([choice, 'default']):
            # Filter into the result rather than calling `defaults.remove()`,
            # so that the list stored in the configuration is left untouched
            # and duplicated defaults are all dropped.
            ignored = frozenset(config.getl([choice, 'ignore'], []))
            selections.update(d for d in defaults if d not in ignored)
    config.put(key, frozenset(selections))


def select_and_ignore_config_desc(key: str) -> ConfigDescription:
    """Build the select/ignore configuration description for `key`.

    The returned description places `key` in the 'select' option group and
    defines the `select`, `select-all`, `ignore` and `ignore-all` options
    under `key`. The `select` option gets a `--<key>` alias and is
    post-processed by `postprocess_selections()`.
    """
    return {
        Config.group_map(key): {
            'group': 'select'
        },
        f'{key}.select': {
            'help':
                f'{key.capitalize()}(s) to process; otherwise all not ignored',
            'metavar': 'NAME',
            'default': [],
            'argparse': {
                'alias': [f'--{key}'],
            },
            'postprocess': postprocess_selections
        },
        f'{key}.select-all': {
            'help': f'Select all {key}s',
            'default': False,
        },
        f'{key}.ignore': {
            'help': f'{key.capitalize()}(s) to ignore',
            'metavar': 'NAME',
            'default': [],
        },
        f'{key}.ignore-all': {
            'help': f'Ignore all {key}s unless explicitly selected',
            'default': False,
        },
    }


SECTION_CONFIG = select_and_ignore_config_desc('section')
SYMBOL_CONFIG = select_and_ignore_config_desc('symbol')
REGION_CONFIG = select_and_ignore_config_desc('region')

# Combined description of all selection options.
CONFIG: ConfigDescription = {
    Config.group_def('select'): {
        'title': 'selection options',
    },
    **SECTION_CONFIG,
    **SYMBOL_CONFIG,
    **REGION_CONFIG,
}

COLLECTED_CHOICES = ['symbol', 'section']
# 'region' has an entry in SYNTHESIZE below, i.e. it can be derived from
# other columns when it is absent from a data frame.
SYNTHETIC_CHOICES = ['region']
SELECTION_CHOICES = COLLECTED_CHOICES + SYNTHETIC_CHOICES


def is_selected(config: Config, column, name) -> bool:
    """Test `name` against the configured selection criteria for `column`.

    Returns True when `<column>.select-all` is set or when `name` is one of
    the `<column>.select` entries; otherwise False.
    """
    if config.getl([column, 'select-all']):
        return True
    return name in config.getl([column, 'select'], [])


def synthesize_region(config: Config, df: DF, column: str) -> DF:
    """Add a 'region' column derived from the 'section' column.

    The new column is written into `df` in place under the name `column`,
    and `df` is returned. Sections without an entry in the lookup table get
    `memdf.name.UNKNOWN`.
    """
    # NOTE(review): presumably `transpose_dictlist` turns the configured
    # region -> [sections] mapping into section -> region; confirm against
    # its definition in Config.
    cmap = config.transpose_dictlist(config.get('region.sections', {}))
    memdf.util.pretty.debug(cmap)
    df[column] = df['section'].map(lambda x: cmap.get(x, memdf.name.UNKNOWN))
    return df


def groupby_region(df: DF) -> DF:
    """Keep rows that have a positive size or a known region."""
    return df[(df['size'] > 0) | (df['region'] != memdf.name.UNKNOWN)]


# Maps a synthetic column name to a pair of functions:
#   [0] adds the column to a data frame, called as (config, df, column);
#   [1] post-processes a frame that has been grouped by that column.
SYNTHESIZE = {
    'region': (synthesize_region, groupby_region),
}


def synthesize_column(config: Config, df: DF, column: str) -> DF:
    """Ensure `column` is present in `df`, synthesizing it when absent.

    Raises KeyError if the column is missing and has no entry in SYNTHESIZE.
    """
    if column not in df.columns:
        # Use the frame the synthesizer returns rather than relying on it
        # having modified `df` in place.
        df = SYNTHESIZE[column][0](config, df, column)
    return df


def select_configured_column(config: Config, df: DF, column: str) -> DF:
    """Apply configured selection options to a column.

    Rows are filtered only when `column` is present in `df`,
    `<column>.select-all` is not set, and `<column>.select` is non-empty;
    in every other case `df` is returned unchanged.
    """
    if column in df and not config.getl([column, 'select-all']):
        selections = config.getl([column, 'select'], [])
        if selections:
            df = df.loc[df[column].isin(selections)]
    return df


def select_configured(config: Config, df: DF,
                      columns=SELECTION_CHOICES) -> DF:
    """Apply the configured selections for each of `columns` in turn."""
    for column in columns:
        df = select_configured_column(config, df, column)
    return df


def groupby(config: Config, df: DF, by: Optional[str] = None) -> DF:
    """Sum the 'size' column of `df` grouped by the column `by`.

    When `by` is not given, the `report.by` configuration value is used.
    If `by` has an entry in SYNTHESIZE, that entry's post-grouping function
    is applied to the result.
    """
    if not by:
        by = config['report.by']
    # The string 'sum' selects the same group-wise sum that `np.sum` was
    # mapped to, without the FutureWarning pandas >= 2.1 emits for the
    # callable form.
    df = df[[by, 'size']].groupby(by).aggregate('sum').reset_index()
    if by in SYNTHESIZE:
        df = SYNTHESIZE[by][1](df)
    return df