common_test_methods.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import os
  5. import socket
  6. from typing import Any, List
  7. import netifaces
  8. import yaml
# Candidate locations of the environment config file, listed in the order they are tried.
# NOTE: os.environ['IDF_PATH'] is evaluated at import time, so importing this module
# raises KeyError when IDF_PATH is not set.
ENV_CONFIG_FILE_SEARCH = [
    os.path.join(os.environ['IDF_PATH'], 'EnvConfig.yml'),
    os.path.join(os.environ['IDF_PATH'], 'ci-test-runner-configs', os.environ.get('CI_RUNNER_DESCRIPTION', ''), 'EnvConfig.yml'),
]

# Example layout of the environment config file, shown to the user in log output.
# NOTE(review): the indentation of the nested keys inside this template is not visible
# in this copy of the file - confirm against the upstream source.
ENV_CONFIG_TEMPLATE = '''
$IDF_PATH/EnvConfig.yml:
```yaml
<env_name>:
key: var
key2: var2
<another_env>:
key: var
```
'''
  23. def get_host_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
  24. if ip_type == netifaces.AF_INET:
  25. for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
  26. host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
  27. assert isinstance(host_ip, str)
  28. return host_ip
  29. elif ip_type == netifaces.AF_INET6:
  30. ip6_addrs: List[str] = []
  31. for _addr in netifaces.ifaddresses(interface_name)[ip_type]:
  32. host_ip = _addr['addr'].replace('%{}'.format(interface_name), '')
  33. assert isinstance(host_ip, str)
  34. # prefer to use link local address due to example settings
  35. if host_ip.startswith('FE80::'):
  36. ip6_addrs.insert(0, host_ip)
  37. else:
  38. ip6_addrs.append(host_ip)
  39. if ip6_addrs:
  40. return ip6_addrs[0]
  41. return ''
  42. def get_host_ip4_by_dest_ip(dest_ip: str = '') -> str:
  43. if not dest_ip:
  44. dest_ip = '8.8.8.8'
  45. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  46. s1.connect((dest_ip, 80))
  47. host_ip = s1.getsockname()[0]
  48. s1.close()
  49. assert isinstance(host_ip, str)
  50. print(f'Using host ip: {host_ip}')
  51. return host_ip
  52. def get_host_ip6_by_dest_ip(dest_ip: str, interface: str) -> str:
  53. addr_info = socket.getaddrinfo(f'{dest_ip}%{interface}', 80, socket.AF_INET6, socket.SOCK_DGRAM)
  54. s1 = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
  55. s1.connect(addr_info[0][-1])
  56. host_ip = s1.getsockname()[0]
  57. s1.close()
  58. assert isinstance(host_ip, str)
  59. print(f'Using host ip: {host_ip}')
  60. return host_ip
  61. def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
  62. my_ip = get_host_ip4_by_dest_ip(dest_ip)
  63. interfaces = netifaces.interfaces()
  64. for interface in interfaces:
  65. try:
  66. addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET]
  67. except KeyError:
  68. continue
  69. if my_ip in [a['addr'] for a in addresses]:
  70. assert isinstance(interface, str)
  71. return interface
  72. logging.error('Can not get interface, dest ip is {}'.format(dest_ip))
  73. return ''
  74. def get_env_config_variable(env_name: str, key: str, default: Any = None) -> Any:
  75. """
  76. Get test environment related variable
  77. config file format: $IDF_PATH/EnvConfig.yml
  78. ```
  79. <env_name>:
  80. key: var
  81. key2: var2
  82. <env_name2>:
  83. key: var
  84. ```
  85. """
  86. config = dict()
  87. for _file in ENV_CONFIG_FILE_SEARCH:
  88. try:
  89. with open(_file, 'r') as f:
  90. data = yaml.load(f.read(), Loader=yaml.SafeLoader)
  91. config = data[env_name]
  92. break
  93. except (FileNotFoundError, KeyError):
  94. pass
  95. else:
  96. pass
  97. var = config.get(key, default)
  98. if var is None:
  99. logging.warning(f'Failed to get env variable {env_name}/{key}.')
  100. logging.info(f'Env config file template: {ENV_CONFIG_TEMPLATE}')
  101. if not os.environ.get('CI_JOB_ID'):
  102. # Try to get variable from stdin
  103. var = input(f'You can input the variable now:')
  104. else:
  105. raise ValueError(f'Env variable not found: {env_name}/{key}')
  106. logging.debug(f'Got env variable {env_name}/{key}: {var}')
  107. return var