# pytest_vlan_napt.py
  1. # SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: CC0-1.0
  3. import ipaddress
  4. import subprocess
  5. import threading
  6. import time
  7. from typing import Dict, Union
  8. import pytest
  9. from pytest_embedded import Dut
  10. from scapy import layers
  11. from scapy.all import ICMP, IP, TCP, UDP, AsyncSniffer
  # Ports passed to iperf3 (-p) and matched by the UDP/TCP packet filters below.
  udp_port, tcp_port = 1234, 4321
  def run_cmd(command: str, secure: bool=False) -> str:
      """Run a command line without a shell and return its decoded stdout.

      The command is split on whitespace and executed directly, so quoting and
      shell operators are not interpreted.

      :param command: command line to execute
      :param secure: when True the sudo password is written to the child's stdin
                     and neither the command nor its output is echoed
      :return: the child's standard output decoded as UTF-8
      :raises ValueError: if ``command`` contains no arguments
      """
      if not secure:
          print(f'Running: {command}')
      # split() without a separator collapses runs of blanks, so a doubled space
      # does not turn into an empty-string argument.
      cmd = command.split()
      if not cmd:
          raise ValueError('run_cmd: empty command')
      # NOTE(review): the sudo password is hard-coded in the test source;
      # consider supplying it from the CI environment instead.
      password = b'Esp@32' if secure else None
      with subprocess.Popen(cmd,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE) as proc:
          # One communicate() call writes the password (or just closes stdin),
          # drains both pipes and waits for the child to exit.
          raw_out, raw_err = proc.communicate(input=password)
      out = raw_out.decode('utf-8')
      err = raw_err.decode('utf-8')
      # Print the output
      if not secure:
          if out:
              print('Output: ', out)
          if err:
              print('Error: ', err)
      return out
  def run_cmd_sec(command: str) -> str:
      """Run *command* under ``sudo -S -k`` via ``run_cmd`` and return its result.

      :param command: command line to run with elevated rights
      :return: whatever ``run_cmd`` returns for the sudo-prefixed command
      """
      print(f'Running secured: {command}')
      sudo_command = f'sudo -S -k {command}'
      return run_cmd(sudo_command, secure=True)
  def clear_network(config: dict) -> None:
      """Run the route-deletion commands, then the VLAN-destroy commands.

      :param config: dict holding the command lists ``delete_route_cmd_l`` and
                     ``vlan_destroy_cmd_l``
      """
      # Routes go first, the VLAN interfaces afterwards.
      for list_key in ('delete_route_cmd_l', 'vlan_destroy_cmd_l'):
          for command in config[list_key]:
              run_cmd_sec(command)
  def setup_network(config: dict) -> None:
      """Reset the host network, then create the VLAN interfaces and the route.

      :param config: dict holding the command lists ``vlan_create_cmd_l`` and
                     ``set_route_cmd_l`` (plus those read by ``clear_network``)
      """
      # Clear network before setting it up
      clear_network(config)
      # VLAN interfaces are created before the route is set.
      for list_key in ('vlan_create_cmd_l', 'set_route_cmd_l'):
          for command in config[list_key]:
              run_cmd_sec(command)
  def create_config(dut: Dut) -> dict:
      """Build the host-side network configuration for the VLAN NAPT tests.

      Interface name, VLAN IDs and addresses are read from the DUT application's
      sdkconfig. The ``ip`` command lists that create/destroy the VLAN interfaces
      and add/remove the client route are derived from those values.

      :param dut: device under test; only ``dut.app.sdkconfig.get`` is used
      :return: dict with the keys ``pc_iface``, ``vlanClient``, ``vlanServer``,
               ``esp_vlanClient_ip``, ``esp_vlanServer_ip``, ``vlan_create_cmd_l``,
               ``vlan_destroy_cmd_l``, ``set_route_cmd_l`` and ``delete_route_cmd_l``
      """
      sdkconfig = dut.app.sdkconfig
      pc_iface = sdkconfig.get('EXAMPLE_VLAN_PYTEST_PC_IFACE')
      vlanClient_conf = {
          'id': str(sdkconfig.get('EXAMPLE_ETHERNET_VLAN_ID')),
          'name': 'vlanClient',
          'ip': sdkconfig.get('EXAMPLE_VLAN_STATIC_ADDR_DEF_GW'),
      }
      vlanServer_conf = {
          'id': str(sdkconfig.get('EXAMPLE_EXTRA_ETHERNET_VLAN_ID')),
          'name': 'vlanServer',
          'ip': sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_ADDR_DEF_GW'),
      }
      esp_vlanClient_ip = sdkconfig.get('EXAMPLE_VLAN_STATIC_IPV4_ADDR')
      esp_vlanServer_ip = sdkconfig.get('EXAMPLE_EXTRA_VLAN_STATIC_IPV4_ADDR')

      # Network address of the server VLAN: host address AND-ed with the netmask.
      netmask = '255.255.255.0'
      server_host_addr = ipaddress.IPv4Address(vlanServer_conf['ip'])
      vlanServer_net_addr = ipaddress.IPv4Address(
          int(server_host_addr) & int(ipaddress.IPv4Address(netmask)))

      client_if = vlanClient_conf['name']
      server_if = vlanServer_conf['name']
      # Prefix for commands that have to run inside the client network namespace.
      in_client_ns = 'ip netns exec ns_vlanClient'
      # Route towards the server VLAN's /24 through the ESP's client-VLAN address.
      server_route = f'{vlanServer_net_addr}/24 via {esp_vlanClient_ip}'

      config: Dict[str, Union[str, dict, list]] = {
          # Basic Configurations
          'pc_iface': pc_iface,
          'vlanClient': vlanClient_conf,
          'vlanServer': vlanServer_conf,
          'esp_vlanClient_ip': esp_vlanClient_ip,
          'esp_vlanServer_ip': esp_vlanServer_ip,
          # The client VLAN interface is moved into its own namespace; the server
          # VLAN interface is created in the default namespace.
          'vlan_create_cmd_l': [
              'ip netns add ns_vlanClient',
              f"ip link add link {pc_iface} name {client_if} type vlan id {vlanClient_conf['id']}",
              f'ip link set {client_if} netns ns_vlanClient',
              f"{in_client_ns} ip addr add {vlanClient_conf['ip']}/{netmask} dev {client_if}",
              f'{in_client_ns} ip link set dev {client_if} up',
              f"ip link add link {pc_iface} name {server_if} type vlan id {vlanServer_conf['id']}",
              f"ip addr add {vlanServer_conf['ip']}/{netmask} dev {server_if}",
              f'ip link set dev {server_if} up',
          ],
          'vlan_destroy_cmd_l': [
              f'{in_client_ns} ip link set dev {client_if} down',
              f'{in_client_ns} ip link delete {client_if}',
              f'ip link set dev {server_if} down',
              f'ip link delete {server_if}',
              'ip netns delete ns_vlanClient',
          ],
          'set_route_cmd_l': [f'{in_client_ns} ip route add {server_route}'],
          'delete_route_cmd_l': [f'{in_client_ns} ip route delete {server_route}'],
      }
      return config
  # Ping Test
  def ping_test(config: dict) -> None:
      """Ping the server VLAN address from the client namespace and validate the
      ICMP traffic captured on the server VLAN interface.

      The test passes when the capture contains both an echo request from the
      ESP's server-VLAN address to the host's server-VLAN address and the
      matching echo reply in the opposite direction.

      :param config: configuration dict as returned by ``create_config``
      :raises AssertionError: if nothing was captured or one direction is missing
      """
      setup_network(config)
      try:
          capture = AsyncSniffer(iface=config['vlanServer']['name'], filter='icmp', count=10)
          # Start sniffing
          capture.start()
          # presumably gives the sniffer thread time to attach before traffic starts
          time.sleep(1)
          # Run network test commands here
          print(run_cmd_sec(f"ip netns exec ns_vlanClient ping -I {config['vlanClient']['ip']} {config['vlanServer']['ip']} -c 10"))
          # Wait for the sniffer to finish (it stops by itself after `count` packets)
          capture.join(timeout=20)
          vlanServer_pkt_list = capture.results
      finally:
          # Always restore the host network, even when sniffing or ping raised
          clear_network(config)

      # Test Validation
      assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
      print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")

      host_ip = config['vlanServer']['ip']
      esp_ip = config['esp_vlanServer_ip']
      vlanServer_forward_flag = False
      vlanServer_return_flag = False
      for pkt in vlanServer_pkt_list:
          print('Summary: ', pkt.summary())
          # Skip anything that did not dissect into IP/ICMP instead of raising
          if ICMP not in pkt or IP not in pkt:
              continue
          # ICMP type 8 = echo request, type 0 = echo reply
          if pkt[ICMP].type == 8 and pkt[IP].src == esp_ip and pkt[IP].dst == host_ip:
              vlanServer_forward_flag = True
          if pkt[ICMP].type == 0 and pkt[IP].src == host_ip and pkt[IP].dst == esp_ip:
              vlanServer_return_flag = True
      assert vlanServer_forward_flag and vlanServer_return_flag
  # UDP Test
  def udp_server(serverip: str, port: int) -> None:
      """Run an iperf3 server on *port*, wrapped in ``timeout 10s``.

      :param serverip: address shown in the log line only
      :param port: port passed to ``iperf3 -s -p``
      """
      print(f'UDP server listening on IP: {serverip} port: {port}')
      server_cmd = f'timeout 10s iperf3 -s -p {port}'
      run_cmd(server_cmd)
  def udp_client(serverip: str, port: int) -> None:
      """Run a bidirectional UDP iperf3 client inside the ``ns_vlanClient``
      namespace and print its output.

      :param serverip: address of the iperf3 server to connect to
      :param port: server port
      """
      # presumably lets the server thread come up first
      time.sleep(1)
      client_cmd = f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -u -p {port} --bidir -k 20'
      client_output = run_cmd_sec(client_cmd)
      print(client_output)
  def udp_server_client_comm(config: dict, port: int) -> None:
      """Run the UDP iperf3 server and client concurrently and wait for both.

      :param config: configuration dict; ``config['vlanServer']['ip']`` is used
      :param port: port handed to both server and client
      """
      server_ip = config['vlanServer']['ip']
      # Server thread first, client thread second; both are started before joining.
      workers = [
          threading.Thread(target=udp_server, args=(server_ip, port)),
          threading.Thread(target=udp_client, args=(server_ip, port)),
      ]
      for worker in workers:
          worker.start()
      for worker in workers:
          worker.join()
      print('UDP Test Done')
  def udp_lfilter(packet: layers.l2.Ether) -> bool:
      """Tell whether *packet* is UDP traffic to or from ``udp_port``.

      :param packet: sniffed packet
      :return: True if the packet has a UDP layer whose source or destination
               port equals ``udp_port``, otherwise False
      """
      return UDP in packet and (packet[UDP].dport == udp_port or packet[UDP].sport == udp_port)
  def udp_test(config: dict) -> None:
      """Run a UDP iperf3 session from the client namespace to the host's server
      VLAN address and validate the UDP traffic captured on that interface.

      The test passes when the capture contains a packet to ``udp_port`` from the
      ESP's server-VLAN address to the host's server-VLAN address, and a packet
      from ``udp_port`` in the opposite direction.

      :param config: configuration dict as returned by ``create_config``
      :raises AssertionError: if nothing was captured or one direction is missing
      """
      setup_network(config)
      try:
          capture = AsyncSniffer(iface=config['vlanServer']['name'],
                                 filter='udp',
                                 lfilter=udp_lfilter,
                                 count=10)
          # Start sniffing
          capture.start()
          # presumably gives the sniffer thread time to attach before traffic starts
          time.sleep(1)
          # Run network test commands here
          udp_server_client_comm(config, udp_port)
          # Wait for the sniffer to finish (it stops by itself after `count` packets)
          capture.join(timeout=20)
          vlanServer_pkt_list = capture.results
      finally:
          # Always restore the host network, even when sniffing or iperf3 raised
          clear_network(config)

      # Test Validation
      assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
      print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")

      host_ip = config['vlanServer']['ip']
      esp_ip = config['esp_vlanServer_ip']
      vlanServer_forward_flag = False
      vlanServer_return_flag = False
      for pkt in vlanServer_pkt_list:
          print('Summary: ', pkt.summary())
          # Skip packets without an IPv4 + UDP layer instead of raising on pkt[IP]
          if UDP not in pkt or IP not in pkt:
              continue
          if pkt[UDP].dport == udp_port and pkt[IP].src == esp_ip and pkt[IP].dst == host_ip:
              vlanServer_forward_flag = True
          if pkt[UDP].sport == udp_port and pkt[IP].src == host_ip and pkt[IP].dst == esp_ip:
              vlanServer_return_flag = True
      assert vlanServer_forward_flag and vlanServer_return_flag
  # TCP Test
  def tcp_server(serverip: str, port: int) -> None:
      """Run an iperf3 server on *port*, wrapped in ``timeout 10s``.

      :param serverip: address shown in the log line only
      :param port: port passed to ``iperf3 -s -p``
      """
      print(f'TCP server listening on IP: {serverip} port: {port}')
      server_cmd = f'timeout 10s iperf3 -s -p {port}'
      run_cmd(server_cmd)
  def tcp_client(serverip: str, port: int) -> None:
      """Run a bidirectional TCP iperf3 client inside the ``ns_vlanClient``
      namespace and print its output.

      :param serverip: address of the iperf3 server to connect to
      :param port: server port
      """
      # presumably lets the server thread come up first
      time.sleep(1)
      client_cmd = f'timeout 10s ip netns exec ns_vlanClient iperf3 -c {serverip} -p {port} --bidir -k 20'
      client_output = run_cmd_sec(client_cmd)
      print(client_output)
  def tcp_server_client_comm(config: dict, port: int) -> None:
      """Run the TCP iperf3 server and client concurrently and wait for both.

      :param config: configuration dict; ``config['vlanServer']['ip']`` is used
      :param port: port handed to both server and client
      """
      server_ip = config['vlanServer']['ip']
      # Server thread first, client thread second; both are started before joining.
      workers = [
          threading.Thread(target=tcp_server, args=(server_ip, port)),
          threading.Thread(target=tcp_client, args=(server_ip, port)),
      ]
      for worker in workers:
          worker.start()
      for worker in workers:
          worker.join()
      print('TCP Test Done')
  def tcp_lfilter(packet: layers.l2.Ether) -> bool:
      """Tell whether *packet* is TCP traffic to or from ``tcp_port``.

      :param packet: sniffed packet
      :return: True if the packet has a TCP layer whose source or destination
               port equals ``tcp_port``, otherwise False
      """
      return TCP in packet and (packet[TCP].dport == tcp_port or packet[TCP].sport == tcp_port)
  def tcp_test(config: dict) -> None:
      """Run a TCP iperf3 session from the client namespace to the host's server
      VLAN address and validate the TCP traffic captured on that interface.

      The test passes when the capture contains a packet to ``tcp_port`` from the
      ESP's server-VLAN address to the host's server-VLAN address, and a packet
      from ``tcp_port`` in the opposite direction.

      :param config: configuration dict as returned by ``create_config``
      :raises AssertionError: if nothing was captured or one direction is missing
      """
      setup_network(config)
      try:
          capture = AsyncSniffer(iface=config['vlanServer']['name'],
                                 filter='tcp',
                                 lfilter=tcp_lfilter,
                                 count=10)
          # Start sniffing
          capture.start()
          # presumably gives the sniffer thread time to attach before traffic starts
          time.sleep(1)
          # Run network test commands here
          tcp_server_client_comm(config, tcp_port)
          # Wait for the sniffer to finish (it stops by itself after `count` packets)
          capture.join(timeout=20)
          vlanServer_pkt_list = capture.results
      finally:
          # Always restore the host network, even when sniffing or iperf3 raised
          clear_network(config)

      # Test Validation
      assert vlanServer_pkt_list is not None, 'Failure: No packets captured'
      print(f"Captured: {len(vlanServer_pkt_list)} packets on interface {config['vlanServer']['name']}")

      host_ip = config['vlanServer']['ip']
      esp_ip = config['esp_vlanServer_ip']
      vlanServer_forward_flag = False
      vlanServer_return_flag = False
      for pkt in vlanServer_pkt_list:
          print('Summary: ', pkt.summary())
          # Skip packets without an IPv4 + TCP layer instead of raising on pkt[IP]
          if TCP not in pkt or IP not in pkt:
              continue
          if pkt[TCP].dport == tcp_port and pkt[IP].src == esp_ip and pkt[IP].dst == host_ip:
              vlanServer_forward_flag = True
          if pkt[TCP].sport == tcp_port and pkt[IP].src == host_ip and pkt[IP].dst == esp_ip:
              vlanServer_return_flag = True
      assert vlanServer_forward_flag and vlanServer_return_flag
  @pytest.mark.esp32
  @pytest.mark.ethernet_vlan
  def test_vlan_napt_pingtest(dut: Dut) -> None:
      """Wait for the DUT's app_main to return, then run the ping check."""
      dut.expect('main_task: Returned from app_main()')
      ping_test(create_config(dut))
  @pytest.mark.esp32
  @pytest.mark.ethernet_vlan
  def test_vlan_napt_udptest(dut: Dut) -> None:
      """Wait for the DUT's app_main to return, then run the UDP check."""
      dut.expect('main_task: Returned from app_main()')
      udp_test(create_config(dut))
  @pytest.mark.esp32
  @pytest.mark.ethernet_vlan
  def test_vlan_napt_tcptest(dut: Dut) -> None:
      """Wait for the DUT's app_main to return, then run the TCP check."""
      dut.expect('main_task: Returned from app_main()')
      tcp_test(create_config(dut))