# pytest_example_l2tap_echo.py
  1. # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: CC0-1.0
  3. import contextlib
  4. import logging
  5. import os
  6. import socket
  7. import sys
  8. from typing import Iterator
  9. import pytest
  10. from pytest_embedded import Dut
  11. from scapy.all import Ether, raw
  12. ETH_TYPE_1 = 0x2220
  13. ETH_TYPE_2 = 0x2221
  14. ETH_TYPE_3 = 0x2223
  15. @contextlib.contextmanager
  16. def configure_eth_if(eth_type: int, target_if: str='') -> Iterator[socket.socket]:
  17. if target_if == '':
  18. # try to determine which interface to use
  19. netifs = os.listdir('/sys/class/net/')
  20. logging.info('detected interfaces: %s', str(netifs))
  21. for netif in netifs:
  22. if netif.find('eth') == 0 or netif.find('enx') == 0 or netif.find('enp') == 0 or netif.find('eno') == 0:
  23. target_if = netif
  24. break
  25. if target_if == '':
  26. raise Exception('no network interface found')
  27. logging.info('Use %s for testing', target_if)
  28. so = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(eth_type))
  29. so.bind((target_if, 0))
  30. try:
  31. yield so
  32. finally:
  33. so.close()
  34. def send_recv_eth_frame(payload_str: str, eth_type: int, dest_mac: str, eth_if: str='') -> str:
  35. with configure_eth_if(eth_type, eth_if) as so:
  36. so.settimeout(10)
  37. eth_frame = Ether(dst=dest_mac, src=so.getsockname()[4], type=eth_type) / raw(payload_str.encode())
  38. try:
  39. so.send(raw(eth_frame))
  40. logging.info('Sent %d bytes to %s', len(eth_frame), dest_mac)
  41. logging.info('Sent msg: "%s"', payload_str)
  42. eth_frame_repl = Ether(so.recv(128))
  43. if eth_frame_repl.type == eth_type:
  44. logging.info('Received %d bytes echoed from %s', len(eth_frame_repl), eth_frame_repl.src)
  45. logging.info('Echoed msg: "%s"', eth_frame_repl.load.decode())
  46. except Exception as e:
  47. raise e
  48. # return echoed message and remove possible null characters which might have been appended since
  49. # minimal size of Ethernet frame to be transmitted physical layer is 60B (not including CRC)
  50. return str(eth_frame_repl.load.decode().rstrip('\x00'))
  51. def recv_eth_frame(eth_type: int, eth_if: str='') -> str:
  52. with configure_eth_if(eth_type, eth_if) as so:
  53. so.settimeout(10)
  54. try:
  55. eth_frame = Ether(so.recv(128))
  56. if eth_frame.type == eth_type:
  57. logging.info('Received %d bytes from %s', len(eth_frame), eth_frame.src)
  58. logging.info('Received msg: "%s"', eth_frame.load.decode())
  59. except Exception as e:
  60. raise e
  61. return str(eth_frame.load.decode().rstrip('\x00'))
  62. def actual_test(dut: Dut) -> None:
  63. # Get DUT's MAC address
  64. res = dut.expect(
  65. r'([\s\S]*)'
  66. r'Ethernet HW Addr ([0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2}:[0-9A-Fa-f]{2})'
  67. )
  68. dut_mac = res.group(2)
  69. # Receive "ESP32 Hello frame"
  70. recv_eth_frame(ETH_TYPE_3)
  71. # Sent a message and receive its echo
  72. message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
  73. echoed = send_recv_eth_frame(message, ETH_TYPE_1, dut_mac)
  74. if echoed == message:
  75. logging.info('PASS')
  76. else:
  77. raise Exception('Echoed message does not match!')
  78. message = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
  79. echoed = send_recv_eth_frame(message, ETH_TYPE_2, dut_mac)
  80. if echoed == message:
  81. logging.info('PASS')
  82. else:
  83. raise Exception('Echoed message does not match!')
  84. @pytest.mark.esp32 # internally tested using ESP32 with IP101 but may support all targets with SPI Ethernet
  85. @pytest.mark.ip101
  86. @pytest.mark.flaky(reruns=3, reruns_delay=5)
  87. def test_esp_netif_l2tap_example(dut: Dut) -> None:
  88. actual_test(dut)
  89. if __name__ == '__main__':
  90. logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)
  91. message_1 = 'ESP32 test message with EthType ' + hex(ETH_TYPE_1)
  92. message_2 = 'ESP32 test message with EthType ' + hex(ETH_TYPE_2)
  93. # Usage: pytest_example_l2tap_echo.py [<dest_mac_address>] [<host_eth_interface>]
  94. if sys.argv[2:]: # if two arguments provided:
  95. send_recv_eth_frame(message_1, ETH_TYPE_1, sys.argv[1], sys.argv[2])
  96. send_recv_eth_frame(message_2, ETH_TYPE_2, sys.argv[1], sys.argv[2])
  97. elif sys.argv[1:]: # if one argument provided:
  98. send_recv_eth_frame(message_1, ETH_TYPE_1, sys.argv[1])
  99. send_recv_eth_frame(message_2, ETH_TYPE_2, sys.argv[1])
  100. else: # if no argument provided:
  101. send_recv_eth_frame(message_1, ETH_TYPE_1, 'ff:ff:ff:ff:ff:ff')
  102. send_recv_eth_frame(message_2, ETH_TYPE_2, 'ff:ff:ff:ff:ff:ff')