common_test_methods.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import os
  5. import socket
  6. import netifaces
  7. import yaml
  8. ENV_CONFIG_FILE_SEARCH = [
  9. os.path.join(os.environ['IDF_PATH'], 'EnvConfig.yml'),
  10. os.path.join(os.environ['IDF_PATH'], 'ci-test-runner-configs', os.environ.get('CI_RUNNER_DESCRIPTION', ''), 'EnvConfig.yml'),
  11. ]
  12. def get_my_ip_by_interface(interface_name: str, ip_type: int = netifaces.AF_INET) -> str:
  13. for i in netifaces.ifaddresses(interface_name)[ip_type]:
  14. interface_name = i['addr'].replace('%{}'.format(interface_name), '')
  15. assert isinstance(interface_name, str)
  16. return interface_name
  17. return ''
  18. def get_my_ip4_by_getway(getway: str = '') -> str:
  19. getways = netifaces.gateways()
  20. for gw, iface_name, _ in getways[netifaces.AF_INET]:
  21. if gw and gw == getway:
  22. interface = iface_name
  23. break
  24. else:
  25. interface = getways['default'][netifaces.AF_INET][1]
  26. logging.debug('Using interface: {}.'.format(interface))
  27. address = netifaces.ifaddresses(interface)[netifaces.AF_INET]
  28. assert isinstance(address[0]['addr'], str)
  29. return address[0]['addr']
  30. def get_my_ip4_by_dest_ip(dest_ip: str = '') -> str:
  31. if not dest_ip:
  32. dest_ip = '8.8.8.8'
  33. s1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  34. s1.connect((dest_ip, 80))
  35. my_ip = s1.getsockname()[0]
  36. s1.close()
  37. assert isinstance(my_ip, str)
  38. return my_ip
  39. def get_my_interface_by_dest_ip(dest_ip: str = '') -> str:
  40. my_ip = get_my_ip4_by_dest_ip(dest_ip)
  41. interfaces = netifaces.interfaces()
  42. for interface in interfaces:
  43. try:
  44. addresses = netifaces.ifaddresses(interface)[netifaces.AF_INET]
  45. except KeyError:
  46. continue
  47. if my_ip in [a['addr'] for a in addresses]:
  48. assert isinstance(interface, str)
  49. return interface
  50. logging.error('Can not get interface, dest ip is {}'.format(dest_ip))
  51. return ''
  52. def get_env_config(env_key: str = '', config_file: str = '') -> dict:
  53. if not config_file:
  54. for _file in ENV_CONFIG_FILE_SEARCH:
  55. if os.path.exists(_file):
  56. config_file = _file
  57. if not config_file:
  58. return dict()
  59. with open(config_file, 'r') as f:
  60. config = yaml.load(f.read(), Loader=yaml.SafeLoader)
  61. assert isinstance(config, dict)
  62. if not env_key:
  63. return config
  64. if env_key in config:
  65. _config = config[env_key]
  66. assert isinstance(_config, dict)
  67. return _config
  68. logging.warning('Can not get env config, key: {}'.format(env_key))
  69. return dict()