pytest_esp_eth.py 6.5 KB


  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: CC0-1.0
  3. import contextlib
  4. import logging
  5. import os
  6. import socket
  7. import time
  8. from multiprocessing import Pipe, Process, connection
  9. from typing import Iterator
  10. import pytest
  11. from pytest_embedded import Dut
  12. from scapy.all import Ether, raw
  13. ETH_TYPE = 0x2222
  14. class EthTestIntf(object):
  15. def __init__(self, eth_type: int, my_if: str = ''):
  16. self.target_if = ''
  17. self.eth_type = eth_type
  18. self.find_target_if(my_if)
  19. def find_target_if(self, my_if: str = '') -> None:
  20. # try to determine which interface to use
  21. netifs = os.listdir('/sys/class/net/')
  22. logging.info('detected interfaces: %s', str(netifs))
  23. for netif in netifs:
  24. # if no interface defined, try to find it automatically
  25. if my_if == '':
  26. if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
  27. self.target_if = netif
  28. break
  29. else:
  30. if netif.find(my_if) == 0:
  31. self.target_if = my_if
  32. break
  33. if self.target_if == '':
  34. raise Exception('network interface not found')
  35. logging.info('Use %s for testing', self.target_if)
  36. @contextlib.contextmanager
  37. def configure_eth_if(self) -> Iterator[socket.socket]:
  38. so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(self.eth_type))
  39. so.bind((self.target_if, 0))
  40. try:
  41. yield so
  42. finally:
  43. so.close()
  44. def send_eth_packet(self, mac: str) -> None:
  45. with self.configure_eth_if() as so:
  46. so.settimeout(10)
  47. payload = bytearray(1010)
  48. for i, _ in enumerate(payload):
  49. payload[i] = i & 0xff
  50. eth_frame = Ether(dst=mac, src=so.getsockname()[4], type=self.eth_type) / raw(payload)
  51. try:
  52. so.send(raw(eth_frame))
  53. except Exception as e:
  54. raise e
  55. def recv_resp_poke(self, i: int) -> None:
  56. with self.configure_eth_if() as so:
  57. so.settimeout(10)
  58. try:
  59. eth_frame = Ether(so.recv(60))
  60. if eth_frame.type == self.eth_type and eth_frame.load[0] == 0xfa:
  61. if eth_frame.load[1] != i:
  62. raise Exception('Missed Poke Packet')
  63. eth_frame.dst = eth_frame.src
  64. eth_frame.src = so.getsockname()[4]
  65. eth_frame.load = bytes.fromhex('fb') # POKE_RESP code
  66. so.send(raw(eth_frame))
  67. except Exception as e:
  68. raise e
  69. def traffic_gen(self, mac: str, pipe_rcv:connection.Connection) -> None:
  70. with self.configure_eth_if() as so:
  71. payload = bytes.fromhex('ff') # DUMMY_TRAFFIC code
  72. payload += bytes(1485)
  73. eth_frame = Ether(dst=mac, src=so.getsockname()[4], type=self.eth_type) / raw(payload)
  74. try:
  75. while pipe_rcv.poll() is not True:
  76. so.send(raw(eth_frame))
  77. except Exception as e:
  78. raise e
  79. def ethernet_test(dut: Dut) -> None:
  80. dut.expect_exact('Press ENTER to see the list of tests')
  81. dut.write('\n')
  82. dut.expect_exact('Enter test for running.')
  83. dut.write('[ethernet]')
  84. dut.expect_unity_test_output(timeout=980)
  85. def ethernet_int_emac_hal_test(dut: Dut) -> None:
  86. dut.expect_exact('Press ENTER to see the list of tests')
  87. dut.write('\n')
  88. dut.expect_exact('Enter test for running.')
  89. dut.write('[emac_hal]')
  90. dut.expect_unity_test_output()
  91. def ethernet_l2_test(dut: Dut) -> None:
  92. target_if = EthTestIntf(ETH_TYPE)
  93. dut.expect_exact('Press ENTER to see the list of tests')
  94. dut.write('\n')
  95. dut.expect_exact('Enter test for running.')
  96. with target_if.configure_eth_if() as so:
  97. so.settimeout(30)
  98. dut.write('"ethernet broadcast transmit"')
  99. eth_frame = Ether(so.recv(1024))
  100. for i in range(0, 1010):
  101. if eth_frame.load[i] != i & 0xff:
  102. raise Exception('Packet content mismatch')
  103. dut.expect_unity_test_output()
  104. dut.expect_exact("Enter next test, or 'enter' to see menu")
  105. dut.write('"ethernet recv_pkt"')
  106. res = dut.expect(
  107. r'([\s\S]*)'
  108. r'DUT MAC: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
  109. )
  110. time.sleep(1)
  111. target_if.send_eth_packet('ff:ff:ff:ff:ff:ff') # broadcast frame
  112. target_if.send_eth_packet('01:00:00:00:00:00') # multicast frame
  113. target_if.send_eth_packet(res.group(2)) # unicast frame
  114. dut.expect_unity_test_output(extra_before=res.group(1))
  115. dut.expect_exact("Enter next test, or 'enter' to see menu")
  116. dut.write('"ethernet start/stop stress test under heavy traffic"')
  117. res = dut.expect(
  118. r'([\s\S]*)'
  119. r'DUT MAC: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
  120. )
  121. # Start/stop under heavy Tx traffic
  122. for tx_i in range(10):
  123. target_if.recv_resp_poke(tx_i)
  124. # Start/stop under heavy Rx traffic
  125. pipe_rcv, pipe_send = Pipe(False)
  126. tx_proc = Process(target=target_if.traffic_gen, args=(res.group(2), pipe_rcv, ))
  127. tx_proc.start()
  128. try:
  129. for rx_i in range(10):
  130. target_if.recv_resp_poke(rx_i)
  131. finally:
  132. pipe_send.send(0) # just send some dummy data
  133. tx_proc.join(5)
  134. if tx_proc.exitcode is None:
  135. tx_proc.terminate()
  136. dut.expect_unity_test_output(extra_before=res.group(1))
  137. @pytest.mark.esp32
  138. @pytest.mark.ethernet
  139. @pytest.mark.parametrize('config', [
  140. 'default_ip101',
  141. 'release_ip101',
  142. 'single_core_ip101'
  143. ], indirect=True)
  144. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  145. def test_esp_ethernet(dut: Dut) -> None:
  146. ethernet_test(dut)
  147. @pytest.mark.esp32
  148. @pytest.mark.ethernet
  149. @pytest.mark.parametrize('config', [
  150. 'default_ip101',
  151. ], indirect=True)
  152. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  153. def test_esp_emac_hal(dut: Dut) -> None:
  154. ethernet_int_emac_hal_test(dut)
  155. @pytest.mark.esp32
  156. @pytest.mark.ip101
  157. @pytest.mark.parametrize('config', [
  158. 'default_ip101',
  159. ], indirect=True)
  160. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  161. def test_esp_eth_ip101(dut: Dut) -> None:
  162. ethernet_l2_test(dut)
  163. @pytest.mark.esp32
  164. @pytest.mark.lan8720
  165. @pytest.mark.parametrize('config', [
  166. 'default_lan8720',
  167. ], indirect=True)
  168. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  169. def test_esp_eth_lan8720(dut: Dut) -> None:
  170. ethernet_l2_test(dut)