pytest_esp_eth.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: CC0-1.0
  3. import contextlib
  4. import logging
  5. import os
  6. import socket
  7. from collections.abc import Callable
  8. from threading import Thread
  9. from typing import Iterator
  10. import pytest
  11. from pytest_embedded import Dut
  12. from scapy.all import Ether, raw
  13. @contextlib.contextmanager
  14. def configure_eth_if() -> Iterator[socket.socket]:
  15. # try to determine which interface to use
  16. netifs = os.listdir('/sys/class/net/')
  17. logging.info('detected interfaces: %s', str(netifs))
  18. target_if = ''
  19. for netif in netifs:
  20. if netif.find('eth') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
  21. target_if = netif
  22. break
  23. if target_if == '':
  24. raise Exception('no network interface found')
  25. logging.info('Use %s for testing', target_if)
  26. so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, 0x2222)
  27. so.bind((target_if, 0))
  28. try:
  29. yield so
  30. finally:
  31. so.close()
  32. def send_eth_packet(mac: str) -> None:
  33. with configure_eth_if() as so:
  34. so.settimeout(10)
  35. payload = bytearray(1010)
  36. for i, _ in enumerate(payload):
  37. payload[i] = i & 0xff
  38. eth_frame = Ether(dst=mac, src=so.getsockname()[4], type=0x2222) / raw(payload)
  39. try:
  40. so.send(raw(eth_frame))
  41. except Exception as e:
  42. raise e
  43. def recv_resp_poke(i: int) -> None:
  44. with configure_eth_if() as so:
  45. so.settimeout(10)
  46. try:
  47. eth_frame = Ether(so.recv(60))
  48. if eth_frame.type == 0x2222 and eth_frame.load[0] == 0xfa:
  49. if eth_frame.load[1] != i:
  50. raise Exception('Missed Poke Packet')
  51. eth_frame.dst = eth_frame.src
  52. eth_frame.src = so.getsockname()[4]
  53. eth_frame.load = bytes.fromhex('fb') # POKE_RESP code
  54. so.send(raw(eth_frame))
  55. except Exception as e:
  56. raise e
  57. def traffic_gen(mac: str, enabled: Callable) -> None:
  58. with configure_eth_if() as so:
  59. payload = bytes.fromhex('ff') # DUMMY_TRAFFIC code
  60. payload += bytes(1485)
  61. eth_frame = Ether(dst=mac, src=so.getsockname()[4], type=0x2222) / raw(payload)
  62. try:
  63. while enabled() == 1:
  64. so.send(raw(eth_frame))
  65. except Exception as e:
  66. raise e
  67. def actual_test(dut: Dut) -> None:
  68. dut.expect_exact('Press ENTER to see the list of tests')
  69. dut.write('\n')
  70. dut.expect_exact('Enter test for running.')
  71. dut.write('"start_and_stop"')
  72. dut.expect_unity_test_output()
  73. dut.expect_exact("Enter next test, or 'enter' to see menu")
  74. dut.write('"get_set_mac"')
  75. dut.expect_unity_test_output()
  76. dut.expect_exact("Enter next test, or 'enter' to see menu")
  77. with configure_eth_if() as so:
  78. so.settimeout(30)
  79. dut.write('"ethernet_broadcast_transmit"')
  80. eth_frame = Ether(so.recv(1024))
  81. for i in range(0, 1010):
  82. if eth_frame.load[i] != i & 0xff:
  83. raise Exception('Packet content mismatch')
  84. dut.expect_unity_test_output()
  85. dut.expect_exact("Enter next test, or 'enter' to see menu")
  86. dut.write('"recv_pkt"')
  87. res = dut.expect(
  88. r'([\s\S]*)'
  89. r'DUT MAC: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
  90. )
  91. send_eth_packet('ff:ff:ff:ff:ff:ff') # broadcast frame
  92. send_eth_packet('01:00:00:00:00:00') # multicast frame
  93. send_eth_packet(res.group(2)) # unicast frame
  94. dut.expect_unity_test_output(extra_before=res.group(1))
  95. dut.expect_exact("Enter next test, or 'enter' to see menu")
  96. dut.write('"start_stop_stress_test"')
  97. res = dut.expect(
  98. r'([\s\S]*)'
  99. r'DUT MAC: ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
  100. )
  101. # Start/stop under heavy Tx traffic
  102. for tx_i in range(10):
  103. recv_resp_poke(tx_i)
  104. # Start/stop under heavy Rx traffic
  105. traffic_en = 1
  106. thread = Thread(target=traffic_gen, args=(res.group(2), lambda:traffic_en, ))
  107. thread.start()
  108. try:
  109. for rx_i in range(10):
  110. recv_resp_poke(rx_i)
  111. finally:
  112. traffic_en = 0
  113. thread.join()
  114. dut.expect_unity_test_output()
  115. @pytest.mark.esp32
  116. @pytest.mark.ip101
  117. @pytest.mark.parametrize('config', [
  118. 'ip101',
  119. ], indirect=True)
  120. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  121. def test_esp_eth_ip101(dut: Dut) -> None:
  122. actual_test(dut)
  123. @pytest.mark.esp32
  124. @pytest.mark.lan8720
  125. @pytest.mark.parametrize('config', [
  126. 'lan8720',
  127. ], indirect=True)
  128. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  129. def test_esp_eth_lan8720(dut: Dut) -> None:
  130. actual_test(dut)