Parcourir la source

ci: Made socket tests more robust

Added common timeout
Added debug logs for both addr families
Renamed example tests to have different names
David Cermak il y a 5 ans
Parent
commit
25499115eb

+ 14 - 7
examples/protocols/sockets/tcp_client/example_test.py

@@ -35,7 +35,7 @@ class TcpServer:
         self.port = port
         self.socket = socket.socket(family_addr, socket.SOCK_STREAM)
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.settimeout(10.0)
+        self.socket.settimeout(60.0)
         self.shutdown = Event()
         self.persist = persist
         self.family_addr = family_addr
@@ -48,6 +48,7 @@ class TcpServer:
             raise
         self.socket.listen(1)
 
+        print("Starting server on port={} family_addr={}".format(self.port, self.family_addr))
         self.server_thread = Thread(target=self.run_server)
         self.server_thread.start()
         return self
@@ -85,7 +86,7 @@ class TcpServer:
 
 
 @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
-def test_examples_protocol_socket(env, extra_data):
+def test_examples_protocol_socket_tcpclient(env, extra_data):
     """
     steps:
       1. join AP
@@ -101,16 +102,22 @@ def test_examples_protocol_socket(env, extra_data):
     # start test
     dut1.start_app()
 
-    data = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
-    print("Connected with IPv4: {}".format(data[0]))
+    ipv4 = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)[0]
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
+    print("Connected with IPv4={} and IPv6={}".format(ipv4, ipv6))
 
     # test IPv4
     with TcpServer(PORT, socket.AF_INET):
-        dut1.write(get_my_ip(netifaces.AF_INET))
+        server_ip = get_my_ip(netifaces.AF_INET)
+        print("Connect tcp client to server IP={}".format(server_ip))
+        dut1.write(server_ip)
         dut1.expect(re.compile(r"OK: Message from ESP32"))
     # test IPv6
     with TcpServer(PORT, socket.AF_INET6):
-        dut1.write(get_my_ip(netifaces.AF_INET6))
+        server_ip = get_my_ip(netifaces.AF_INET6)
+        print("Connect tcp client to server IP={}".format(server_ip))
+        dut1.write(server_ip)
         dut1.expect(re.compile(r"OK: Message from ESP32"))
 
 
@@ -121,4 +128,4 @@ if __name__ == '__main__':
         with TcpServer(PORT, family_addr, persist=True) as s:
             print(input("Press Enter stop the server..."))
     else:
-        test_examples_protocol_socket()
+        test_examples_protocol_socket_tcpclient()

+ 3 - 2
examples/protocols/sockets/tcp_server/example_test.py

@@ -27,6 +27,7 @@ def tcp_client(address, payload):
         family_addr, socktype, proto, canonname, addr = res
     try:
         sock = socket.socket(family_addr, socket.SOCK_STREAM)
+        sock.settimeout(60.0)
     except socket.error as msg:
         print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
         raise
@@ -46,7 +47,7 @@ def tcp_client(address, payload):
 
 
 @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
-def test_examples_protocol_socket(env, extra_data):
+def test_examples_protocol_socket_tcpserver(env, extra_data):
     MESSAGE = "Data to ESP"
     """
     steps:
@@ -85,4 +86,4 @@ if __name__ == '__main__':
         # Usage: example_test.py <server_address> <message_to_send_to_server>
         tcp_client(sys.argv[1], sys.argv[2])
     else:               # otherwise run standard example test as in the CI
-        test_examples_protocol_socket()
+        test_examples_protocol_socket_tcpserver()

+ 14 - 7
examples/protocols/sockets/udp_client/example_test.py

@@ -36,7 +36,7 @@ class UdpServer:
         self.family_addr = family_addr
         self.socket = socket.socket(family_addr, socket.SOCK_DGRAM)
         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.socket.settimeout(30.0)
+        self.socket.settimeout(60.0)
         self.shutdown = Event()
         self.persist = persist
 
@@ -47,6 +47,7 @@ class UdpServer:
             print("Bind failed:{}".format(e))
             raise
 
+        print("Starting server on port={} family_addr={}".format(self.port, self.family_addr))
         self.server_thread = Thread(target=self.run_server)
         self.server_thread.start()
         return self
@@ -78,7 +79,7 @@ class UdpServer:
 
 
 @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
-def test_examples_protocol_socket(env, extra_data):
+def test_examples_protocol_socket_udpclient(env, extra_data):
     """
     steps:
       1. join AP
@@ -94,16 +95,22 @@ def test_examples_protocol_socket(env, extra_data):
     # start test
     dut1.start_app()
 
-    data = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)
-    print("Connected with IPv4: {}".format(data[0]))
+    ipv4 = dut1.expect(re.compile(r" IPv4 address: ([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"), timeout=30)[0]
+    ipv6_r = r':'.join((r'[0-9a-fA-F]{4}',) * 8)    # expect all 8 octets from IPv6 (assumes it's printed in the long form)
+    ipv6 = dut1.expect(re.compile(r' IPv6 address: ({})'.format(ipv6_r)), timeout=30)[0]
+    print("Connected with IPv4={} and IPv6={}".format(ipv4, ipv6))
 
     # test IPv4
     with UdpServer(PORT, socket.AF_INET):
-        dut1.write(get_my_ip(netifaces.AF_INET))
+        server_ip = get_my_ip(netifaces.AF_INET)
+        print("Connect udp client to server IP={}".format(server_ip))
+        dut1.write(server_ip)
         dut1.expect(re.compile(r"OK: Message from ESP32"))
     # test IPv6
     with UdpServer(PORT, socket.AF_INET6):
-        dut1.write(get_my_ip(netifaces.AF_INET6))
+        server_ip = get_my_ip(netifaces.AF_INET6)
+        print("Connect udp client to server IP={}".format(server_ip))
+        dut1.write(server_ip)
         dut1.expect(re.compile(r"OK: Message from ESP32"))
 
 
@@ -114,4 +121,4 @@ if __name__ == '__main__':
         with UdpServer(PORT, family_addr, persist=True) as s:
             print(input("Press Enter stop the server..."))
     else:
-        test_examples_protocol_socket()
+        test_examples_protocol_socket_udpclient()

+ 3 - 2
examples/protocols/sockets/udp_server/example_test.py

@@ -27,6 +27,7 @@ def udp_client(address, payload):
         family_addr, socktype, proto, canonname, addr = res
     try:
         sock = socket.socket(family_addr, socket.SOCK_DGRAM)
+        sock.settimeout(60.0)
     except socket.error as msg:
         print('Could not create socket: ' + str(msg[0]) + ': ' + msg[1])
         raise
@@ -44,7 +45,7 @@ def udp_client(address, payload):
 
 
 @ttfw_idf.idf_example_test(env_tag="Example_WIFI")
-def test_examples_protocol_socket(env, extra_data):
+def test_examples_protocol_socket_udpserver(env, extra_data):
     MESSAGE = "Data to ESP"
     """
     steps:
@@ -83,4 +84,4 @@ if __name__ == '__main__':
         # Usage: example_test.py <server_address> <message_to_send_to_server>
         udp_client(sys.argv[1], sys.argv[2])
     else:               # otherwise run standard example test as in the CI
-        test_examples_protocol_socket()
+        test_examples_protocol_socket_udpserver()