ot_ci_function.py 7.2 KB


  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Unlicense OR CC0-1.0
  3. # !/usr/bin/env python3
  4. # this file defines some functions for testing cli and br under pytest framework
  5. import re
  6. import subprocess
  7. import time
  8. from typing import Tuple, Union
  9. import netifaces
  10. import pexpect
  11. from pytest_embedded_idf.dut import IdfDut
  def reset_thread(dut:IdfDut) -> None:
      """Factory-reset the device over its console and wait for it to come back.

      Writes ``factoryreset``, waits for the 'OpenThread attached to netif'
      log line (10 s timeout), then writes a blank line followed by ``state``.

      :param dut: device whose console is driven.
      """
      time.sleep(1)
      dut.write('factoryreset')
      # fixed pause before looking for the attach log line
      time.sleep(3)
      dut.expect('OpenThread attached to netif', timeout=10)
      # NOTE(review): the blank write presumably clears a partial prompt before
      # 'state' is issued -- confirm against the console behaviour.
      for console_input in (' ', 'state'):
          dut.write(console_input)
  19. # config thread
  def config_thread(dut:IdfDut, model:str, dataset:str='0') -> Union[str, None]:
      """Configure the active Thread dataset on ``dut`` and bring the interface up.

      :param dut: device whose console is driven.
      :param model: ``'random'`` creates and commits a new dataset on the device;
          ``'appointed'`` applies ``dataset`` as the active dataset. Any other
          value does nothing.
      :param dataset: dataset string passed to ``dataset set active`` (only used
          for ``'appointed'``).
      :return: for ``'random'``, the 212-character dataset printed by
          ``dataset active -x``; otherwise ``None``.
      """
      if model == 'appointed':
          dut.write('dataset set active ' + str(dataset))
          dut.expect('Done', timeout=2)
          dut.write('ifconfig up')
          dut.expect('Done', timeout=2)
          return None

      if model != 'random':
          # unknown model: nothing is written to the device
          return None

      # each of these commands must be acknowledged with 'Done' within 2 s
      for command in ('dataset init new', 'dataset commit active', 'ifconfig up'):
          dut.write(command)
          dut.expect('Done', timeout=2)

      # read the freshly created dataset back so it can be given to other devices
      dut.write('dataset active -x')
      hex_dataset = dut.expect(r'\n(\w{212})\r', timeout=5)[1].decode()
      return str(hex_dataset)
  39. # get the mleid address of the thread
  def get_mleid_addr(dut:IdfDut) -> str:
      """Return the address printed by the ``ipaddr mleid`` console command.

      The console buffer is cleaned first so that only output produced after
      the command is matched.

      :param dut: device whose console is driven.
      :return: the first eight-group, colon-separated address found on its own line.
      """
      clean_buffer(dut)
      dut.write('ipaddr mleid')
      matched = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)
      return matched[1].decode()
  46. # get the rloc address of the thread
  def get_rloc_addr(dut:IdfDut) -> str:
      """Return the address printed by the ``ipaddr rloc`` console command.

      The console buffer is cleaned first so that only output produced after
      the command is matched.

      :param dut: device whose console is driven.
      :return: the first eight-group, colon-separated address found on its own line.
      """
      clean_buffer(dut)
      dut.write('ipaddr rloc')
      matched = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)
      return matched[1].decode()
  53. # get the linklocal address of the thread
  def get_linklocal_addr(dut:IdfDut) -> str:
      """Return the address printed by the ``ipaddr linklocal`` console command.

      The console buffer is cleaned first so that only output produced after
      the command is matched.

      :param dut: device whose console is driven.
      :return: the first eight-group, colon-separated address found on its own line.
      """
      clean_buffer(dut)
      dut.write('ipaddr linklocal')
      matched = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)
      return matched[1].decode()
  60. # get the global unicast address of the thread:
  def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str:
      """Return the address of ``dut`` that starts with the prefix reported by ``br``.

      ``br`` is asked for its prefix with ``br omrprefix``; the four leading
      colon-terminated groups of the reply are kept. ``dut`` is then asked for
      its addresses with ``ipaddr`` and the first one that begins with those
      groups is returned.

      :param dut: device whose address is looked up.
      :param br: device queried with ``br omrprefix``.
      :return: the matching eight-group address of ``dut``.
      """
      clean_buffer(br)
      br.write('br omrprefix')
      prefix_match = br.expect(r'\n((?:\w+:){4}):/\d+\r', timeout=5)
      omrprefix = prefix_match[1].decode()

      clean_buffer(dut)
      dut.write('ipaddr')
      # prefix groups + three more groups + final group = one full address
      address_pattern = r'(%s(?:\w+:){3}\w+)\r' % str(omrprefix)
      return dut.expect(address_pattern, timeout=5)[1].decode()
  70. # start thread
  def start_thread(dut:IdfDut) -> str:
      """Start the Thread stack on ``dut`` and return the role it leaves 'detached' for.

      Writes ``thread start`` and waits up to 20 s for a
      'Role detached -> <role>' log line.

      :param dut: device whose console is driven.
      :return: the word following 'Role detached -> ' (e.g. the callers in this
          file compare it against 'leader' and 'child').
      """
      dut.write('thread start')
      # Read the capture group straight from the match, as the other helpers in
      # this file do, instead of re-running the same regex on the repr of the
      # matched bytes.
      role = dut.expect(r'Role detached -> (\w+)\W', timeout=20)[1].decode()
      return str(role)
  77. # config br and cli manually
  def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_name:str, thread_dataset_model:str,
                                              thread_dataset:str, wifi:IdfDut, wifi_ssid:str, wifi_psk:str) -> str:
      """Form a two-device Thread network with ``leader`` as leader and ``child`` as child.

      Both devices are reset, set to channel 12 and given the same dataset, then
      started one after the other. Wi-Fi is connected on ``wifi`` first unless
      ``wifi_psk`` is the placeholder ``'0000'``.

      :param leader: device expected to become the Thread leader.
      :param child: device expected to attach as a child.
      :param leader_name: ``'br'`` enables ``bbr`` on ``leader``; any other value
          enables it on ``child`` instead.
      :param thread_dataset_model: ``'random'`` lets ``leader`` generate the
          dataset; anything else applies ``thread_dataset`` to both devices.
      :param thread_dataset: dataset used when the model is not ``'random'``.
      :param wifi: device on which the Wi-Fi connection is made.
      :param wifi_ssid: SSID passed to ``connect_wifi``.
      :param wifi_psk: password passed to ``connect_wifi``; ``'0000'`` skips Wi-Fi.
      :return: the IP address reported by ``connect_wifi``, or ``'0000'`` when
          Wi-Fi was skipped.
      :raises AssertionError: if either device ends up in an unexpected role.
      """
      time.sleep(3)
      devices = (leader, child)

      # wait for each device to boot, leader first
      for device in devices:
          device.expect('OpenThread attached to netif', timeout=10)
          device.write(' ')
          device.write('state')
      for device in devices:
          reset_thread(device)
      for device in devices:
          device.write('channel 12')
          device.expect('Done', timeout=2)

      res = '0000'
      if wifi_psk != '0000':
          res = connect_wifi(wifi, wifi_ssid, wifi_psk, 10)[0]

      # configure the leader and work out which dataset the child must receive
      if thread_dataset_model == 'random':
          child_dataset = str(config_thread(leader, 'random'))
      else:
          config_thread(leader, 'appointed', thread_dataset)
          child_dataset = thread_dataset

      leader_is_br = leader_name == 'br'
      if leader_is_br:
          leader.write('bbr enable')
          leader.expect('Done', timeout=2)
      leader_role = start_thread(leader)
      assert leader_role == 'leader'

      config_thread(child, 'appointed', child_dataset)
      if not leader_is_br:
          child.write('bbr enable')
          child.expect('Done', timeout=2)
      child_role = start_thread(child)
      assert child_role == 'child'
      return res
  116. # ping of thread
  def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]:
      """Run the console ``ping`` command on ``dut`` and report the packet counts.

      The command sent is ``ping <target> 0 <times>``.

      :param dut: device whose console is driven.
      :param target: address to ping.
      :param times: value placed after the literal ``0`` in the command.
      :return: ``(transmitted, received)`` parsed from the
          'N packets transmitted' / 'N packets received' output.
      """
      dut.write(''.join(['ping ', str(target), ' 0 ', str(times)]))
      # the two counters are consumed in the order the console prints them
      tx_count = int(dut.expect(r'(\d+) packets transmitted', timeout=30)[1].decode())
      rx_count = int(dut.expect(r'(\d+) packets received', timeout=30)[1].decode())
      return tx_count, rx_count
  125. # connect Wi-Fi
  def connect_wifi(dut:IdfDut, ssid:str, psk:str, nums:int) -> Tuple[str, int]:
      """Connect ``dut`` to a Wi-Fi access point, retrying until it succeeds.

      Each attempt writes ``wifi connect -s <ssid> -p <psk>``, collects 5 s of
      console output to look for the 'sta ip: a.b.c.d,' line, and then reads the
      'wifi sta ...' status line.

      :param dut: device whose console is driven.
      :param ssid: SSID of the access point.
      :param psk: password of the access point.
      :param nums: upper bound of the attempt counter; attempts are numbered
          ``1 .. nums - 1``, so at most ``nums - 1`` attempts are made.
      :return: ``(ip_address, order)`` -- the station IP found in the log (empty
          string if none was printed) and the number of the attempt that succeeded.
      :raises AssertionError: if no attempt reported 'is connected successfully'.
      """
      clean_buffer(dut)
      ip_address = ''
      information = ''
      # defined up front so the name exists even if the loop body never runs
      order = 0
      for order in range(1, nums):
          dut.write('wifi connect -s ' + str(ssid) + ' -p ' + str(psk))
          # expecting TIMEOUT is used here to collect the console output of the
          # next 5 seconds
          console_log = str(dut.expect(pexpect.TIMEOUT, timeout=5))
          # A failed attempt prints no 'sta ip:' line. Indexing the empty result
          # used to raise IndexError and abort the retry loop on the first
          # failure, so only take the address when it is actually there.
          found = re.findall(r'sta ip: (\w+\.\w+\.\w+\.\w+),', console_log)
          if found:
              ip_address = found[0]
          information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=5)[1].decode()
          if information == 'is connected successfully':
              break
      assert information == 'is connected successfully'
      return ip_address, order
  def reset_host_interface() -> None:
      """Take the host's wireless interface down and bring it back up.

      Runs ``ifconfig <iface> down`` and ``ifconfig <iface> up`` through the
      shell with a pause after each. The commands' exit codes are not checked;
      the final assertion only verifies that neither call raised (for example
      on timeout).

      :raises AssertionError: if the sequence did not run to completion.
      """
      iface = get_host_interface_name()
      completed = False
      # (command suffix, subprocess timeout in s, pause afterwards in s)
      steps = ((' down', 5, 10), (' up', 10, 20))
      try:
          for suffix, call_timeout, settle_time in steps:
              subprocess.call('ifconfig ' + iface + suffix, shell=True, timeout=call_timeout)
              time.sleep(settle_time)
          completed = True
      finally:
          # always wait before reporting the outcome
          time.sleep(10)
          assert completed
  def set_interface_sysctl_options() -> None:
      """Set two IPv6 sysctl options on the host's wireless interface.

      Writes ``accept_ra=2`` and ``accept_ra_rt_info_max_plen=128`` under
      ``net/ipv6/conf/<iface>/`` with ``sysctl -w``. The commands' exit codes
      are not checked; the final assertion only verifies that neither call
      raised (for example on timeout).

      :raises AssertionError: if the sequence did not run to completion.
      """
      iface = get_host_interface_name()
      completed = False
      # (option assignment, pause afterwards in s)
      options = (('/accept_ra=2', 1), ('/accept_ra_rt_info_max_plen=128', 5))
      try:
          for assignment, settle_time in options:
              subprocess.call('sysctl -w net/ipv6/conf/' + iface + assignment, shell=True, timeout=5)
              time.sleep(settle_time)
          completed = True
      finally:
          # always wait before reporting the outcome
          time.sleep(5)
          assert completed
  def init_interface_ipv6_address() -> None:
      """Remove RA routes and global IPv6 addresses from the host's wireless interface.

      The route-deletion pipeline is run twice, then every global-scope
      ``inet6`` address on the interface is deleted. The pipelines' exit codes
      are not checked; the final assertion only verifies that no call raised
      (for example on timeout).

      :raises AssertionError: if the sequence did not run to completion.
      """
      iface = get_host_interface_name()
      completed = False
      # delete every 'ra' route listed for the interface
      drop_ra_routes = 'ip -6 route | grep ' + iface + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}"
      # delete every global-scope inet6 address on the interface
      drop_global_addrs = 'ip -6 address show dev ' + iface + \
          " scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev " + iface
      # (shell pipeline, pause afterwards in s); the route cleanup is issued twice
      steps = ((drop_ra_routes, 0.5), (drop_ra_routes, 1), (drop_global_addrs, 1))
      try:
          for shell_command, settle_time in steps:
              subprocess.call(shell_command, shell=True, timeout=5)
              time.sleep(settle_time)
          completed = True
      finally:
          # always wait before reporting the outcome
          time.sleep(5)
          assert completed
  def get_host_interface_name() -> str:
      """Return the name of the first host network interface containing ``'wl'``.

      :return: the first name from ``netifaces.interfaces()`` that contains 'wl'.
      :raises IndexError: if no such interface exists (same exception type as
          before, now with a message that says what is missing).
      """
      wireless_names = [name for name in netifaces.interfaces() if 'wl' in name]
      if not wireless_names:
          raise IndexError("no host network interface with 'wl' in its name was found")
      return str(wireless_names[0])
  def clean_buffer(dut:IdfDut) -> None:
      """Consume whatever console output of ``dut`` is currently pending.

      The length of what ``dut.expect(pexpect.TIMEOUT, timeout=0.1)`` returns
      is measured, and exactly that many characters are then matched so later
      ``expect`` calls do not see them.

      :param dut: device whose console buffer is drained.
      """
      # NOTE(review): relies on expect(pexpect.TIMEOUT) returning the data
      # received so far without consuming it -- confirm against pytest-embedded.
      pending = dut.expect(pexpect.TIMEOUT, timeout=0.1)
      pending_length = len(pending)
      dut.expect(r'[\s\S]{%s}' % pending_length, timeout=10)