"""
Test case for the iperf example.

This test case might have problems running on Windows:

1. direct use of `make`
2. use of `sudo killall iperf` to force-kill iperf; the Windows version is not implemented

The test env Example_ShieldBox needs the following config::

  Example_ShieldBox:
    ap_list:
      - ssid: "ssid"
        password: "password"
        outlet: 1
        apc_ip: "192.168.1.88"
    attenuator_port: "/dev/ttyUSB0"
    iperf: "/dev/ttyUSB1"
    apc_ip: "192.168.1.88"
    pc_nic: "eth0"
"""
from __future__ import division
from __future__ import unicode_literals
from builtins import str
from builtins import range
from builtins import object
import re
import os
import time
import subprocess

from tiny_test_fw import TinyFW, DUT, Utility
import ttfw_idf
from idf_iperf_test_util import (Attenuator, PowerControl, LineChart, TestReport)

# configurations
TEST_TIME = TEST_TIMEOUT = 60
WAIT_AP_POWER_ON_TIMEOUT = 90
SCAN_TIMEOUT = 3
SCAN_RETRY_COUNT = 3
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)

# constants
FAILED_TO_SCAN_RSSI = -97
INVALID_HEAP_SIZE = 0xFFFFFFFF

PC_IPERF_TEMP_LOG_FILE = ".tmp_iperf.log"
CONFIG_NAME_PATTERN = re.compile(r"sdkconfig\.defaults\.(.+)")
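# e.g. a file named "sdkconfig.defaults.99" yields the config name "99"; the bare
# "sdkconfig.defaults" file (no suffix) is not matched.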

# We need to auto-compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...)
# and put them into the report. Using numbers for config names makes this easy.
# Use the default value `99` for the config with the best performance.
BEST_PERFORMANCE_CONFIG = "99"


class TestResult(object):
    """ record and analyse test results, and convert data to output format """

    PC_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec")
    DUT_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec")
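    # PC_BANDWIDTH_LOG_PATTERN matches the per-second interval lines of the PC iperf
    # output, e.g. " 0.0- 1.0 sec  5.50 MBytes  46.1 Mbits/sec"; DUT_BANDWIDTH_LOG_PATTERN
    # matches the DUT console output, e.g. "0-   1 sec       46.10 Mbits/sec".
    # Groups 1 and 2 are the interval start/end in seconds, group 3 is the throughput in Mbits/sec.
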
    ZERO_POINT_THRESHOLD = -88  # RSSI, dbm
    ZERO_THROUGHPUT_THRESHOLD = -92  # RSSI, dbm
    BAD_POINT_RSSI_THRESHOLD = -85  # RSSI, dbm
    BAD_POINT_MIN_THRESHOLD = 3  # Mbps
    BAD_POINT_PERCENTAGE_THRESHOLD = 0.3

    # we need at least 1/2 valid points to qualify the test result
    THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2

    def __init__(self, proto, direction, config_name):
        self.proto = proto
        self.direction = direction
        self.config_name = config_name
        self.throughput_by_rssi = dict()
        self.throughput_by_att = dict()
        self.att_rssi_map = dict()
        self.heap_size = INVALID_HEAP_SIZE
        self.error_list = []

    def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
        """
        save the test results:

        * record the better throughput if att/rssi is the same.
        * record the min heap size.
        """
        if ap_ssid not in self.att_rssi_map:
            # for a new AP, create empty dicts
            self.throughput_by_att[ap_ssid] = dict()
            self.throughput_by_rssi[ap_ssid] = dict()
            self.att_rssi_map[ap_ssid] = dict()

        self.att_rssi_map[ap_ssid][att] = rssi

        def record_throughput(database, key_value):
            try:
                # we save the larger value for the same att
                if throughput > database[ap_ssid][key_value]:
                    database[ap_ssid][key_value] = throughput
            except KeyError:
                database[ap_ssid][key_value] = throughput

        record_throughput(self.throughput_by_att, att)
        record_throughput(self.throughput_by_rssi, rssi)

        if int(heap_size) < self.heap_size:
            self.heap_size = int(heap_size)

    def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
        """
        add result for one test

        :param raw_data: iperf raw data
        :param ap_ssid: ap ssid that was tested
        :param att: attenuate value
        :param rssi: AP RSSI
        :param heap_size: min heap size during test
        :return: throughput
        """
        fall_to_0_recorded = 0
        throughput_list = []
        result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
        if not result_list:
            # failed to find raw data with the PC pattern, it might be in the DUT pattern
            result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)

        for result in result_list:
            if int(result[1]) - int(result[0]) != 1:
                # this is probably the summary line, skip it
                continue
            throughput_list.append(float(result[2]))
            if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
                    and fall_to_0_recorded < 1:
                # throughput fell to 0; we only record this once per test
                self.error_list.append("[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}"
                                       .format(ap_ssid, att, rssi, result[0], result[1]))
                fall_to_0_recorded += 1

        if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
            throughput = sum(throughput_list) / len(throughput_list)
        else:
            throughput = 0.0

        if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
            self.error_list.append("[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found"
                                   .format(ap_ssid, att, rssi))

        self._save_result(throughput, ap_ssid, att, rssi, heap_size)

        return throughput

    def post_analysis(self):
        """
        some rules need to be checked after we have collected all test raw data:

        1. throughput value is 30% worse than the next point with lower RSSI
        2. throughput value is 30% worse than the next point with larger attenuation
        """
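        # Sort the points so that signal strength increases along the list (RSSI
        # ascending, or attenuation descending) and compare each point with its
        # predecessor, i.e. the next-weaker signal. A point is flagged when its
        # throughput drops by more than BAD_POINT_PERCENTAGE_THRESHOLD relative to the
        # predecessor; points below BAD_POINT_RSSI_THRESHOLD (relevant for the RSSI plot)
        # and predecessors below BAD_POINT_MIN_THRESHOLD are skipped.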
        def analysis_bad_point(data, index_type):
            for ap_ssid in data:
                result_dict = data[ap_ssid]
                index_list = list(result_dict.keys())
                index_list.sort()
                if index_type == "att":
                    index_list.reverse()

                for i, index_value in enumerate(index_list[1:]):
                    if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
                            result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
                        continue
                    _percentage = result_dict[index_value] / result_dict[index_list[i]]
                    if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
                        self.error_list.append("[Error][Bad point][{}][{}: {}]: drop {:.02f}%"
                                               .format(ap_ssid, index_type, index_value,
                                                       (1 - _percentage) * 100))

        analysis_bad_point(self.throughput_by_rssi, "rssi")
        analysis_bad_point(self.throughput_by_att, "att")
    @staticmethod
    def _convert_to_draw_format(data, label):
        keys = list(data.keys())
        keys.sort()
        return {
            "x-axis": keys,
            "y-axis": [data[x] for x in keys],
            "label": label,
        }
    def draw_throughput_figure(self, path, ap_ssid, draw_type):
        """
        :param path: folder to save figure. make sure the folder is already created.
        :param ap_ssid: ap ssid string or a list of ap ssid strings
        :param draw_type: "att" or "rssi"
        :return: file_name
        """
        if draw_type == "rssi":
            type_name = "RSSI"
            data = self.throughput_by_rssi
        elif draw_type == "att":
            type_name = "Att"
            data = self.throughput_by_att
        else:
            raise AssertionError("draw type not supported")

        if isinstance(ap_ssid, list):
            file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction,
                                                             str(hash(tuple(ap_ssid)))[:6])
            data_list = [self._convert_to_draw_format(data[_ap_ssid], _ap_ssid)
                         for _ap_ssid in ap_ssid]
        else:
            file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction, ap_ssid)
            data_list = [self._convert_to_draw_format(data[ap_ssid], ap_ssid)]

        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  "Throughput Vs {} ({} {})".format(type_name, self.proto, self.direction),
                                  "Throughput (Mbps)",
                                  "{} (dbm)".format(type_name),
                                  data_list)
        return file_name

    def draw_rssi_vs_att_figure(self, path, ap_ssid):
        """
        :param path: folder to save figure. make sure the folder is already created.
        :param ap_ssid: ap to use
        :return: file_name
        """
        if isinstance(ap_ssid, list):
            file_name = "AttVsRSSI_{}.png".format(str(hash(tuple(ap_ssid)))[:6])
            data_list = [self._convert_to_draw_format(self.att_rssi_map[_ap_ssid], _ap_ssid)
                         for _ap_ssid in ap_ssid]
        else:
            file_name = "AttVsRSSI_{}.png".format(ap_ssid)
            data_list = [self._convert_to_draw_format(self.att_rssi_map[ap_ssid], ap_ssid)]

        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  "Att Vs RSSI",
                                  "Att (dbm)",
                                  "RSSI (dbm)",
                                  data_list)
        return file_name

    def get_best_throughput(self):
        """ get the best throughput during test """
        best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
                        for ap_ssid in self.throughput_by_att]
        return max(best_for_aps)
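    # NOTE: the best throughput reported here is the maximum over all APs and all
    # attenuation settings; test_wifi_throughput_basic() below logs this value and
    # checks it against the pass standard via ttfw_idf.check_performance().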
    def __str__(self):
        """
        returns summary for this test:

        1. test result (success or fail)
        2. best performance for each AP
        3. min free heap size during test
        """
        if self.throughput_by_att:
            ret = "[{}_{}][{}]: {}\r\n\r\n".format(self.proto, self.direction, self.config_name,
                                                   "Fail" if self.error_list else "Success")
            ret += "Performance for each AP:\r\n"
            for ap_ssid in self.throughput_by_att:
                ret += "[{}]: {:.02f} Mbps\r\n".format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
            if self.heap_size != INVALID_HEAP_SIZE:
                ret += "Minimum heap size: {}".format(self.heap_size)
        else:
            ret = ""
        return ret


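# How TestResult is driven in this module: IperfTestUtility.run_test() records one
# point per attenuation value via add_result(); the TestReport classes receive the
# collected TestResult objects when the reports are generated. post_analysis() and the
# draw_*_figure() helpers are not called directly in this file.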
class IperfTestUtility(object):
    """ iperf test implementation """

    def __init__(self, dut, config_name, ap_ssid, ap_password,
                 pc_nic_ip, pc_iperf_log_file, test_result=None):
        self.config_name = config_name
        self.dut = dut
        self.pc_iperf_log_file = pc_iperf_log_file
        self.ap_ssid = ap_ssid
        self.ap_password = ap_password
        self.pc_nic_ip = pc_nic_ip
        if test_result:
            self.test_result = test_result
        else:
            self.test_result = {
                "tcp_tx": TestResult("tcp", "tx", config_name),
                "tcp_rx": TestResult("tcp", "rx", config_name),
                "udp_tx": TestResult("udp", "tx", config_name),
                "udp_rx": TestResult("udp", "rx", config_name),
            }
    def setup(self):
        """
        setup iperf test:

        1. kill current iperf process
        2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
        3. scan to get AP RSSI
        4. connect to AP
        """
        try:
            subprocess.check_output("sudo killall iperf 2>&1 > /dev/null", shell=True)
        except subprocess.CalledProcessError:
            pass
        self.dut.write("restart")
        self.dut.expect("esp32>")
        self.dut.write("scan {}".format(self.ap_ssid))
        for _ in range(SCAN_RETRY_COUNT):
            try:
                rssi = int(self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
                                           timeout=SCAN_TIMEOUT)[0])
                break
            except DUT.ExpectTimeout:
                continue
        else:
            raise AssertionError("Failed to scan AP")
        self.dut.write("sta {} {}".format(self.ap_ssid, self.ap_password))
        dut_ip = self.dut.expect(re.compile(r"sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)"))[0]
        return dut_ip, rssi
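    # Note: setup() returns the DUT IP address (used as the iperf server address in the
    # "rx" direction below) together with the RSSI scanned for the target AP.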
    def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
        return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)

    def _test_once(self, proto, direction):
        """ do measure once for one type """
        # connect and scan to get RSSI
        dut_ip, rssi = self.setup()

        assert direction in ["rx", "tx"]
        assert proto in ["tcp", "udp"]
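        # Direction is from the DUT's point of view: "tx" starts an iperf server on the
        # PC and an iperf client on the DUT; "rx" starts an iperf server on the DUT and
        # an iperf client on the PC.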
        # run iperf test
        if direction == "tx":
            with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
                if proto == "tcp":
                    process = subprocess.Popen(["iperf", "-s", "-B", self.pc_nic_ip,
                                                "-t", str(TEST_TIME), "-i", "1", "-f", "m"],
                                               stdout=f, stderr=f)
                    self.dut.write("iperf -c {} -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))
                else:
                    process = subprocess.Popen(["iperf", "-s", "-u", "-B", self.pc_nic_ip,
                                                "-t", str(TEST_TIME), "-i", "1", "-f", "m"],
                                               stdout=f, stderr=f)
                    self.dut.write("iperf -c {} -u -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
                pc_raw_data = server_raw_data = f.read()
        else:
            with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
                if proto == "tcp":
                    self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME))
                    process = subprocess.Popen(["iperf", "-c", dut_ip,
                                                "-t", str(TEST_TIME), "-f", "m"],
                                               stdout=f, stderr=f)
                else:
                    self.dut.write("iperf -s -u -i 1 -t {}".format(TEST_TIME))
                    process = subprocess.Popen(["iperf", "-c", dut_ip, "-u", "-b", "100M",
                                                "-t", str(TEST_TIME), "-f", "m"],
                                               stdout=f, stderr=f)

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            server_raw_data = self.dut.read()
            with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
                pc_raw_data = f.read()

        # append the PC iperf logs to the markdown log file
        with open(self.pc_iperf_log_file, "a+") as f:
            f.write("## [{}] `{}`\r\n##### {}"
                    .format(self.config_name,
                            "{}_{}".format(proto, direction),
                            time.strftime("%m-%d %H:%M:%S", time.localtime(time.time()))))
            f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')

        self.dut.write("heap")
        heap_size = self.dut.expect(re.compile(r"min heap size: (\d+)\D"))[0]

        # return server raw data (for parsing test results) and RSSI
        return server_raw_data, rssi, heap_size
    def run_test(self, proto, direction, atten_val):
        """
        run test for one type with the specified atten_val and save the test result

        :param proto: tcp or udp
        :param direction: tx or rx
        :param atten_val: attenuate value
        """
        rssi = FAILED_TO_SCAN_RSSI
        heap_size = INVALID_HEAP_SIZE
        try:
            server_raw_data, rssi, heap_size = self._test_once(proto, direction)
            throughput = self._save_test_result("{}_{}".format(proto, direction),
                                                server_raw_data, atten_val,
                                                rssi, heap_size)
            Utility.console_log("[{}][{}_{}][{}][{}]: {:.02f}"
                                .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
        except Exception as e:
            self._save_test_result("{}_{}".format(proto, direction), "", atten_val, rssi, heap_size)
            Utility.console_log("Failed during test: {}".format(e))

    def run_all_cases(self, atten_val):
        """
        run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).

        :param atten_val: attenuate value
        """
        self.run_test("tcp", "tx", atten_val)
        self.run_test("tcp", "rx", atten_val)
        self.run_test("udp", "tx", atten_val)
        self.run_test("udp", "rx", atten_val)
    def wait_ap_power_on(self):
        """
        The AP needs some time to power on, and the time differs between APs.
        This method scans repeatedly to check whether the AP has powered on.

        :return: True or False
        """
        self.dut.write("restart")
        self.dut.expect("esp32>")
        for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
            try:
                self.dut.write("scan {}".format(self.ap_ssid))
                self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
                                timeout=SCAN_TIMEOUT)
                ret = True
                break
            except DUT.ExpectTimeout:
                pass
        else:
            ret = False
        return ret


@ttfw_idf.idf_example_test(env_tag="Example_ShieldBox_Basic", category="stress")
def test_wifi_throughput_with_different_configs(env, extra_data):
    """
    steps: |
      1. build iperf with specified configs
      2. test throughput for all routers
    """
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
    ap_info = {
        "ssid": env.get_variable("ap_ssid"),
        "password": env.get_variable("ap_password"),
    }

    # check_output returns bytes on Python 3; decode so the str regex below can match
    config_names_raw = subprocess.check_output(["ls", os.path.dirname(os.path.abspath(__file__))]).decode("utf-8")

    test_result = dict()
    sdkconfig_files = dict()

    for config_name in CONFIG_NAME_PATTERN.findall(config_names_raw):
        # 1. get the config
        sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
                                                    "sdkconfig.ci.{}".format(config_name))

        # 2. get DUT and download
        dut = env.get_dut("iperf", "examples/wifi/iperf", app_config_name=config_name)
        dut.start_app()
        dut.expect("esp32>")

        # 3. run test for each required att value
        test_result[config_name] = {
            "tcp_tx": TestResult("tcp", "tx", config_name),
            "tcp_rx": TestResult("tcp", "rx", config_name),
            "udp_tx": TestResult("udp", "tx", config_name),
            "udp_rx": TestResult("udp", "rx", config_name),
        }

        test_utility = IperfTestUtility(dut, config_name, ap_info["ssid"],
                                        ap_info["password"], pc_nic_ip, pc_iperf_log_file,
                                        test_result[config_name])

        for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
            test_utility.run_all_cases(0)

        for result_type in test_result[config_name]:
            summary = str(test_result[config_name][result_type])
            if summary:
                Utility.console_log(summary, color="orange")

        # 4. check test results
        env.close_dut("iperf")

    # 5. generate report
    report = TestReport.ThroughputForConfigsReport(os.path.join(env.log_path, "ThroughputForConfigsReport"),
                                                   ap_info["ssid"], test_result, sdkconfig_files)
    report.generate_report()


@ttfw_idf.idf_example_test(env_tag="Example_ShieldBox", category="stress")
def test_wifi_throughput_vs_rssi(env, extra_data):
    """
    steps: |
      1. build with best performance config
      2. switch on one router
      3. set attenuator value from 0-60 for each router
      4. test TCP tx rx and UDP tx rx throughput
    """
    att_port = env.get_variable("attenuator_port")
    ap_list = env.get_variable("ap_list")
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    apc_ip = env.get_variable("apc_ip")
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")

    test_result = {
        "tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
        "tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
        "udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
        "udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
    }

    # 1. get DUT and download
    dut = env.get_dut("iperf", "examples/wifi/iperf", app_config_name=BEST_PERFORMANCE_CONFIG)
    dut.start_app()
    dut.expect("esp32>")

    # 2. run test for each required att value
    for ap_info in ap_list:
        test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"], ap_info["password"],
                                        pc_nic_ip, pc_iperf_log_file, test_result)
        PowerControl.Control.control_rest(apc_ip, ap_info["outlet"], "OFF")
        PowerControl.Control.control(apc_ip, {ap_info["outlet"]: "ON"})
        Attenuator.set_att(att_port, 0)
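        # with attenuation back at 0, wait for the AP to finish booting before
        # sweeping the attenuation values for this AP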
        if not test_utility.wait_ap_power_on():
            Utility.console_log("[{}] failed to power on, skip testing this AP"
                                .format(ap_info["ssid"]), color="red")
            continue

        for atten_val in ATTEN_VALUE_LIST:
            assert Attenuator.set_att(att_port, atten_val) is True
            test_utility.run_all_cases(atten_val)

    # 3. check test results
    env.close_dut("iperf")

    # 4. generate report
    report = TestReport.ThroughputVsRssiReport(os.path.join(env.log_path, "ThroughputVsRssiReport"),
                                               test_result)
    report.generate_report()


@ttfw_idf.idf_example_test(env_tag="Example_ShieldBox_Basic")
def test_wifi_throughput_basic(env, extra_data):
    """
    steps: |
      1. test TCP tx rx and UDP tx rx throughput
      2. compare with the pre-defined pass standard
    """
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
    ap_info = {
        "ssid": env.get_variable("ap_ssid"),
        "password": env.get_variable("ap_password"),
    }

    # 1. get DUT
    dut = env.get_dut("iperf", "examples/wifi/iperf", app_config_name=BEST_PERFORMANCE_CONFIG)
    dut.start_app()
    dut.expect("esp32>")

    # 2. preparation
    test_result = {
        "tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
        "tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
        "udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
        "udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
    }

    test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"],
                                    ap_info["password"], pc_nic_ip, pc_iperf_log_file, test_result)

    # 3. run test for TCP Tx, Rx and UDP Tx, Rx
    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
        test_utility.run_all_cases(0)

    # 4. log performance and compare with the pass standard
    performance_items = []
    for throughput_type in test_result:
        ttfw_idf.log_performance("{}_throughput".format(throughput_type),
                                 "{:.02f} Mbps".format(test_result[throughput_type].get_best_throughput()))
        performance_items.append(["{}_throughput".format(throughput_type),
                                  "{:.02f} Mbps".format(test_result[throughput_type].get_best_throughput())])

    # 5. save to the junit report
    TinyFW.JunitReport.update_performance(performance_items)

    # do the checks after logging; otherwise the test exits immediately if a check fails
    # and some performance values would not be logged
    for throughput_type in test_result:
        ttfw_idf.check_performance("{}_throughput".format(throughput_type),
                                   test_result[throughput_type].get_best_throughput())

    env.close_dut("iperf")


if __name__ == '__main__':
    test_wifi_throughput_basic(env_config_file="EnvConfig.yml")
    test_wifi_throughput_with_different_configs(env_config_file="EnvConfig.yml")
    test_wifi_throughput_vs_rssi(env_config_file="EnvConfig.yml")