| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- #!/usr/bin/env -S python3 -B
- # Copyright (c) 2023 Project CHIP Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import relative_importer # isort: split # noqa: F401
- import asyncio
- import importlib
- import os
- import sys
- import traceback
- from dataclasses import dataclass
- import click
- from matter_yamltests.definitions import SpecDefinitionsFromPaths
- from matter_yamltests.parser import TestParserConfig
- from matter_yamltests.parser_builder import TestParserBuilderConfig
- from matter_yamltests.parser_config import TestConfigParser
- from matter_yamltests.pseudo_clusters.pseudo_clusters import PseudoClusters, get_default_pseudo_clusters
- from matter_yamltests.runner import TestRunner, TestRunnerConfig, TestRunnerOptions
- from matter_yamltests.websocket_runner import WebSocketRunner, WebSocketRunnerConfig
- from paths_finder import PathsFinder
- from tests_finder import TestsFinder
- from tests_logger import TestParserLogger, TestRunnerLogger, WebSocketRunnerLogger
- #
- # Options
- #
- _DEFAULT_CONFIG_NAME = TestsFinder.get_default_configuration_name()
- _DEFAULT_CONFIG_DIR = TestsFinder.get_default_configuration_directory()
- _DEFAULT_SPECIFICATIONS_DIR = 'src/app/zap-templates/zcl/data-model/chip/*.xml'
- _DEFAULT_PICS_FILE = 'src/app/tests/suites/certification/ci-pics-values'
- def get_custom_pseudo_clusters(additional_pseudo_clusters_directory: str):
- clusters = get_default_pseudo_clusters()
- if additional_pseudo_clusters_directory:
- sys.path.insert(0, additional_pseudo_clusters_directory)
- for filepath in os.listdir(additional_pseudo_clusters_directory):
- if filepath != '__init__.py' and filepath[-3:] == '.py':
- module = importlib.import_module(f'{filepath[:-3]}')
- constructor = getattr(module, module.__name__)
- if constructor:
- clusters.add(constructor())
- return clusters
- def test_parser_options(f):
- f = click.option('--configuration_name', type=str, show_default=True, default=_DEFAULT_CONFIG_NAME,
- help='Name of the collection configuration json file to use.')(f)
- f = click.option('--configuration_directory', type=click.Path(exists=True), show_default=True, default=_DEFAULT_CONFIG_DIR,
- help='Path to the directory containing the tests configuration.')(f)
- f = click.option('--specifications_paths', type=click.Path(), show_default=True, default=_DEFAULT_SPECIFICATIONS_DIR,
- help='Path to a set of files containing clusters definitions.')(f)
- f = click.option('--PICS', type=click.Path(exists=True), show_default=True, default=_DEFAULT_PICS_FILE,
- help='Path to the PICS file to use.')(f)
- f = click.option('--stop_on_error', type=bool, show_default=True, default=True,
- help='Stop parsing on first error.')(f)
- f = click.option('--use_default_pseudo_clusters', type=bool, show_default=True, default=True,
- help='If enable this option use the set of default clusters provided by the matter_yamltests package.')(f)
- f = click.option('--additional_pseudo_clusters_directory', type=click.Path(), show_default=True, default=None,
- help='Path to a directory containing additional pseudo clusters.')(f)
- return f
- def test_runner_options(f):
- f = click.option('--adapter', type=str, default=None, required=True, show_default=True,
- help='The adapter to run the test with.')(f)
- f = click.option('--stop_on_error', type=bool, default=True, show_default=True,
- help='Stop the test suite on first error.')(f)
- f = click.option('--stop_on_warning', type=bool, default=False, show_default=True,
- help='Stop the test suite on first warning.')(f)
- f = click.option('--stop_at_number', type=int, default=-1, show_default=True,
- help='Stop the the test suite at the specified test number.')(f)
- f = click.option('--show_adapter_logs', type=bool, default=False, show_default=True,
- help='Show additional logs provided by the adapter.')(f)
- f = click.option('--show_adapter_logs_on_error', type=bool, default=True, show_default=True,
- help='Show additional logs provided by the adapter on error.')(f)
- f = click.option('--use_test_harness_log_format', type=bool, default=False, show_default=True,
- help='Use the test harness log format.')(f)
- f = click.option('--delay-in-ms', type=int, default=0, show_default=True,
- help='Add a delay between test suite steps.')(f)
- return f
- def websocket_runner_options(f):
- f = click.option('--server_address', type=str, default='localhost', show_default=True,
- help='The websocket server address to connect to.')(f)
- f = click.option('--server_port', type=int, default=9002, show_default=True,
- help='The websocket server port to connect to.')(f)
- f = click.option('--server_name', type=str, default=None,
- help='Name of a websocket server to run at launch.')(f)
- f = click.option('--server_path', type=click.Path(exists=True), default=None,
- help='Path to a websocket server to run at launch.')(f)
- f = click.option('--server_arguments', type=str, default=None,
- help='Optional arguments to pass to the websocket server at launch.')(f)
- return f
- def chip_repl_runner_options(f):
- f = click.option('--repl_storage_path', type=str, default='/tmp/repl-storage.json',
- help='Path to persistent storage configuration file.')(f)
- f = click.option('--commission_on_network_dut', type=bool, default=False,
- help='Prior to running test should we try to commission DUT on network.')(f)
- f = click.option('--runner', type=str, default=None, show_default=True,
- help='The runner to run the test with.')(f)
- return f
- @dataclass
- class ParserGroup:
- builder_config: TestParserBuilderConfig
- pseudo_clusters: PseudoClusters
- pass_parser_group = click.make_pass_decorator(ParserGroup)
- # YAML test file contains configurable options defined in their config section.
- #
- # Those options are test specific and as such can not be listed directly here but
- # instead are retrieved from the target test file (if there is a single file) and
- # are exposed to click dynamically.
- #
- # The following code extracts those and, to make it easy for the user to see
- # which options are available, list them in a custom section when --help
- # is invoked.
- class YamlTestParserGroup(click.Group):
- def format_options(self, ctx, formatter):
- """Writes all the options into the formatter if they exist."""
- if ctx.custom_options:
- params_copy = self.params
- non_custom_params = list(filter(lambda x: x.name not in ctx.custom_options, self.params))
- custom_params = list(filter(lambda x: x.name in ctx.custom_options, self.params))
- self.params = non_custom_params
- super().format_options(ctx, formatter)
- self.params = params_copy
- opts = []
- for param in custom_params:
- rv = param.get_help_record(ctx)
- if rv is not None:
- opts.append(rv)
- if opts:
- custom_section_title = ctx.params.get('test_name') + ' Options'
- with formatter.section(custom_section_title):
- formatter.write_dl(opts)
- else:
- super().format_options(ctx, formatter)
- def parse_args(self, ctx, args):
- # Run the parser on the supported arguments first in order to get a
- # the necessary informations to get read the test file and add
- # the potential additional arguments.
- supported_args = self.__remove_custom_args(ctx, args)
- super().parse_args(ctx, supported_args)
- # Add the potential new arguments to the list of supported params and
- # run the parser a new time to read those.
- self.__add_custom_params(ctx)
- return super().parse_args(ctx, args)
- def __remove_custom_args(self, ctx, args):
- # Remove all the unsupported options from the command line string.
- params_name = [param.name for param in self.params]
- supported_args = []
- skipArgument = False
- for arg in args:
- if arg.startswith('--') and arg not in params_name:
- skipArgument = True
- continue
- if skipArgument:
- skipArgument = False
- continue
- supported_args.append(arg)
- return supported_args
- def __add_custom_params(self, ctx):
- tests_finder = TestsFinder(ctx.params.get('configuration_directory'), ctx.params.get('configuration_name'))
- tests = tests_finder.get(ctx.params.get('test_name'))
- custom_options = {}
- # There is a single test, extract the custom config
- if len(tests) == 1:
- try:
- custom_options = TestConfigParser.get_config(tests[0])
- except Exception:
- pass
- for key, value in custom_options.items():
- param = click.Option(['--' + key], default=value, show_default=True)
- # click converts parameter name to lowercase internally, so we need to override
- # this behavior in order to override the correct key.
- param.name = key
- self.params.append(param)
- ctx.custom_options = custom_options
- CONTEXT_SETTINGS = dict(
- default_map={
- 'chiptool': {
- 'adapter': 'matter_chip_tool_adapter.adapter',
- 'server_name': 'chip-tool',
- 'server_arguments': 'interactive server',
- },
- 'darwinframeworktool': {
- 'adapter': 'matter_chip_tool_adapter.adapter',
- 'server_name': 'darwin-framework-tool',
- 'server_arguments': 'interactive server',
- },
- 'app1': {
- 'configuration_directory': 'examples/placeholder/linux/apps/app1',
- 'adapter': 'matter_placeholder_adapter.adapter',
- 'server_name': 'chip-app1',
- 'server_arguments': '--interactive',
- },
- 'app2': {
- 'configuration_directory': 'examples/placeholder/linux/apps/app2',
- 'adapter': 'matter_placeholder_adapter.adapter',
- 'server_name': 'chip-app2',
- 'server_arguments': '--interactive',
- },
- 'chip-repl': {
- 'adapter': 'matter_yamltest_repl_adapter.adapter',
- 'runner': 'matter_yamltest_repl_adapter.runner',
- },
- },
- max_content_width=120,
- )
- @click.group(cls=YamlTestParserGroup, context_settings=CONTEXT_SETTINGS)
- @click.argument('test_name')
- @test_parser_options
- @click.pass_context
- def runner_base(ctx, configuration_directory: str, test_name: str, configuration_name: str, pics: str, specifications_paths: str, stop_on_error: bool, use_default_pseudo_clusters: bool, additional_pseudo_clusters_directory: str, **kwargs):
- pseudo_clusters = get_custom_pseudo_clusters(
- additional_pseudo_clusters_directory) if use_default_pseudo_clusters else PseudoClusters([])
- specifications = SpecDefinitionsFromPaths(specifications_paths.split(','), pseudo_clusters)
- tests_finder = TestsFinder(configuration_directory, configuration_name)
- test_list = tests_finder.get(test_name)
- if len(test_list) == 0:
- raise Exception(f"No tests found for test name '{test_name}'")
- parser_config = TestParserConfig(pics, specifications, kwargs)
- parser_builder_config = TestParserBuilderConfig(test_list, parser_config, hooks=TestParserLogger())
- parser_builder_config.options.stop_on_error = stop_on_error
- while ctx:
- ctx.obj = ParserGroup(parser_builder_config, pseudo_clusters)
- ctx = ctx.parent
- @runner_base.command()
- @pass_parser_group
- def parse(parser_group: ParserGroup):
- """Parse the test suite."""
- runner_config = None
- runner = TestRunner()
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(runner.run(parser_group.builder_config, runner_config))
- @runner_base.command()
- @pass_parser_group
- def dry_run(parser_group: ParserGroup):
- """Simulate a run of the test suite."""
- runner_config = TestRunnerConfig(hooks=TestRunnerLogger())
- runner = TestRunner()
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(runner.run(parser_group.builder_config, runner_config))
- @runner_base.command()
- @test_runner_options
- @pass_parser_group
- def run(parser_group: ParserGroup, adapter: str, stop_on_error: bool, stop_on_warning: bool, stop_at_number: int, show_adapter_logs: bool, show_adapter_logs_on_error: bool, use_test_harness_log_format: bool, delay_in_ms: int):
- """Run the test suite."""
- adapter = __import__(adapter, fromlist=[None]).Adapter(parser_group.builder_config.parser_config.definitions)
- runner_options = TestRunnerOptions(stop_on_error, stop_on_warning, stop_at_number, delay_in_ms)
- runner_hooks = TestRunnerLogger(show_adapter_logs, show_adapter_logs_on_error, use_test_harness_log_format)
- runner_config = TestRunnerConfig(adapter, parser_group.pseudo_clusters, runner_options, runner_hooks)
- runner = TestRunner()
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(runner.run(parser_group.builder_config, runner_config))
- @runner_base.command()
- @test_runner_options
- @websocket_runner_options
- @pass_parser_group
- def websocket(parser_group: ParserGroup, adapter: str, stop_on_error: bool, stop_on_warning: bool, stop_at_number: int, show_adapter_logs: bool, show_adapter_logs_on_error: bool, use_test_harness_log_format: bool, delay_in_ms: int, server_address: str, server_port: int, server_path: str, server_name: str, server_arguments: str):
- """Run the test suite using websockets."""
- adapter = __import__(adapter, fromlist=[None]).Adapter(parser_group.builder_config.parser_config.definitions)
- runner_options = TestRunnerOptions(stop_on_error, stop_on_warning, stop_at_number, delay_in_ms)
- runner_hooks = TestRunnerLogger(show_adapter_logs, show_adapter_logs_on_error, use_test_harness_log_format)
- runner_config = TestRunnerConfig(adapter, parser_group.pseudo_clusters, runner_options, runner_hooks)
- if server_path is None and server_name:
- paths_finder = PathsFinder()
- server_path = paths_finder.get(server_name)
- websocket_runner_hooks = WebSocketRunnerLogger()
- websocket_runner_config = WebSocketRunnerConfig(
- server_address, server_port, server_path, server_arguments, websocket_runner_hooks)
- runner = WebSocketRunner(websocket_runner_config)
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(runner.run(parser_group.builder_config, runner_config))
- @runner_base.command()
- @test_runner_options
- @chip_repl_runner_options
- @pass_parser_group
- def chip_repl(parser_group: ParserGroup, adapter: str, stop_on_error: bool, stop_on_warning: bool, stop_at_number: int, show_adapter_logs: bool, show_adapter_logs_on_error: bool, use_test_harness_log_format: bool, delay_in_ms: int, runner: str, repl_storage_path: str, commission_on_network_dut: bool):
- """Run the test suite using chip-repl."""
- adapter = __import__(adapter, fromlist=[None]).Adapter(parser_group.builder_config.parser_config.definitions)
- runner_options = TestRunnerOptions(stop_on_error, stop_on_warning, stop_at_number, delay_in_ms)
- runner_hooks = TestRunnerLogger(show_adapter_logs, show_adapter_logs_on_error, use_test_harness_log_format)
- runner_config = TestRunnerConfig(adapter, parser_group.pseudo_clusters, runner_options, runner_hooks)
- runner = __import__(runner, fromlist=[None]).Runner(repl_storage_path, commission_on_network_dut)
- loop = asyncio.get_event_loop()
- return loop.run_until_complete(runner.run(parser_group.builder_config, runner_config))
- @runner_base.command()
- @test_runner_options
- @websocket_runner_options
- @click.pass_context
- def chiptool(ctx, *args, **kwargs):
- """Run the test suite using chip-tool."""
- return ctx.forward(websocket)
- @runner_base.command()
- @test_runner_options
- @websocket_runner_options
- @click.pass_context
- def darwinframeworktool(ctx, *args, **kwargs):
- """Run the test suite using darwin-framework-tool."""
- return ctx.forward(websocket)
- @runner_base.command()
- @test_runner_options
- @websocket_runner_options
- @click.pass_context
- def app1(ctx, *args, **kwargs):
- """Run the test suite using app1."""
- return ctx.forward(websocket)
- @runner_base.command()
- @test_runner_options
- @websocket_runner_options
- @click.pass_context
- def app2(ctx, *args, **kwargs):
- """Run the test suite using app2."""
- return ctx.forward(websocket)
- if __name__ == '__main__':
- success = False
- try:
- # By default click runs in standalone mode and it will handle exceptions and the
- # different commands return values for us. For example it will set sys.exit to
- # 0 if the test runs fails unless an exception is raised. Simple test failure
- # does not raise exception but we want to set the exit code to 1.
- # So standalone_mode is set to False to let us manage this exit behavior.
- success = runner_base(standalone_mode=False)
- except Exception:
- print('')
- traceback.print_exc()
- sys.exit(0 if success else 1)
|