Env.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. """ Test Env, manages DUT, App and EnvConfig, interface for test cases to access these components """
  4. import functools
  5. import os
  6. import threading
  7. import traceback
  8. import netifaces
  9. from . import EnvConfig
  10. def _synced(func):
  11. @functools.wraps(func)
  12. def decorator(self, *args, **kwargs):
  13. with self.lock:
  14. ret = func(self, *args, **kwargs)
  15. return ret
  16. decorator.__doc__ = func.__doc__
  17. return decorator
  18. class Env(object):
  19. """
  20. test env, manages DUTs and env configs.
  21. :keyword app: class for default application
  22. :keyword dut: class for default DUT
  23. :keyword env_tag: test env tag, used to select configs from env config file
  24. :keyword env_config_file: test env config file path
  25. :keyword test_name: test suite name, used when generate log folder name
  26. """
  27. CURRENT_LOG_FOLDER = ''
  28. def __init__(self,
  29. app=None,
  30. dut=None,
  31. env_tag=None,
  32. env_config_file=None,
  33. test_suite_name=None,
  34. **kwargs):
  35. self.app_cls = app
  36. self.default_dut_cls = dut
  37. self.config = EnvConfig.Config(env_config_file, env_tag)
  38. self.log_path = self.app_cls.get_log_folder(test_suite_name)
  39. if not os.path.exists(self.log_path):
  40. os.makedirs(self.log_path)
  41. Env.CURRENT_LOG_FOLDER = self.log_path
  42. self.allocated_duts = dict()
  43. self.lock = threading.RLock()
  44. @_synced
  45. def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, app_config_name=None, **dut_init_args):
  46. """
  47. get_dut(dut_name, app_path, dut_class=None, app_class=None)
  48. :param dut_name: user defined name for DUT
  49. :param app_path: application path, app instance will use this path to process application info
  50. :param dut_class: dut class, if not specified will use default dut class of env
  51. :param app_class: app class, if not specified will use default app of env
  52. :param app_config_name: app build config
  53. :keyword dut_init_args: extra kwargs used when creating DUT instance
  54. :return: dut instance
  55. """
  56. if dut_name in self.allocated_duts:
  57. dut = self.allocated_duts[dut_name]['dut']
  58. else:
  59. if dut_class is None:
  60. dut_class = self.default_dut_cls
  61. if app_class is None:
  62. app_class = self.app_cls
  63. app_target = dut_class.TARGET
  64. detected_target = None
  65. try:
  66. port = self.config.get_variable(dut_name)
  67. if not app_target:
  68. result, detected_target = dut_class.confirm_dut(port)
  69. except ValueError:
  70. # try to auto detect ports
  71. allocated_ports = [self.allocated_duts[x]['port'] for x in self.allocated_duts]
  72. available_ports = dut_class.list_available_ports()
  73. for port in available_ports:
  74. if port not in allocated_ports:
  75. result, detected_target = dut_class.confirm_dut(port)
  76. if result:
  77. break
  78. else:
  79. port = None
  80. if not app_target:
  81. app_target = detected_target
  82. if not app_target:
  83. raise ValueError("DUT class doesn't specify the target, and autodetection failed")
  84. app_inst = app_class(app_path, app_config_name, app_target)
  85. if port:
  86. try:
  87. dut_config = self.get_variable(dut_name + '_port_config')
  88. except ValueError:
  89. dut_config = dict()
  90. dut_config.update(dut_init_args)
  91. dut = dut_class(dut_name, port,
  92. os.path.join(self.log_path, dut_name + '.log'),
  93. app_inst,
  94. **dut_config)
  95. self.allocated_duts[dut_name] = {'port': port, 'dut': dut}
  96. else:
  97. raise ValueError('Failed to get DUT')
  98. return dut
  99. @_synced
  100. def close_dut(self, dut_name):
  101. """
  102. close_dut(dut_name)
  103. close one DUT by name if DUT name is valid (the name used by ``get_dut``). otherwise will do nothing.
  104. :param dut_name: user defined name for DUT
  105. :return: None
  106. """
  107. try:
  108. dut = self.allocated_duts.pop(dut_name)['dut']
  109. dut.close()
  110. except KeyError:
  111. pass
  112. @_synced
  113. def get_variable(self, variable_name):
  114. """
  115. get_variable(variable_name)
  116. get variable from config file. If failed then try to auto-detected it.
  117. :param variable_name: name of the variable
  118. :return: value of variable if successfully found. otherwise None.
  119. """
  120. return self.config.get_variable(variable_name)
  121. PROTO_MAP = {
  122. 'ipv4': netifaces.AF_INET,
  123. 'ipv6': netifaces.AF_INET6,
  124. 'mac': netifaces.AF_LINK,
  125. }
  126. @_synced
  127. def get_pc_nic_info(self, nic_name='pc_nic', proto='ipv4'):
  128. """
  129. get_pc_nic_info(nic_name="pc_nic")
  130. try to get info of a specified NIC and protocol.
  131. :param nic_name: pc nic name. allows passing variable name, nic name value.
  132. :param proto: "ipv4", "ipv6" or "mac"
  133. :return: a dict of nic info if successfully found. otherwise None.
  134. nic info keys could be different for different protocols.
  135. key "addr" is available for both mac, ipv4 and ipv6 pic info.
  136. """
  137. interfaces = netifaces.interfaces()
  138. if nic_name in interfaces:
  139. # the name is in the interface list, we regard it as NIC name
  140. if_addr = netifaces.ifaddresses(nic_name)
  141. else:
  142. # it's not in interface name list, we assume it's variable name
  143. _nic_name = self.get_variable(nic_name)
  144. if_addr = netifaces.ifaddresses(_nic_name)
  145. return if_addr[self.PROTO_MAP[proto]][0]
  146. @_synced
  147. def close(self, dut_debug=False):
  148. """
  149. close()
  150. close all DUTs of the Env.
  151. :param dut_debug: if dut_debug is True, then print all dut expect failures before close it
  152. :return: exceptions during close DUT
  153. """
  154. dut_close_errors = []
  155. for dut_name in self.allocated_duts:
  156. dut = self.allocated_duts[dut_name]['dut']
  157. try:
  158. if dut_debug:
  159. dut.print_debug_info()
  160. dut.close()
  161. except Exception as e:
  162. traceback.print_exc()
  163. dut_close_errors.append(e)
  164. self.allocated_duts = dict()
  165. return dut_close_errors