iperf_test.py 30 KB


  1. """
Test case for iperf example.

This test case might have problems running on Windows:

1. it uses `make` directly
2. it uses `sudo killall iperf` to force-kill iperf; a Windows equivalent is not implemented

The test env Example_ShieldBox needs the following config::

  Example_ShieldBox:
    ap_list:
      - ssid: "ssid"
        password: "password"
        outlet: 1
    apc_ip: "192.168.1.88"
    attenuator_port: "/dev/ttyUSB0"
    iperf: "/dev/ttyUSB1"
    apc_ip: "192.168.1.88"
    pc_nic: "eth0"
  17. """
from __future__ import division, unicode_literals

import hashlib
import os
import re
import subprocess
import time
from builtins import object, range, str

import ttfw_idf
from idf_iperf_test_util import Attenuator, LineChart, PowerControl, TestReport
from tiny_test_fw import DUT, TinyFW, Utility
  27. # configurations
  28. TEST_TIME = TEST_TIMEOUT = 60
  29. WAIT_AP_POWER_ON_TIMEOUT = 90
  30. SCAN_TIMEOUT = 3
  31. SCAN_RETRY_COUNT = 3
  32. RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
  33. ATTEN_VALUE_LIST = range(0, 60, 2)
  34. # constants
  35. FAILED_TO_SCAN_RSSI = -97
  36. INVALID_HEAP_SIZE = 0xFFFFFFFF
  37. PC_IPERF_TEMP_LOG_FILE = '.tmp_iperf.log'
  38. CONFIG_NAME_PATTERN = re.compile(r'sdkconfig\.ci\.(.+)')
  39. # We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...) and put them to reports.
  40. # Using numbers for config will make this easy.
  41. # Use default value `99` for config with best performance.
  42. BEST_PERFORMANCE_CONFIG = '99'
  43. class TestResult(object):
  44. """ record, analysis test result and convert data to output format """
  45. PC_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec')
  46. DUT_BANDWIDTH_LOG_PATTERN = re.compile(r'(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec')
  47. ZERO_POINT_THRESHOLD = -88 # RSSI, dbm
  48. ZERO_THROUGHPUT_THRESHOLD = -92 # RSSI, dbm
  49. BAD_POINT_RSSI_THRESHOLD = -75 # RSSI, dbm
  50. BAD_POINT_MIN_THRESHOLD = 10 # Mbps
  51. BAD_POINT_PERCENTAGE_THRESHOLD = 0.3
  52. # we need at least 1/2 valid points to qualify the test result
  53. THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2
  54. RSSI_RANGE = [-x for x in range(10, 100)]
  55. ATT_RANGE = [x for x in range(0, 64)]
  56. def __init__(self, proto, direction, config_name):
  57. self.proto = proto
  58. self.direction = direction
  59. self.config_name = config_name
  60. self.throughput_by_rssi = dict()
  61. self.throughput_by_att = dict()
  62. self.att_rssi_map = dict()
  63. self.heap_size = INVALID_HEAP_SIZE
  64. self.error_list = []
  65. def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
  66. """
  67. save the test results:
  68. * record the better throughput if att/rssi is the same.
  69. * record the min heap size.
  70. """
  71. if ap_ssid not in self.att_rssi_map:
  72. # for new ap, create empty dict()
  73. self.throughput_by_att[ap_ssid] = dict()
  74. self.throughput_by_rssi[ap_ssid] = dict()
  75. self.att_rssi_map[ap_ssid] = dict()
  76. self.att_rssi_map[ap_ssid][att] = rssi
  77. def record_throughput(database, key_value):
  78. try:
  79. # we save the larger value for same att
  80. if throughput > database[ap_ssid][key_value]:
  81. database[ap_ssid][key_value] = throughput
  82. except KeyError:
  83. database[ap_ssid][key_value] = throughput
  84. record_throughput(self.throughput_by_att, att)
  85. record_throughput(self.throughput_by_rssi, rssi)
  86. if int(heap_size) < self.heap_size:
  87. self.heap_size = int(heap_size)
  88. def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
  89. """
  90. add result for one test
  91. :param raw_data: iperf raw data
  92. :param ap_ssid: ap ssid that tested
  93. :param att: attenuate value
  94. :param rssi: AP RSSI
  95. :param heap_size: min heap size during test
  96. :return: throughput
  97. """
  98. fall_to_0_recorded = 0
  99. throughput_list = []
  100. result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
  101. if not result_list:
  102. # failed to find raw data by PC pattern, it might be DUT pattern
  103. result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)
  104. for result in result_list:
  105. if int(result[1]) - int(result[0]) != 1:
  106. # this could be summary, ignore this
  107. continue
  108. throughput_list.append(float(result[2]))
  109. if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
  110. and fall_to_0_recorded < 1:
  111. # throughput fall to 0 error. we only record 1 records for one test
  112. self.error_list.append('[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}'
  113. .format(ap_ssid, att, rssi, result[0], result[1]))
  114. fall_to_0_recorded += 1
  115. if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
  116. throughput = sum(throughput_list) / len(throughput_list)
  117. else:
  118. throughput = 0.0
  119. if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
  120. self.error_list.append('[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found'
  121. .format(ap_ssid, att, rssi))
  122. self._save_result(throughput, ap_ssid, att, rssi, heap_size)
  123. return throughput
  124. def post_analysis(self):
  125. """
  126. some rules need to be checked after we collected all test raw data:
  127. 1. throughput value 30% worse than the next point with lower RSSI
  128. 2. throughput value 30% worse than the next point with larger attenuate
  129. """
  130. def analysis_bad_point(data, index_type):
  131. for ap_ssid in data:
  132. result_dict = data[ap_ssid]
  133. index_list = list(result_dict.keys())
  134. index_list.sort()
  135. if index_type == 'att':
  136. index_list.reverse()
  137. for i, index_value in enumerate(index_list[1:]):
  138. if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
  139. result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
  140. continue
  141. _percentage = result_dict[index_value] / result_dict[index_list[i]]
  142. if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
  143. self.error_list.append('[Error][Bad point][{}][{}: {}]: drop {:.02f}%'
  144. .format(ap_ssid, index_type, index_value,
  145. (1 - _percentage) * 100))
  146. analysis_bad_point(self.throughput_by_rssi, 'rssi')
  147. analysis_bad_point(self.throughput_by_att, 'att')
  148. def draw_throughput_figure(self, path, ap_ssid, draw_type):
  149. """
  150. :param path: folder to save figure. make sure the folder is already created.
  151. :param ap_ssid: ap ssid string or a list of ap ssid string
  152. :param draw_type: "att" or "rssi"
  153. :return: file_name
  154. """
  155. if draw_type == 'rssi':
  156. type_name = 'RSSI'
  157. data = self.throughput_by_rssi
  158. range_list = self.RSSI_RANGE
  159. elif draw_type == 'att':
  160. type_name = 'Att'
  161. data = self.throughput_by_att
  162. range_list = self.ATT_RANGE
  163. else:
  164. raise AssertionError('draw type not supported')
  165. if isinstance(ap_ssid, list):
  166. file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction,
  167. hash(ap_ssid)[:6])
  168. else:
  169. file_name = 'ThroughputVs{}_{}_{}_{}.html'.format(type_name, self.proto, self.direction, ap_ssid)
  170. LineChart.draw_line_chart(os.path.join(path, file_name),
  171. 'Throughput Vs {} ({} {})'.format(type_name, self.proto, self.direction),
  172. '{} (dbm)'.format(type_name),
  173. 'Throughput (Mbps)',
  174. data, range_list)
  175. return file_name
  176. def draw_rssi_vs_att_figure(self, path, ap_ssid):
  177. """
  178. :param path: folder to save figure. make sure the folder is already created.
  179. :param ap_ssid: ap to use
  180. :return: file_name
  181. """
  182. if isinstance(ap_ssid, list):
  183. file_name = 'AttVsRSSI_{}.html'.format(hash(ap_ssid)[:6])
  184. else:
  185. file_name = 'AttVsRSSI_{}.html'.format(ap_ssid)
  186. LineChart.draw_line_chart(os.path.join(path, file_name),
  187. 'Att Vs RSSI',
  188. 'Att (dbm)',
  189. 'RSSI (dbm)',
  190. self.att_rssi_map,
  191. self.ATT_RANGE)
  192. return file_name
  193. def get_best_throughput(self):
  194. """ get the best throughput during test """
  195. best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
  196. for ap_ssid in self.throughput_by_att]
  197. return max(best_for_aps)
  198. def __str__(self):
  199. """
  200. returns summary for this test:
  201. 1. test result (success or fail)
  202. 2. best performance for each AP
  203. 3. min free heap size during test
  204. """
  205. if self.throughput_by_att:
  206. ret = '[{}_{}][{}]: {}\r\n\r\n'.format(self.proto, self.direction, self.config_name,
  207. 'Fail' if self.error_list else 'Success')
  208. ret += 'Performance for each AP:\r\n'
  209. for ap_ssid in self.throughput_by_att:
  210. ret += '[{}]: {:.02f} Mbps\r\n'.format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
  211. if self.heap_size != INVALID_HEAP_SIZE:
  212. ret += 'Minimum heap size: {}'.format(self.heap_size)
  213. else:
  214. ret = ''
  215. return ret
  216. class IperfTestUtility(object):
  217. """ iperf test implementation """
  218. def __init__(self, dut, config_name, ap_ssid, ap_password,
  219. pc_nic_ip, pc_iperf_log_file, test_result=None):
  220. self.config_name = config_name
  221. self.dut = dut
  222. self.pc_iperf_log_file = pc_iperf_log_file
  223. self.ap_ssid = ap_ssid
  224. self.ap_password = ap_password
  225. self.pc_nic_ip = pc_nic_ip
  226. if test_result:
  227. self.test_result = test_result
  228. else:
  229. self.test_result = {
  230. 'tcp_tx': TestResult('tcp', 'tx', config_name),
  231. 'tcp_rx': TestResult('tcp', 'rx', config_name),
  232. 'udp_tx': TestResult('udp', 'tx', config_name),
  233. 'udp_rx': TestResult('udp', 'rx', config_name),
  234. }
  235. def setup(self):
  236. """
  237. setup iperf test:
  238. 1. kill current iperf process
  239. 2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
  240. 3. scan to get AP RSSI
  241. 4. connect to AP
  242. """
  243. try:
  244. subprocess.check_output('sudo killall iperf 2>&1 > /dev/null', shell=True)
  245. except subprocess.CalledProcessError:
  246. pass
  247. self.dut.write('restart')
  248. self.dut.expect_any('iperf>', 'esp32>')
  249. self.dut.write('scan {}'.format(self.ap_ssid))
  250. for _ in range(SCAN_RETRY_COUNT):
  251. try:
  252. rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
  253. timeout=SCAN_TIMEOUT)[0])
  254. break
  255. except DUT.ExpectTimeout:
  256. continue
  257. else:
  258. raise AssertionError('Failed to scan AP')
  259. self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
  260. dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
  261. return dut_ip, rssi
  262. def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
  263. return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
  264. def _test_once(self, proto, direction):
  265. """ do measure once for one type """
  266. # connect and scan to get RSSI
  267. dut_ip, rssi = self.setup()
  268. assert direction in ['rx', 'tx']
  269. assert proto in ['tcp', 'udp']
  270. # run iperf test
  271. if direction == 'tx':
  272. with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
  273. if proto == 'tcp':
  274. process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
  275. '-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
  276. stdout=f, stderr=f)
  277. self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
  278. else:
  279. process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
  280. '-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
  281. stdout=f, stderr=f)
  282. self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
  283. for _ in range(TEST_TIMEOUT):
  284. if process.poll() is not None:
  285. break
  286. time.sleep(1)
  287. else:
  288. process.terminate()
  289. with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
  290. pc_raw_data = server_raw_data = f.read()
  291. else:
  292. with open(PC_IPERF_TEMP_LOG_FILE, 'w') as f:
  293. if proto == 'tcp':
  294. self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
  295. # wait until DUT TCP server created
  296. try:
  297. self.dut.expect('iperf tcp server create successfully', timeout=1)
  298. except DUT.ExpectTimeout:
  299. # compatible with old iperf example binary
  300. pass
  301. process = subprocess.Popen(['iperf', '-c', dut_ip,
  302. '-t', str(TEST_TIME), '-f', 'm'],
  303. stdout=f, stderr=f)
  304. else:
  305. self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
  306. process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', '100M',
  307. '-t', str(TEST_TIME), '-f', 'm'],
  308. stdout=f, stderr=f)
  309. for _ in range(TEST_TIMEOUT):
  310. if process.poll() is not None:
  311. break
  312. time.sleep(1)
  313. else:
  314. process.terminate()
  315. server_raw_data = self.dut.read()
  316. with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
  317. pc_raw_data = f.read()
  318. # save PC iperf logs to console
  319. with open(self.pc_iperf_log_file, 'a+') as f:
  320. f.write('## [{}] `{}`\r\n##### {}'
  321. .format(self.config_name,
  322. '{}_{}'.format(proto, direction),
  323. time.strftime('%m-%d %H:%M:%S', time.localtime(time.time()))))
  324. f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
  325. self.dut.write('heap')
  326. heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
  327. # return server raw data (for parsing test results) and RSSI
  328. return server_raw_data, rssi, heap_size
  329. def run_test(self, proto, direction, atten_val):
  330. """
  331. run test for one type, with specified atten_value and save the test result
  332. :param proto: tcp or udp
  333. :param direction: tx or rx
  334. :param atten_val: attenuate value
  335. """
  336. rssi = FAILED_TO_SCAN_RSSI
  337. heap_size = INVALID_HEAP_SIZE
  338. try:
  339. server_raw_data, rssi, heap_size = self._test_once(proto, direction)
  340. throughput = self._save_test_result('{}_{}'.format(proto, direction),
  341. server_raw_data, atten_val,
  342. rssi, heap_size)
  343. Utility.console_log('[{}][{}_{}][{}][{}]: {:.02f}'
  344. .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
  345. except Exception as e:
  346. self._save_test_result('{}_{}'.format(proto, direction), '', atten_val, rssi, heap_size)
  347. Utility.console_log('Failed during test: {}'.format(e))
  348. def run_all_cases(self, atten_val):
  349. """
  350. run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
  351. :param atten_val: attenuate value
  352. """
  353. self.run_test('tcp', 'tx', atten_val)
  354. self.run_test('tcp', 'rx', atten_val)
  355. self.run_test('udp', 'tx', atten_val)
  356. self.run_test('udp', 'rx', atten_val)
  357. def wait_ap_power_on(self):
  358. """
  359. AP need to take sometime to power on. It changes for different APs.
  360. This method will scan to check if the AP powers on.
  361. :return: True or False
  362. """
  363. self.dut.write('restart')
  364. self.dut.expect_any('iperf>', 'esp32>')
  365. for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
  366. try:
  367. self.dut.write('scan {}'.format(self.ap_ssid))
  368. self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
  369. timeout=SCAN_TIMEOUT)
  370. ret = True
  371. break
  372. except DUT.ExpectTimeout:
  373. pass
  374. else:
  375. ret = False
  376. return ret
  377. class IperfTestUtilitySoftap(IperfTestUtility):
  378. """ iperf test implementation """
  379. def __init__(self, dut, softap_dut, config_name, test_result=None):
  380. super(IperfTestUtility, self).__init__(dut, config_name, 'softap', '1234567890', None, None, test_result=None)
  381. self.softap_dut = softap_dut
  382. self.softap_ip = '192.168.4.1'
  383. def setup(self):
  384. """
  385. setup iperf test:
  386. 1. kill current iperf process
  387. 2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
  388. 3. scan to get AP RSSI
  389. 4. connect to AP
  390. """
  391. self.softap_dut.write('restart')
  392. self.softap_dut.expect_any('iperf>', 'esp32>', timeout=30)
  393. self.softap_dut.write('ap {} {}'.format(self.ap_ssid, self.ap_password))
  394. self.dut.write('restart')
  395. self.dut.expect_any('iperf>', 'esp32>', timeout=30)
  396. self.dut.write('scan {}'.format(self.ap_ssid))
  397. for _ in range(SCAN_RETRY_COUNT):
  398. try:
  399. rssi = int(self.dut.expect(re.compile(r'\[{}]\[rssi=(-\d+)]'.format(self.ap_ssid)),
  400. timeout=SCAN_TIMEOUT)[0])
  401. break
  402. except DUT.ExpectTimeout:
  403. continue
  404. else:
  405. raise AssertionError('Failed to scan AP')
  406. self.dut.write('sta {} {}'.format(self.ap_ssid, self.ap_password))
  407. dut_ip = self.dut.expect(re.compile(r'sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)'))[0]
  408. return dut_ip, rssi
  409. def _test_once(self, proto, direction):
  410. """ do measure once for one type """
  411. # connect and scan to get RSSI
  412. dut_ip, rssi = self.setup()
  413. assert direction in ['rx', 'tx']
  414. assert proto in ['tcp', 'udp']
  415. # run iperf test
  416. if direction == 'tx':
  417. if proto == 'tcp':
  418. self.softap_dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
  419. # wait until DUT TCP server created
  420. try:
  421. self.softap_dut.expect('iperf tcp server create successfully', timeout=1)
  422. except DUT.ExpectTimeout:
  423. # compatible with old iperf example binary
  424. pass
  425. self.dut.write('iperf -c {} -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
  426. else:
  427. self.softap_dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
  428. self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.softap_ip, TEST_TIME))
  429. else:
  430. if proto == 'tcp':
  431. self.dut.write('iperf -s -i 1 -t {}'.format(TEST_TIME))
  432. # wait until DUT TCP server created
  433. try:
  434. self.dut.expect('iperf tcp server create successfully', timeout=1)
  435. except DUT.ExpectTimeout:
  436. # compatible with old iperf example binary
  437. pass
  438. self.softap_dut.write('iperf -c {} -i 1 -t {}'.format(dut_ip, TEST_TIME))
  439. else:
  440. self.dut.write('iperf -s -u -i 1 -t {}'.format(TEST_TIME))
  441. self.softap_dut.write('iperf -c {} -u -i 1 -t {}'.format(dut_ip, TEST_TIME))
  442. time.sleep(60)
  443. if direction == 'tx':
  444. server_raw_data = self.dut.read()
  445. else:
  446. server_raw_data = self.softap_dut.read()
  447. self.dut.write('iperf -a')
  448. self.softap_dut.write('iperf -a')
  449. self.dut.write('heap')
  450. heap_size = self.dut.expect(re.compile(r'min heap size: (\d+)\D'))[0]
  451. # return server raw data (for parsing test results) and RSSI
  452. return server_raw_data, rssi, heap_size
  453. @ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic', category='stress')
  454. def test_wifi_throughput_with_different_configs(env, extra_data):
  455. """
  456. steps: |
  457. 1. build iperf with specified configs
  458. 2. test throughput for all routers
  459. """
  460. pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
  461. pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
  462. ap_info = {
  463. 'ssid': env.get_variable('ap_ssid'),
  464. 'password': env.get_variable('ap_password'),
  465. }
  466. config_names_raw = subprocess.check_output(['ls', os.path.dirname(os.path.abspath(__file__))])
  467. config_names = CONFIG_NAME_PATTERN.findall(config_names_raw)
  468. if not config_names:
  469. raise ValueError('no configs found in {}'.format(os.path.dirname(__file__)))
  470. test_result = dict()
  471. sdkconfig_files = dict()
  472. for config_name in config_names:
  473. # 1. get the config
  474. sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
  475. 'sdkconfig.ci.{}'.format(config_name))
  476. # 2. get DUT and download
  477. dut = env.get_dut('iperf', 'examples/wifi/iperf', dut_class=ttfw_idf.ESP32DUT,
  478. app_config_name=config_name)
  479. dut.start_app()
  480. dut.expect_any('iperf>', 'esp32>')
  481. # 3. run test for each required att value
  482. test_result[config_name] = {
  483. 'tcp_tx': TestResult('tcp', 'tx', config_name),
  484. 'tcp_rx': TestResult('tcp', 'rx', config_name),
  485. 'udp_tx': TestResult('udp', 'tx', config_name),
  486. 'udp_rx': TestResult('udp', 'rx', config_name),
  487. }
  488. test_utility = IperfTestUtility(dut, config_name, ap_info['ssid'],
  489. ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result[config_name])
  490. for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
  491. test_utility.run_all_cases(0)
  492. for result_type in test_result[config_name]:
  493. summary = str(test_result[config_name][result_type])
  494. if summary:
  495. Utility.console_log(summary, color='orange')
  496. # 4. check test results
  497. env.close_dut('iperf')
  498. # 5. generate report
  499. report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, 'ThroughputForConfigsReport'),
  500. ap_info['ssid'], test_result, sdkconfig_files)
  501. report.generate_report()
  502. @ttfw_idf.idf_example_test(env_tag='Example_ShieldBox', category='stress')
  503. def test_wifi_throughput_vs_rssi(env, extra_data):
  504. """
  505. steps: |
  506. 1. build with best performance config
  507. 2. switch on one router
  508. 3. set attenuator value from 0-60 for each router
  509. 4. test TCP tx rx and UDP tx rx throughput
  510. """
  511. att_port = env.get_variable('attenuator_port')
  512. ap_list = env.get_variable('ap_list')
  513. pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
  514. apc_ip = env.get_variable('apc_ip')
  515. pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
  516. test_result = {
  517. 'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
  518. 'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
  519. 'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
  520. 'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
  521. }
  522. # 1. get DUT and download
  523. dut = env.get_dut('iperf', 'examples/wifi/iperf', dut_class=ttfw_idf.ESP32DUT,
  524. app_config_name=BEST_PERFORMANCE_CONFIG)
  525. dut.start_app()
  526. dut.expect_any('iperf>', 'esp32>')
  527. # 2. run test for each required att value
  528. for ap_info in ap_list:
  529. test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'], ap_info['password'],
  530. pc_nic_ip, pc_iperf_log_file, test_result)
  531. PowerControl.Control.control_rest(apc_ip, ap_info['outlet'], 'OFF')
  532. PowerControl.Control.control(apc_ip, {ap_info['outlet']: 'ON'})
  533. Attenuator.set_att(att_port, 0)
  534. if not test_utility.wait_ap_power_on():
  535. Utility.console_log('[{}] failed to power on, skip testing this AP'
  536. .format(ap_info['ssid']), color='red')
  537. continue
  538. for atten_val in ATTEN_VALUE_LIST:
  539. assert Attenuator.set_att(att_port, atten_val) is True
  540. test_utility.run_all_cases(atten_val)
  541. # 3. check test results
  542. env.close_dut('iperf')
  543. # 4. generate report
  544. report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'STAThroughputVsRssiReport'),
  545. test_result)
  546. report.generate_report()
  547. @ttfw_idf.idf_example_test(env_tag='Example_ShieldBox_Basic')
  548. def test_wifi_throughput_basic(env, extra_data):
  549. """
  550. steps: |
  551. 1. test TCP tx rx and UDP tx rx throughput
  552. 2. compare with the pre-defined pass standard
  553. """
  554. pc_nic_ip = env.get_pc_nic_info('pc_nic', 'ipv4')['addr']
  555. pc_iperf_log_file = os.path.join(env.log_path, 'pc_iperf_log.md')
  556. ap_info = {
  557. 'ssid': env.get_variable('ap_ssid'),
  558. 'password': env.get_variable('ap_password'),
  559. }
  560. # 1. get DUT
  561. dut = env.get_dut('iperf', 'examples/wifi/iperf', dut_class=ttfw_idf.ESP32DUT,
  562. app_config_name=BEST_PERFORMANCE_CONFIG)
  563. dut.start_app()
  564. dut.expect_any('iperf>', 'esp32>')
  565. # 2. preparing
  566. test_result = {
  567. 'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
  568. 'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
  569. 'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
  570. 'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
  571. }
  572. test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info['ssid'],
  573. ap_info['password'], pc_nic_ip, pc_iperf_log_file, test_result)
  574. # 3. run test for TCP Tx, Rx and UDP Tx, Rx
  575. for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
  576. test_utility.run_all_cases(0)
  577. # 4. log performance and compare with pass standard
  578. performance_items = []
  579. for throughput_type in test_result:
  580. ttfw_idf.log_performance('{}_throughput'.format(throughput_type),
  581. '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput()))
  582. performance_items.append(['{}_throughput'.format(throughput_type),
  583. '{:.02f} Mbps'.format(test_result[throughput_type].get_best_throughput())])
  584. # 5. save to report
  585. TinyFW.JunitReport.update_performance(performance_items)
  586. # do check after logging, otherwise test will exit immediately if check fail, some performance can't be logged.
  587. for throughput_type in test_result:
  588. ttfw_idf.check_performance('{}_throughput'.format(throughput_type),
  589. test_result[throughput_type].get_best_throughput(), dut.TARGET)
  590. env.close_dut('iperf')
  591. @ttfw_idf.idf_example_test(env_tag='Example_ShieldBox2', category='stress')
  592. def test_softap_throughput_vs_rssi(env, extra_data):
  593. """
  594. steps: |
  595. 1. build with best performance config
  596. 2. switch on one router
  597. 3. set attenuator value from 0-60 for each router
  598. 4. test TCP tx rx and UDP tx rx throughput
  599. """
  600. att_port = env.get_variable('attenuator_port')
  601. test_result = {
  602. 'tcp_tx': TestResult('tcp', 'tx', BEST_PERFORMANCE_CONFIG),
  603. 'tcp_rx': TestResult('tcp', 'rx', BEST_PERFORMANCE_CONFIG),
  604. 'udp_tx': TestResult('udp', 'tx', BEST_PERFORMANCE_CONFIG),
  605. 'udp_rx': TestResult('udp', 'rx', BEST_PERFORMANCE_CONFIG),
  606. }
  607. # 1. get DUT and download
  608. softap_dut = env.get_dut('softap_iperf', 'examples/wifi/iperf', dut_class=ttfw_idf.ESP32DUT,
  609. app_config_name=BEST_PERFORMANCE_CONFIG)
  610. softap_dut.start_app()
  611. softap_dut.expect_any('iperf>', 'esp32>')
  612. sta_dut = env.get_dut('sta_iperf', 'examples/wifi/iperf', dut_class=ttfw_idf.ESP32DUT,
  613. app_config_name=BEST_PERFORMANCE_CONFIG)
  614. sta_dut.start_app()
  615. sta_dut.expect_any('iperf>', 'esp32>')
  616. # 2. run test for each required att value
  617. test_utility = IperfTestUtilitySoftap(sta_dut, softap_dut, BEST_PERFORMANCE_CONFIG, test_result)
  618. Attenuator.set_att(att_port, 0)
  619. for atten_val in ATTEN_VALUE_LIST:
  620. assert Attenuator.set_att(att_port, atten_val) is True
  621. test_utility.run_all_cases(atten_val)
  622. env.close_dut('softap_iperf')
  623. env.close_dut('sta_iperf')
  624. # 3. generate report
  625. report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, 'SoftAPThroughputVsRssiReport'),
  626. test_result)
  627. report.generate_report()
  628. if __name__ == '__main__':
  629. test_wifi_throughput_basic(env_config_file='EnvConfig.yml')
  630. test_wifi_throughput_with_different_configs(env_config_file='EnvConfig.yml')
  631. test_wifi_throughput_vs_rssi(env_config_file='EnvConfig.yml')
  632. test_softap_throughput_vs_rssi(env_config_file='EnvConfig.yml')