| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
- # SPDX-License-Identifier: Unlicense OR CC0-1.0
- # !/usr/bin/env python3
- # this file defines some functions for testing cli and br under pytest framework
- import re
- import socket
- import struct
- import subprocess
- import time
- from typing import Tuple, Union
- import netifaces
- import pexpect
- from pytest_embedded_idf.dut import IdfDut
- def reset_thread(dut:IdfDut) -> None:
- dut.write(' ')
- dut.write('state')
- clean_buffer(dut)
- wait(dut, 1)
- dut.write('factoryreset')
- dut.expect('OpenThread attached to netif', timeout=20)
- dut.write(' ')
- dut.write('state')
- # config thread
- def config_thread(dut:IdfDut, model:str, dataset:str='0') -> Union[str, None]:
- if model == 'random':
- dut.write('dataset init new')
- dut.expect('Done', timeout=2)
- dut.write('dataset commit active')
- dut.expect('Done', timeout=2)
- dut.write('ifconfig up')
- dut.expect('Done', timeout=2)
- dut.write('dataset active -x') # get dataset
- dut_data = dut.expect(r'\n(\w{212})\r', timeout=5)[1].decode()
- return str(dut_data)
- if model == 'appointed':
- tmp = 'dataset set active ' + str(dataset)
- dut.write(tmp)
- dut.expect('Done', timeout=2)
- dut.write('ifconfig up')
- dut.expect('Done', timeout=2)
- return None
- return None
- # get the mleid address of the thread
- def get_mleid_addr(dut:IdfDut) -> str:
- dut_adress = ''
- clean_buffer(dut)
- dut.write('ipaddr mleid')
- dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
- return dut_adress
- # get the rloc address of the thread
- def get_rloc_addr(dut:IdfDut) -> str:
- dut_adress = ''
- clean_buffer(dut)
- dut.write('ipaddr rloc')
- dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
- return dut_adress
- # get the linklocal address of the thread
- def get_linklocal_addr(dut:IdfDut) -> str:
- dut_adress = ''
- clean_buffer(dut)
- dut.write('ipaddr linklocal')
- dut_adress = dut.expect(r'\n((?:\w+:){7}\w+)\r', timeout=5)[1].decode()
- return dut_adress
- # get the global unicast address of the thread:
- def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str:
- dut_adress = ''
- clean_buffer(br)
- br.write('br omrprefix')
- omrprefix = br.expect(r'\n((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
- clean_buffer(dut)
- dut.write('ipaddr')
- dut_adress = dut.expect(r'(%s(?:\w+:){3}\w+)\r' % str(omrprefix), timeout=5)[1].decode()
- return dut_adress
- # start thread
- def start_thread(dut:IdfDut) -> str:
- role = ''
- dut.write('thread start')
- tmp = dut.expect(r'Role detached -> (\w+)\W', timeout=20)[0]
- role = re.findall(r'Role detached -> (\w+)\W', str(tmp))[0]
- return role
- def wait_key_str(leader:IdfDut, child:IdfDut) -> None:
- wait(leader, 1)
- leader.expect('OpenThread attached to netif', timeout=20)
- leader.write(' ')
- leader.write('state')
- child.expect('OpenThread attached to netif', timeout=20)
- child.write(' ')
- child.write('state')
- def config_network(leader:IdfDut, child:IdfDut, leader_name:str, thread_dataset_model:str,
- thread_dataset:str, wifi:IdfDut, wifi_ssid:str, wifi_psk:str) -> str:
- wait_key_str(leader, child)
- return form_network_using_manual_configuration(leader, child, leader_name, thread_dataset_model,
- thread_dataset, wifi, wifi_ssid, wifi_psk)
- # config br and cli manually
- def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_name:str, thread_dataset_model:str,
- thread_dataset:str, wifi:IdfDut, wifi_ssid:str, wifi_psk:str) -> str:
- reset_thread(leader)
- clean_buffer(leader)
- reset_thread(child)
- clean_buffer(child)
- leader.write('channel 12')
- leader.expect('Done', timeout=2)
- child.write('channel 12')
- child.expect('Done', timeout=2)
- res = '0000'
- if wifi_psk != '0000':
- res = connect_wifi(wifi, wifi_ssid, wifi_psk, 10)[0]
- leader_data = ''
- if thread_dataset_model == 'random':
- leader_data = str(config_thread(leader, 'random'))
- else:
- config_thread(leader, 'appointed', thread_dataset)
- if leader_name == 'br':
- leader.write('bbr enable')
- leader.expect('Done', timeout=2)
- role = start_thread(leader)
- assert role == 'leader'
- if thread_dataset_model == 'random':
- config_thread(child, 'appointed', leader_data)
- else:
- config_thread(child, 'appointed', thread_dataset)
- if leader_name != 'br':
- child.write('bbr enable')
- child.expect('Done', timeout=2)
- role = start_thread(child)
- assert role == 'child'
- wait(leader, 10)
- return res
- # ping of thread
- def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]:
- command = 'ping ' + str(target) + ' 0 ' + str(times)
- dut.write(command)
- transmitted = dut.expect(r'(\d+) packets transmitted', timeout=30)[1].decode()
- tx_count = int(transmitted)
- received = dut.expect(r'(\d+) packets received', timeout=30)[1].decode()
- rx_count = int(received)
- return tx_count, rx_count
- # connect Wi-Fi
- def connect_wifi(dut:IdfDut, ssid:str, psk:str, nums:int) -> Tuple[str, int]:
- clean_buffer(dut)
- ip_address = ''
- information = ''
- for order in range(1, nums):
- dut.write('wifi connect -s ' + str(ssid) + ' -p ' + str(psk))
- tmp = dut.expect(pexpect.TIMEOUT, timeout=5)
- if 'sta ip' in str(tmp):
- ip_address = re.findall(r'sta ip: (\w+.\w+.\w+.\w+),', str(tmp))[0]
- information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=5)[1].decode()
- if information == 'is connected successfully':
- break
- assert information == 'is connected successfully'
- return ip_address, order
- def reset_host_interface() -> None:
- interface_name = get_host_interface_name()
- flag = False
- try:
- command = 'ifconfig ' + interface_name + ' down'
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(1)
- command = 'ifconfig ' + interface_name + ' up'
- subprocess.call(command, shell=True, timeout=10)
- time.sleep(1)
- flag = True
- finally:
- time.sleep(1)
- assert flag
- def set_interface_sysctl_options() -> None:
- interface_name = get_host_interface_name()
- flag = False
- try:
- command = 'sysctl -w net/ipv6/conf/' + interface_name + '/accept_ra=2'
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(1)
- command = 'sysctl -w net/ipv6/conf/' + interface_name + '/accept_ra_rt_info_max_plen=128'
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(1)
- flag = True
- finally:
- time.sleep(2)
- assert flag
- def init_interface_ipv6_address() -> None:
- interface_name = get_host_interface_name()
- flag = False
- try:
- command = 'ip -6 route | grep ' + interface_name + " | grep ra | awk {'print $1'} | xargs -I {} ip -6 route del {}"
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(0.5)
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(1)
- command = 'ip -6 address show dev ' + interface_name + \
- " scope global | grep 'inet6' | awk {'print $2'} | xargs -I {} ip -6 addr del {} dev " + interface_name
- subprocess.call(command, shell=True, timeout=5)
- time.sleep(1)
- flag = True
- finally:
- time.sleep(1)
- assert flag
- def get_host_interface_name() -> str:
- interfaces = netifaces.interfaces()
- interface_name = [s for s in interfaces if 'wl' in s][0]
- return str(interface_name)
- def clean_buffer(dut:IdfDut) -> None:
- str_length = str(len(dut.expect(pexpect.TIMEOUT, timeout=0.1)))
- dut.expect(r'[\s\S]{%s}' % str(str_length), timeout=10)
- def check_if_host_receive_ra(br:IdfDut) -> bool:
- interface_name = get_host_interface_name()
- clean_buffer(br)
- br.write('br omrprefix')
- omrprefix = br.expect(r'\n((?:\w+:){4}):/\d+\r', timeout=5)[1].decode()
- command = 'ip -6 route | grep ' + str(interface_name)
- out_str = subprocess.getoutput(command)
- print('br omrprefix: ', str(omrprefix))
- print('host route table:\n', str(out_str))
- return str(omrprefix) in str(out_str)
- def host_connect_wifi() -> None:
- command = '. /home/test/wlan_connection_OTTE.sh'
- subprocess.call(command, shell=True, timeout=30)
- time.sleep(5)
- def is_joined_wifi_network(br:IdfDut) -> bool:
- return check_if_host_receive_ra(br)
- thread_ipv6_group = 'ff04:0:0:0:0:0:0:125'
- def check_ipmaddr(dut:IdfDut) -> bool:
- clean_buffer(dut)
- dut.write('ipmaddr')
- info = dut.expect(pexpect.TIMEOUT, timeout=2)
- if thread_ipv6_group in str(info):
- return True
- return False
- def thread_is_joined_group(dut:IdfDut) -> bool:
- command = 'mcast join ' + thread_ipv6_group
- dut.write(command)
- dut.expect('Done', timeout=2)
- order = 0
- while order < 3:
- if check_ipmaddr(dut):
- return True
- dut.write(command)
- wait(dut, 2)
- order = order + 1
- return False
- host_ipv6_group = 'ff04::125'
- def host_joined_group() -> bool:
- interface_name = get_host_interface_name()
- command = 'netstat -g | grep ' + str(interface_name)
- out_str = subprocess.getoutput(command)
- print('groups:\n', str(out_str))
- return host_ipv6_group in str(out_str)
- class udp_parameter:
- def __init__(self, group:str='', try_join_udp_group:bool=False, timeout:float=15.0, udp_bytes:bytes=b''):
- self.group = group
- self.try_join_udp_group = try_join_udp_group
- self.timeout = timeout
- self.udp_bytes = udp_bytes
- def create_host_udp_server(myudp:udp_parameter) -> None:
- interface_name = get_host_interface_name()
- try:
- print('The host start to create udp server!')
- if_index = socket.if_nametoindex(interface_name)
- sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
- sock.bind(('::', 5090))
- sock.setsockopt(
- socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP,
- struct.pack('16si', socket.inet_pton(socket.AF_INET6, myudp.group),
- if_index))
- myudp.try_join_udp_group = True
- sock.settimeout(myudp.timeout)
- print('The host start to receive message!')
- myudp.udp_bytes = (sock.recvfrom(1024))[0]
- print('The host has received message: ', myudp.udp_bytes)
- except socket.error:
- print('The host did not receive message!')
- finally:
- print('Close the socket.')
- sock.close()
- def wait(dut:IdfDut, wait_time:float) -> None:
- dut.expect(pexpect.TIMEOUT, timeout=wait_time)
|