| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- #
- # Copyright (c) 2021 Project CHIP Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- """Data frame selection utilities."""
- from typing import Mapping, Optional
- import memdf.name
- import memdf.util.config
- import memdf.util.pretty
- import numpy as np # type: ignore
- from memdf import DF, Config, ConfigDescription
def split_size(config: Config, key: str) -> None:
    """Separate optional `:size` suffixes from a configured list of names.

    When a program supports a size threshold for selection or summary,
    the threshold can be specified for a particular item with a suffix
    on the argument, e.g. `--section=.text:16K`.

    For a dotted `key` such as `col.select` that refers to a list of such
    arguments, the stored list is rewritten to contain only the names, and
    a name-to-size dictionary (sizes converted by
    `memdf.util.config.parse_size`) covering the entries that carried a
    suffix is stored under the sibling key `col.limit`.
    """
    path = key.split('.')
    limit_path = [*path[:-1], 'limit']
    fields = [item.split(':') for item in config.getl(path, [])]

    # Store the bare names first, before any size is parsed.
    config.putl(path, [parts[0] for parts in fields])

    limits = {}
    for name, *rest in fields:
        if rest:
            # Only the first field after the name is treated as the size.
            limits[name] = memdf.util.config.parse_size(rest[0])
    config.putl(limit_path, limits)
def get_limit(config: Config, column: str, name: str) -> int:
    """Return the limit configured for `name` under `column`.

    Looks up `[column, 'limit', name]` in the configuration; when that is
    not set, the 'report.limit' value (default 0) is used instead.
    """
    fallback = config.get('report.limit', 0)
    return config.getl([column, 'limit', name], fallback)
def postprocess_selections(config: Config, key: str, info: Mapping) -> None:
    """Resolve select/ignore command options.

    `key` must have the form `<choice>.select`. Size suffixes are split off
    first; then, unless `<choice>.ignore-all` is set, the entries of
    `<choice>.default` that are not listed in `<choice>.ignore` are added to
    the selections. The result is stored back under `key` as a frozenset.
    """
    split_size(config, key)
    choice, select = key.split('.')
    assert select == 'select'

    chosen = config.get(key)
    ignore_all = config.getl([choice, 'ignore-all'], False)
    if not ignore_all:
        defaults = config.getl([choice, 'default'])
        if defaults:
            # Ignored names are removed from the defaults list in place.
            for ignored in config.getl([choice, 'ignore']):
                if ignored in defaults:
                    defaults.remove(ignored)
            chosen += defaults
    config.put(key, frozenset(chosen))
def select_and_ignore_config_desc(key: str) -> ConfigDescription:
    """Build the select/ignore option descriptions for one kind of item.

    `key` names the kind of item (e.g. 'section'). The returned description
    places the options in the 'select' group and defines `<key>.select`,
    `<key>.select-all`, `<key>.ignore` and `<key>.ignore-all`.
    """
    label = key.capitalize()

    select_option = {
        'help': f'{label}(s) to process; otherwise all not ignored',
        'metavar': 'NAME',
        'default': [],
        'argparse': {
            'alias': [f'--{key}'],
        },
        'postprocess': postprocess_selections,
    }
    select_all_option = {
        'help': f'Select all {key}s',
        'default': False,
    }
    ignore_option = {
        'help': f'{label}(s) to ignore',
        'metavar': 'NAME',
        'default': [],
    }
    ignore_all_option = {
        'help': f'Ignore all {key}s unless explicitly selected',
        'default': False,
    }

    return {
        Config.group_map(key): {'group': 'select'},
        f'{key}.select': select_option,
        f'{key}.select-all': select_all_option,
        f'{key}.ignore': ignore_option,
        f'{key}.ignore-all': ignore_all_option,
    }
# Select/ignore option descriptions for each kind of selectable item.
SECTION_CONFIG = select_and_ignore_config_desc('section')
SYMBOL_CONFIG = select_and_ignore_config_desc('symbol')
REGION_CONFIG = select_and_ignore_config_desc('region')

# Combined description: the 'select' group definition plus every per-item
# option set above.
CONFIG: ConfigDescription = {
    Config.group_def('select'): {'title': 'selection options'},
    **SECTION_CONFIG,
    **SYMBOL_CONFIG,
    **REGION_CONFIG,
}

COLLECTED_CHOICES = ['symbol', 'section']
SYNTHETIC_CHOICES = ['region']
SELECTION_CHOICES = [*COLLECTED_CHOICES, *SYNTHETIC_CHOICES]
def is_selected(config: Config, column, name) -> bool:
    """Report whether `name` satisfies the selection settings for `column`.

    True when `<column>.select-all` is set, or when `name` appears in the
    configured `<column>.select` collection; False otherwise.
    """
    if config.getl([column, 'select-all']):
        return True
    return name in config.getl([column, 'select'], [])
def synthesize_region(config: Config, df: DF, column: str) -> DF:
    """Add a column (named by `column`) derived from the 'section' column.

    The lookup table is produced by `config.transpose_dictlist` from the
    'region.sections' configuration value. Sections absent from the table
    are assigned `memdf.name.UNKNOWN`. The frame is modified in place and
    also returned.
    """
    region_sections = config.get('region.sections', {})
    section_lookup = config.transpose_dictlist(region_sections)
    memdf.util.pretty.debug(section_lookup)

    def region_of(section):
        return section_lookup.get(section, memdf.name.UNKNOWN)

    df[column] = df['section'].map(region_of)
    return df
def groupby_region(df: DF):
    """Keep rows that have a positive 'size' or a known 'region'.

    Rows are dropped only when their size is not positive and their region
    is `memdf.name.UNKNOWN`.
    """
    has_size = df['size'] > 0
    known_region = df['region'] != memdf.name.UNKNOWN
    return df[has_size | known_region]
# Handlers for synthetic columns, keyed by column name. Each entry is a
# pair: element 0 is called as f(config, df, column) to add the column,
# element 1 is called as f(df) on a frame grouped by that column.
SYNTHESIZE = {'region': (synthesize_region, groupby_region)}
def synthesize_column(config: Config, df: DF, column: str) -> DF:
    """Ensure `column` exists in `df`, synthesizing it when absent.

    If the column is missing, the first handler registered for it in
    SYNTHESIZE is invoked on `df`; its return value is not used. A missing
    column with no SYNTHESIZE entry raises KeyError.
    """
    if column in df.columns:
        return df
    synthesizer = SYNTHESIZE[column][0]
    synthesizer(config, df, column)
    return df
def select_configured_column(config: Config, df: DF, column: str) -> DF:
    """Apply the configured selection options for one column.

    The frame is returned unchanged when it has no such column, when
    `<column>.select-all` is set, or when `<column>.select` is empty.
    Otherwise only rows whose `column` value is among the selections are
    kept.
    """
    if column not in df:
        return df
    if config.getl([column, 'select-all']):
        return df
    wanted = config.getl([column, 'select'], [])
    if not wanted:
        return df
    return df.loc[df[column].isin(wanted)]
def select_configured(config: Config, df: DF, columns=SELECTION_CHOICES) -> DF:
    """Apply the configured selections for each of `columns` in turn.

    Each column is filtered with `select_configured_column`, feeding the
    result of one step into the next; the final frame is returned.
    """
    selected = df
    for name in columns:
        selected = select_configured_column(config, selected, name)
    return selected
def groupby(config: Config, df: DF, by: Optional[str] = None):
    """Total the 'size' column for each distinct value of a grouping column.

    Args:
        config: Configuration; 'report.by' is read from it only when `by`
            is not supplied.
        df: Frame containing at least the grouping column and 'size'.
        by: Name of the column to group on. When empty or None, the
            configured 'report.by' value is used.

    Returns:
        A frame with one row per group, holding the grouping column and the
        summed 'size'. If the grouping column has an entry in SYNTHESIZE,
        that entry's second handler is applied to the grouped frame.
    """
    if not by:
        by = config['report.by']
    # Use the named 'sum' aggregation instead of passing `np.sum`: pandas
    # 2.1+ deprecates numpy/builtin callables here (FutureWarning) and
    # currently substitutes the groupby 'sum' for them anyway.
    df = df[[by, 'size']].groupby(by).aggregate('sum').reset_index()
    if by in SYNTHESIZE:
        df = SYNTHESIZE[by][1](df)
    return df
|