pytest_otbr.py 20 KB


  1. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Unlicense OR CC0-1.0
  3. # !/usr/bin/env python3
  4. import os.path
  5. import re
  6. import subprocess
  7. import threading
  8. import time
  9. from typing import Tuple
  10. import ot_ci_function as ocf
  11. import pexpect
  12. import pytest
  13. from pytest_embedded_idf.dut import IdfDut
# This file contains the test scripts for Thread:
# Case 1: Thread network formation and attaching
# A Thread Border Router forms a Thread network, Thread devices attach to it, then test ping connection between them.
# Case 2: Bidirectional IPv6 connectivity
# Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router).
# Case 3: Multicast forwarding from Wi-Fi to Thread network
# Thread device joins the multicast group, then test group communication from Wi-Fi to Thread network.
# Case 4: Multicast forwarding from Thread to Wi-Fi network
# Linux Host joins the multicast group, test group communication from Thread to Wi-Fi network.
# Case 5: Discover service published by Thread device
# Thread device publishes the service, Linux Host discovers the service on Wi-Fi network.
# Case 6: Discover service published by Wi-Fi device
# Linux Host device publishes the service on Wi-Fi network, Thread device discovers the service.
# Case 7: ICMP communication via NAT64
# Thread device (IPv6) pings the host device (IPv4) via NAT64.
# Case 8: UDP communication via NAT64
# Thread device (IPv6) sends UDP messages to the host device (IPv4) via NAT64.
# Case 9: TCP communication via NAT64
# Thread device (IPv6) sends TCP messages to the host device (IPv4) via NAT64.
  33. @pytest.fixture(scope='module', name='Init_avahi')
  34. def fixture_Init_avahi() -> bool:
  35. print('Init Avahi')
  36. ocf.start_avahi()
  37. time.sleep(10)
  38. return True
  39. @pytest.fixture(name='Init_interface')
  40. def fixture_Init_interface() -> bool:
  41. print('Init interface')
  42. ocf.init_interface_ipv6_address()
  43. ocf.reset_host_interface()
  44. time.sleep(30)
  45. ocf.set_interface_sysctl_options()
  46. return True
  47. default_br_ot_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', True)
  48. default_br_wifi_para = ocf.wifi_parameter('OTCITE', 'otcitest888', 10)
  49. default_cli_ot_para = ocf.thread_parameter('router', '', '12', '', False)
  50. # Case 1: Thread network formation and attaching
  51. @pytest.mark.esp32s3
  52. @pytest.mark.esp32h4
  53. @pytest.mark.esp32c6
  54. @pytest.mark.openthread_br
  55. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  56. @pytest.mark.parametrize(
  57. 'port, config, count, app_path, beta_target, target', [
  58. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR|/dev/USB_CLI_C6', 'rcp|cli|br|cli_c6', 4,
  59. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  60. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  61. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}'
  62. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}',
  63. 'esp32h2beta2|esp32h2beta2|esp32s3|esp32c6', 'esp32h4|esp32h4|esp32s3|esp32c6'),
  64. ],
  65. indirect=True,
  66. )
  67. def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut, IdfDut]) -> None:
  68. br = dut[2]
  69. cli_h2 = dut[1]
  70. cli_c6 = dut[3]
  71. dut[0].serial.stop_redirect_thread()
  72. cli_list = [cli_h2, cli_c6]
  73. router_extaddr_list = ['7766554433221101', '7766554433221102']
  74. ocf.reset_thread(br)
  75. for cli in cli_list:
  76. ocf.reset_thread(cli)
  77. br_ot_para = default_br_ot_para
  78. ocf.joinThreadNetwork(br, br_ot_para)
  79. cli_ot_para = default_cli_ot_para
  80. cli_ot_para.dataset = ocf.getDataset(br)
  81. try:
  82. order = 0
  83. for cli in cli_list:
  84. cli_ot_para.exaddr = router_extaddr_list[order]
  85. order = order + 1
  86. ocf.joinThreadNetwork(cli, cli_ot_para)
  87. for cli in cli_list:
  88. cli_mleid_addr = ocf.get_mleid_addr(cli)
  89. br_mleid_addr = ocf.get_mleid_addr(br)
  90. rx_nums = ocf.ot_ping(cli, br_mleid_addr, 5)[1]
  91. assert rx_nums == 5
  92. rx_nums = ocf.ot_ping(br, cli_mleid_addr, 5)[1]
  93. assert rx_nums == 5
  94. finally:
  95. br.write('factoryreset')
  96. for cli in cli_list:
  97. cli.write('factoryreset')
  98. time.sleep(3)
  99. # Form a Wi-Fi/Thread network with a Wi-Fi host, a border router and a Thread end device
  100. # Topology:
  101. # Border_Router
  102. # / \
  103. # / \
  104. # Wi-FI_Host Thread_End_Device
  105. def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None:
  106. ocf.reset_thread(br)
  107. ocf.reset_thread(cli)
  108. ocf.joinWiFiNetwork(br, default_br_wifi_para)
  109. ocf.joinThreadNetwork(br, default_br_ot_para)
  110. ot_para = default_cli_ot_para
  111. ot_para.dataset = ocf.getDataset(br)
  112. ot_para.exaddr = '7766554433221101'
  113. ocf.joinThreadNetwork(cli, ot_para)
  114. ocf.wait(cli,10)
  115. # Case 2: Bidirectional IPv6 connectivity
  116. @pytest.mark.esp32s3
  117. @pytest.mark.esp32h4
  118. @pytest.mark.openthread_br
  119. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  120. @pytest.mark.parametrize(
  121. 'port, config, count, app_path, beta_target, target', [
  122. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  123. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  124. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  125. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  126. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  127. ],
  128. indirect=True,
  129. )
  130. def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  131. br = dut[2]
  132. cli = dut[1]
  133. assert Init_interface
  134. dut[0].serial.stop_redirect_thread()
  135. formBasicWiFiThreadNetwork(br, cli)
  136. try:
  137. assert ocf.is_joined_wifi_network(br)
  138. cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
  139. print('cli_global_unicast_addr', cli_global_unicast_addr)
  140. command = 'ping ' + str(cli_global_unicast_addr) + ' -c 10'
  141. out_str = subprocess.getoutput(command)
  142. print('ping result:\n', str(out_str))
  143. role = re.findall(r' (\d+)%', str(out_str))[0]
  144. assert role != '100'
  145. interface_name = ocf.get_host_interface_name()
  146. command = 'ifconfig ' + interface_name
  147. out_bytes = subprocess.check_output(command, shell=True, timeout=5)
  148. out_str = out_bytes.decode('utf-8')
  149. host_global_unicast_addr = re.findall(r'inet6 ((?:\w+:){7}\w+) prefixlen 64 scopeid 0x0<global>', str(out_str))
  150. rx_nums = 0
  151. for ip_addr in host_global_unicast_addr:
  152. txrx_nums = ocf.ot_ping(cli, str(ip_addr), 5)
  153. rx_nums = rx_nums + int(txrx_nums[1])
  154. assert rx_nums != 0
  155. finally:
  156. br.write('factoryreset')
  157. cli.write('factoryreset')
  158. time.sleep(3)
  159. # Case 3: Multicast forwarding from Wi-Fi to Thread network
  160. @pytest.mark.esp32s3
  161. @pytest.mark.esp32h4
  162. @pytest.mark.openthread_br
  163. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  164. @pytest.mark.parametrize(
  165. 'port, config, count, app_path, beta_target, target', [
  166. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  167. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  168. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  169. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  170. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  171. ],
  172. indirect=True,
  173. )
  174. def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  175. br = dut[2]
  176. cli = dut[1]
  177. assert Init_interface
  178. dut[0].serial.stop_redirect_thread()
  179. formBasicWiFiThreadNetwork(br, cli)
  180. try:
  181. assert ocf.is_joined_wifi_network(br)
  182. br.write('bbr')
  183. br.expect('server16', timeout=5)
  184. assert ocf.thread_is_joined_group(cli)
  185. interface_name = ocf.get_host_interface_name()
  186. command = 'ping -I ' + str(interface_name) + ' -t 64 ff04::125 -c 10'
  187. out_str = subprocess.getoutput(command)
  188. print('ping result:\n', str(out_str))
  189. role = re.findall(r' (\d+)%', str(out_str))[0]
  190. assert role != '100'
  191. finally:
  192. br.write('factoryreset')
  193. cli.write('factoryreset')
  194. time.sleep(3)
  195. # Case 4: Multicast forwarding from Thread to Wi-Fi network
  196. @pytest.mark.esp32s3
  197. @pytest.mark.esp32h4
  198. @pytest.mark.openthread_br
  199. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  200. @pytest.mark.parametrize(
  201. 'port, config, count, app_path, beta_target, target', [
  202. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  203. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  204. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  205. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  206. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  207. ],
  208. indirect=True,
  209. )
  210. def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  211. br = dut[2]
  212. cli = dut[1]
  213. assert Init_interface
  214. dut[0].serial.stop_redirect_thread()
  215. formBasicWiFiThreadNetwork(br, cli)
  216. try:
  217. assert ocf.is_joined_wifi_network(br)
  218. br.write('bbr')
  219. br.expect('server16', timeout=5)
  220. cli.write('udp open')
  221. cli.expect('Done', timeout=5)
  222. ocf.wait(cli, 3)
  223. myudp = ocf.udp_parameter('INET6', '::', 5090, 'ff04::125', False, 15.0, b'')
  224. udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp, ))
  225. udp_mission.start()
  226. start_time = time.time()
  227. while not myudp.init_flag:
  228. if (time.time() - start_time) > 10:
  229. assert False
  230. assert ocf.host_joined_group('ff04::125')
  231. for num in range(0, 3):
  232. command = 'udp send ff04::125 5090 hello' + str(num)
  233. cli.write(command)
  234. cli.expect('Done', timeout=5)
  235. ocf.wait(cli, 0.5)
  236. while udp_mission.is_alive():
  237. time.sleep(1)
  238. finally:
  239. br.write('factoryreset')
  240. cli.write('factoryreset')
  241. time.sleep(3)
  242. assert b'hello' in myudp.udp_bytes
  243. # Case 5: discover dervice published by Thread device
  244. @pytest.mark.esp32s3
  245. @pytest.mark.esp32h4
  246. @pytest.mark.openthread_br
  247. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  248. @pytest.mark.parametrize(
  249. 'port, config, count, app_path, beta_target, target', [
  250. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  251. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  252. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  253. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  254. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  255. ],
  256. indirect=True,
  257. )
  258. def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  259. br = dut[2]
  260. cli = dut[1]
  261. assert Init_interface
  262. assert Init_avahi
  263. dut[0].serial.stop_redirect_thread()
  264. formBasicWiFiThreadNetwork(br, cli)
  265. try:
  266. assert ocf.is_joined_wifi_network(br)
  267. command = 'avahi-browse -rt _testyyy._udp'
  268. out_str = subprocess.getoutput(command)
  269. print('avahi-browse:\n', str(out_str))
  270. assert 'myTest' not in str(out_str)
  271. hostname = 'myTest'
  272. command = 'srp client host name ' + hostname
  273. cli.write(command)
  274. cli.expect('Done', timeout=5)
  275. cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
  276. print('cli_global_unicast_addr', cli_global_unicast_addr)
  277. command = 'srp client host address ' + str(cli_global_unicast_addr)
  278. cli.write(command)
  279. cli.expect('Done', timeout=5)
  280. port = '12348'
  281. command = 'srp client service add my-service _testyyy._udp ' + port
  282. cli.write(command)
  283. cli.expect('Done', timeout=5)
  284. cli.write('srp client autostart enable')
  285. cli.expect('Done', timeout=5)
  286. ocf.wait(cli, 3)
  287. command = 'avahi-browse -rt _testyyy._udp'
  288. out_str = subprocess.getoutput(command)
  289. print('avahi-browse:\n', str(out_str))
  290. assert 'myTest' in str(out_str)
  291. finally:
  292. br.write('factoryreset')
  293. cli.write('factoryreset')
  294. time.sleep(3)
  295. # Case 6: discover dervice published by Wi-Fi device
  296. @pytest.mark.esp32s3
  297. @pytest.mark.esp32h4
  298. @pytest.mark.openthread_br
  299. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  300. @pytest.mark.parametrize(
  301. 'port, config, count, app_path, beta_target, target', [
  302. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  303. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  304. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  305. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  306. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  307. ],
  308. indirect=True,
  309. )
  310. def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  311. br = dut[2]
  312. cli = dut[1]
  313. assert Init_interface
  314. assert Init_avahi
  315. dut[0].serial.stop_redirect_thread()
  316. formBasicWiFiThreadNetwork(br, cli)
  317. try:
  318. assert ocf.is_joined_wifi_network(br)
  319. br_global_unicast_addr = ocf.get_global_unicast_addr(br, br)
  320. command = 'dns config ' + br_global_unicast_addr
  321. ocf.clean_buffer(cli)
  322. cli.write(command)
  323. cli.expect('Done', timeout=5)
  324. ocf.wait(cli, 1)
  325. command = 'dns resolve FA000123.default.service.arpa.'
  326. ocf.clean_buffer(cli)
  327. cli.write(command)
  328. cli.expect('Error', timeout=15)
  329. domain_name = ocf.get_domain()
  330. print('domain name is: ', domain_name)
  331. command = 'dns resolve ' + domain_name + '.default.service.arpa.'
  332. ocf.clean_buffer(cli)
  333. cli.write(command)
  334. cli.expect('TTL', timeout=10)
  335. cli.expect('Done', timeout=10)
  336. ocf.clean_buffer(cli)
  337. cli.write('dns browse _testxxx._udp.default.service.arpa')
  338. tmp = cli.expect(pexpect.TIMEOUT, timeout=5)
  339. assert 'Port:12347' not in str(tmp)
  340. ocf.host_publish_service()
  341. ocf.wait(cli, 5)
  342. ocf.clean_buffer(cli)
  343. cli.write('dns browse _testxxx._udp.default.service.arpa')
  344. tmp = cli.expect(pexpect.TIMEOUT, timeout=5)
  345. assert 'response for _testxxx' in str(tmp)
  346. assert 'Port:12347' in str(tmp)
  347. ocf.clean_buffer(cli)
  348. cli.write('dns service testxxx _testxxx._udp.default.service.arpa.')
  349. tmp = cli.expect(pexpect.TIMEOUT, timeout=5)
  350. assert 'response for testxxx' in str(tmp)
  351. assert 'Port:12347' in str(tmp)
  352. finally:
  353. ocf.host_close_service()
  354. br.write('factoryreset')
  355. cli.write('factoryreset')
  356. time.sleep(3)
  357. # Case 7: ICMP communication via NAT64
  358. @pytest.mark.esp32s3
  359. @pytest.mark.esp32h4
  360. @pytest.mark.openthread_br
  361. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  362. @pytest.mark.parametrize(
  363. 'port, config, count, app_path, beta_target, target', [
  364. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  365. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  366. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  367. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  368. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  369. ],
  370. indirect=True,
  371. )
  372. def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  373. br = dut[2]
  374. cli = dut[1]
  375. assert Init_interface
  376. dut[0].serial.stop_redirect_thread()
  377. formBasicWiFiThreadNetwork(br, cli)
  378. try:
  379. assert ocf.is_joined_wifi_network(br)
  380. host_ipv4_address = ocf.get_host_ipv4_address()
  381. print('host_ipv4_address: ', host_ipv4_address)
  382. rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1]
  383. assert rx_nums != 0
  384. finally:
  385. br.write('factoryreset')
  386. cli.write('factoryreset')
  387. time.sleep(3)
  388. # Case 8: UDP communication via NAT64
  389. @pytest.mark.esp32s3
  390. @pytest.mark.esp32h4
  391. @pytest.mark.openthread_br
  392. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  393. @pytest.mark.parametrize(
  394. 'port, config, count, app_path, beta_target, target', [
  395. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  396. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  397. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  398. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  399. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  400. ],
  401. indirect=True,
  402. )
  403. def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  404. br = dut[2]
  405. cli = dut[1]
  406. assert Init_interface
  407. dut[0].serial.stop_redirect_thread()
  408. formBasicWiFiThreadNetwork(br, cli)
  409. try:
  410. assert ocf.is_joined_wifi_network(br)
  411. br.write('bbr')
  412. br.expect('server16', timeout=5)
  413. cli.write('udp open')
  414. cli.expect('Done', timeout=5)
  415. ocf.wait(cli, 3)
  416. host_ipv4_address = ocf.get_host_ipv4_address()
  417. print('host_ipv4_address: ', host_ipv4_address)
  418. myudp = ocf.udp_parameter('INET4', host_ipv4_address, 5090, '', False, 15.0, b'')
  419. udp_mission = threading.Thread(target=ocf.create_host_udp_server, args=(myudp, ))
  420. udp_mission.start()
  421. start_time = time.time()
  422. while not myudp.init_flag:
  423. if (time.time() - start_time) > 10:
  424. assert False
  425. for num in range(0, 3):
  426. command = 'udp send ' + host_ipv4_address + ' 5090 hello' + str(num)
  427. cli.write(command)
  428. cli.expect('Done', timeout=5)
  429. ocf.wait(cli, 0.5)
  430. while udp_mission.is_alive():
  431. time.sleep(1)
  432. finally:
  433. br.write('factoryreset')
  434. cli.write('factoryreset')
  435. time.sleep(3)
  436. assert b'hello' in myudp.udp_bytes
  437. # Case 9: TCP communication via NAT64
  438. @pytest.mark.esp32s3
  439. @pytest.mark.esp32h4
  440. @pytest.mark.openthread_br
  441. @pytest.mark.flaky(reruns=1, reruns_delay=1)
  442. @pytest.mark.parametrize(
  443. 'port, config, count, app_path, beta_target, target', [
  444. ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3,
  445. f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
  446. f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
  447. f'|{os.path.join(os.path.dirname(__file__), "ot_br")}',
  448. 'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'),
  449. ],
  450. indirect=True,
  451. )
  452. def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> None:
  453. br = dut[2]
  454. cli = dut[1]
  455. assert Init_interface
  456. dut[0].serial.stop_redirect_thread()
  457. formBasicWiFiThreadNetwork(br, cli)
  458. try:
  459. assert ocf.is_joined_wifi_network(br)
  460. br.write('bbr')
  461. br.expect('server16', timeout=5)
  462. cli.write('tcpsockclient open')
  463. cli.expect('Done', timeout=5)
  464. ocf.wait(cli, 3)
  465. host_ipv4_address = ocf.get_host_ipv4_address()
  466. connect_address = ocf.get_ipv6_from_ipv4(host_ipv4_address, br)
  467. print('connect_address is: ', connect_address)
  468. mytcp = ocf.tcp_parameter('INET4', host_ipv4_address, 12345, False, False, 15.0, b'')
  469. tcp_mission = threading.Thread(target=ocf.create_host_tcp_server, args=(mytcp, ))
  470. tcp_mission.start()
  471. start_time = time.time()
  472. while not mytcp.listen_flag:
  473. if (time.time() - start_time) > 10:
  474. assert False
  475. command = 'tcpsockclient connect ' + connect_address + ' 12345'
  476. cli.write(command)
  477. cli.expect('Successfully connected', timeout=10)
  478. start_time = time.time()
  479. while not mytcp.recv_flag:
  480. if (time.time() - start_time) > 10:
  481. assert False
  482. command = 'tcpsockclient send hello'
  483. cli.write(command)
  484. cli.expect('Done', timeout=5)
  485. while tcp_mission.is_alive():
  486. time.sleep(1)
  487. finally:
  488. br.write('factoryreset')
  489. cli.write('factoryreset')
  490. time.sleep(3)
  491. assert b'hello' in mytcp.tcp_bytes