  1. """
  2. Test case for iperf example.
  3. This test case might have problem running on windows:
  4. 1. direct use of `make`
  5. 2. use `sudo killall iperf` to force kill iperf, didn't implement windows version
  6. The test env Example_ShieldBox do need the following config::
  7. Example_ShieldBox:
  8. ap_list:
  9. - ssid: "ssid"
  10. password: "password"
  11. outlet: 1
  12. apc_ip: "192.168.1.88"
  13. attenuator_port: "/dev/ttyUSB0"
  14. iperf: "/dev/ttyUSB1"
  15. apc_ip: "192.168.1.88"
  16. pc_nic: "eth0"
  17. """
from __future__ import division
from __future__ import unicode_literals
from builtins import str
from builtins import range
from builtins import object
import re
import os
import sys
import time
import subprocess
try:
    import IDF
except ImportError:
    # this is a test case written with tiny-test-fw.
    # to run test cases outside tiny-test-fw,
    # we need to set the environment variable `TEST_FW_PATH`,
    # then insert `TEST_FW_PATH` into sys.path before importing FW modules
    test_fw_path = os.getenv("TEST_FW_PATH")
    if test_fw_path and test_fw_path not in sys.path:
        sys.path.insert(0, test_fw_path)
    import IDF

import DUT
import Utility
from Utility import (Attenuator, PowerControl, LineChart)

try:
    from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
except ImportError:
    # add current folder to system path for importing test_report
    sys.path.append(os.path.dirname(__file__))
    from test_report import (ThroughputForConfigsReport, ThroughputVsRssiReport)
# configurations
TEST_TIME = TEST_TIMEOUT = 60
WAIT_AP_POWER_ON_TIMEOUT = 90
SCAN_TIMEOUT = 3
SCAN_RETRY_COUNT = 3
RETRY_COUNT_FOR_BEST_PERFORMANCE = 2
ATTEN_VALUE_LIST = range(0, 60, 2)

# constants
FAILED_TO_SCAN_RSSI = -97
INVALID_HEAP_SIZE = 0xFFFFFFFF

PC_IPERF_TEMP_LOG_FILE = ".tmp_iperf.log"
CONFIG_NAME_PATTERN = re.compile(r"sdkconfig\.defaults\.(.+)")

# We need to auto compare the difference between adjacent configs (01 -> 00, 02 -> 01, ...)
# and put them into the reports. Using numbers for config names makes this easy.
# Use the default value `99` for the config with best performance.
BEST_PERFORMANCE_CONFIG = "99"


class TestResult(object):
    """ record, analyse test results and convert data to output format """

    PC_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+).0\s*-\s*(\d+).0\s+sec\s+[\d.]+\s+MBytes\s+([\d.]+)\s+Mbits/sec")
    DUT_BANDWIDTH_LOG_PATTERN = re.compile(r"(\d+)-\s+(\d+)\s+sec\s+([\d.]+)\s+Mbits/sec")
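    # PC_BANDWIDTH_LOG_PATTERN matches the per-interval lines of the host iperf output
    # (e.g. " 0.0- 1.0 sec  5.25 MBytes  44.0 Mbits/sec"); DUT_BANDWIDTH_LOG_PATTERN
    # matches the shorter per-interval lines printed by the DUT iperf console
    # (e.g. "0-   1 sec  44.0 Mbits/sec"). Both capture (start, end, Mbits/sec).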

    ZERO_POINT_THRESHOLD = -88  # RSSI, dbm
    ZERO_THROUGHPUT_THRESHOLD = -92  # RSSI, dbm
    BAD_POINT_RSSI_THRESHOLD = -85  # RSSI, dbm
    BAD_POINT_MIN_THRESHOLD = 3  # Mbps
    BAD_POINT_PERCENTAGE_THRESHOLD = 0.3

    # we need at least 1/2 valid points to qualify the test result
    THROUGHPUT_QUALIFY_COUNT = TEST_TIME // 2

    def __init__(self, proto, direction, config_name):
        self.proto = proto
        self.direction = direction
        self.config_name = config_name
        self.throughput_by_rssi = dict()
        self.throughput_by_att = dict()
        self.att_rssi_map = dict()
        self.heap_size = INVALID_HEAP_SIZE
        self.error_list = []

    def _save_result(self, throughput, ap_ssid, att, rssi, heap_size):
        """
        save the test results:

        * record the better throughput if att/rssi is the same.
        * record the min heap size.
        """
        if ap_ssid not in self.att_rssi_map:
            # for a new AP, create empty dicts
            self.throughput_by_att[ap_ssid] = dict()
            self.throughput_by_rssi[ap_ssid] = dict()
            self.att_rssi_map[ap_ssid] = dict()

        self.att_rssi_map[ap_ssid][att] = rssi

        def record_throughput(database, key_value):
            try:
                # we save the larger value for the same att/rssi
                if throughput > database[ap_ssid][key_value]:
                    database[ap_ssid][key_value] = throughput
            except KeyError:
                database[ap_ssid][key_value] = throughput

        record_throughput(self.throughput_by_att, att)
        record_throughput(self.throughput_by_rssi, rssi)

        if int(heap_size) < self.heap_size:
            self.heap_size = int(heap_size)

    def add_result(self, raw_data, ap_ssid, att, rssi, heap_size):
        """
        add result for one test

        :param raw_data: iperf raw data
        :param ap_ssid: ssid of the AP tested
        :param att: attenuate value
        :param rssi: AP RSSI
        :param heap_size: min heap size during test
        :return: throughput
        """
        fall_to_0_recorded = 0
        throughput_list = []
        result_list = self.PC_BANDWIDTH_LOG_PATTERN.findall(raw_data)
        if not result_list:
            # failed to find raw data by PC pattern, it might be DUT pattern
            result_list = self.DUT_BANDWIDTH_LOG_PATTERN.findall(raw_data)

        for result in result_list:
            if int(result[1]) - int(result[0]) != 1:
                # this could be the summary line, ignore it
                continue
            throughput_list.append(float(result[2]))
            if float(result[2]) == 0 and rssi > self.ZERO_POINT_THRESHOLD \
                    and fall_to_0_recorded < 1:
                # throughput fell to 0; we only record 1 such error for one test
                self.error_list.append("[Error][fall to 0][{}][att: {}][rssi: {}]: 0 throughput interval: {}-{}"
                                       .format(ap_ssid, att, rssi, result[0], result[1]))
                fall_to_0_recorded += 1

        # only qualify the result if more than half of the 1-second intervals were parsed
        if len(throughput_list) > self.THROUGHPUT_QUALIFY_COUNT:
            throughput = sum(throughput_list) / len(throughput_list)
        else:
            throughput = 0.0

        if throughput == 0 and rssi > self.ZERO_THROUGHPUT_THRESHOLD:
            self.error_list.append("[Error][Fatal][{}][att: {}][rssi: {}]: No throughput data found"
                                   .format(ap_ssid, att, rssi))

        self._save_result(throughput, ap_ssid, att, rssi, heap_size)

        return throughput

    def post_analysis(self):
        """
        some rules need to be checked after we have collected all the test raw data:

        1. a point's throughput is more than 30% worse than the next point with lower RSSI
        2. a point's throughput is more than 30% worse than the next point with larger attenuation
        """
        def analysis_bad_point(data, index_type):
            for ap_ssid in data:
                result_dict = data[ap_ssid]
                index_list = list(result_dict.keys())
                index_list.sort()
                if index_type == "att":
                    index_list.reverse()

                for i, index_value in enumerate(index_list[1:]):
                    if index_value < self.BAD_POINT_RSSI_THRESHOLD or \
                            result_dict[index_list[i]] < self.BAD_POINT_MIN_THRESHOLD:
                        continue
                    _percentage = result_dict[index_value] / result_dict[index_list[i]]
                    if _percentage < 1 - self.BAD_POINT_PERCENTAGE_THRESHOLD:
                        self.error_list.append("[Error][Bad point][{}][{}: {}]: drop {:.02f}%"
                                               .format(ap_ssid, index_type, index_value,
                                                       (1 - _percentage) * 100))

        analysis_bad_point(self.throughput_by_rssi, "rssi")
        analysis_bad_point(self.throughput_by_att, "att")

    @staticmethod
    def _convert_to_draw_format(data, label):
        # sorted() is used instead of list.sort() so this also works on Python 3 dict views
        keys = sorted(data.keys())
        return {
            "x-axis": keys,
            "y-axis": [data[x] for x in keys],
            "label": label,
        }

    def draw_throughput_figure(self, path, ap_ssid, draw_type):
        """
        :param path: folder to save the figure. make sure the folder is already created.
        :param ap_ssid: ap ssid string or a list of ap ssid strings
        :param draw_type: "att" or "rssi"
        :return: file_name
        """
        if draw_type == "rssi":
            type_name = "RSSI"
            data = self.throughput_by_rssi
        elif draw_type == "att":
            type_name = "Att"
            data = self.throughput_by_att
        else:
            raise AssertionError("draw type not supported")
        if isinstance(ap_ssid, list):
            # use a short hash of the ssid list as the file name suffix
            file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction,
                                                             str(hash(tuple(ap_ssid)))[:6])
            data_list = [self._convert_to_draw_format(data[_ap_ssid], _ap_ssid)
                         for _ap_ssid in ap_ssid]
        else:
            file_name = "ThroughputVs{}_{}_{}_{}.png".format(type_name, self.proto, self.direction, ap_ssid)
            data_list = [self._convert_to_draw_format(data[ap_ssid], ap_ssid)]

        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  "Throughput Vs {} ({} {})".format(type_name, self.proto, self.direction),
                                  "Throughput (Mbps)",
                                  "{} (dbm)".format(type_name),
                                  data_list)
        return file_name

    def draw_rssi_vs_att_figure(self, path, ap_ssid):
        """
        :param path: folder to save the figure. make sure the folder is already created.
        :param ap_ssid: ap to use
        :return: file_name
        """
        if isinstance(ap_ssid, list):
            # use a short hash of the ssid list as the file name suffix
            file_name = "AttVsRSSI_{}.png".format(str(hash(tuple(ap_ssid)))[:6])
            data_list = [self._convert_to_draw_format(self.att_rssi_map[_ap_ssid], _ap_ssid)
                         for _ap_ssid in ap_ssid]
        else:
            file_name = "AttVsRSSI_{}.png".format(ap_ssid)
            data_list = [self._convert_to_draw_format(self.att_rssi_map[ap_ssid], ap_ssid)]
        LineChart.draw_line_chart(os.path.join(path, file_name),
                                  "Att Vs RSSI",
                                  "Att (dbm)",
                                  "RSSI (dbm)",
                                  data_list)
        return file_name

    def get_best_throughput(self):
        """ get the best throughput seen during the test """
        best_for_aps = [max(self.throughput_by_att[ap_ssid].values())
                        for ap_ssid in self.throughput_by_att]
        return max(best_for_aps)

    def __str__(self):
        """
        returns the summary for this test:

        1. test result (success or fail)
        2. best performance for each AP
        3. min free heap size during the test
        """
        if self.throughput_by_att:
            ret = "[{}_{}][{}]: {}\r\n\r\n".format(self.proto, self.direction, self.config_name,
                                                   "Fail" if self.error_list else "Success")
            ret += "Performance for each AP:\r\n"
            for ap_ssid in self.throughput_by_att:
                ret += "[{}]: {:.02f} Mbps\r\n".format(ap_ssid, max(self.throughput_by_att[ap_ssid].values()))
            if self.heap_size != INVALID_HEAP_SIZE:
                ret += "Minimum heap size: {}".format(self.heap_size)
        else:
            ret = ""
        return ret
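

# Sketch of how TestResult is used on its own (illustrative values; the test cases
# below feed it via IperfTestUtility):
#
#     result = TestResult("tcp", "tx", "00")
#     result.add_result(raw_iperf_log, "test_ap", att=10, rssi=-45, heap_size=50000)
#     result.post_analysis()
#     print(result)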


class IperfTestUtility(object):
    """ iperf test implementation """

    def __init__(self, dut, config_name, ap_ssid, ap_password,
                 pc_nic_ip, pc_iperf_log_file, test_result=None):
        self.config_name = config_name
        self.dut = dut
        self.pc_iperf_log_file = pc_iperf_log_file
        self.ap_ssid = ap_ssid
        self.ap_password = ap_password
        self.pc_nic_ip = pc_nic_ip
        if test_result:
            self.test_result = test_result
        else:
            self.test_result = {
                "tcp_tx": TestResult("tcp", "tx", config_name),
                "tcp_rx": TestResult("tcp", "rx", config_name),
                "udp_tx": TestResult("udp", "tx", config_name),
                "udp_rx": TestResult("udp", "rx", config_name),
            }

    def setup(self):
        """
        setup iperf test:

        1. kill the current iperf process
        2. reboot DUT (currently iperf is not very robust, need to reboot DUT)
        3. scan to get AP RSSI
        4. connect to AP
        """
        try:
            subprocess.check_output("sudo killall iperf 2>&1 > /dev/null", shell=True)
        except subprocess.CalledProcessError:
            pass
        self.dut.write("restart")
        self.dut.expect("esp32>")
        self.dut.write("scan {}".format(self.ap_ssid))
        for _ in range(SCAN_RETRY_COUNT):
            try:
                rssi = int(self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
                                           timeout=SCAN_TIMEOUT)[0])
                break
            except DUT.ExpectTimeout:
                continue
        else:
            # the loop finished without a break: the AP was never seen in any scan
            raise AssertionError("Failed to scan AP")
        self.dut.write("sta {} {}".format(self.ap_ssid, self.ap_password))
        dut_ip = self.dut.expect(re.compile(r"event: sta ip: ([\d.]+), mask: ([\d.]+), gw: ([\d.]+)"))[0]
        return dut_ip, rssi

    def _save_test_result(self, test_case, raw_data, att, rssi, heap_size):
        return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)

    def _test_once(self, proto, direction):
        """ run the measurement once for one proto/direction combination """
        # connect and scan to get RSSI
        dut_ip, rssi = self.setup()

        assert direction in ["rx", "tx"]
        assert proto in ["tcp", "udp"]
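
        # Role assignment in the branches below: for "tx" the DUT transmits, so the PC runs
        # `iperf -s` (receiver) and its log doubles as the server-side data; for "rx" the DUT
        # runs `iperf -s` and the PC connects as the client (UDP capped at `-b 100M`).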
        # run iperf test
        if direction == "tx":
            with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
                if proto == "tcp":
                    process = subprocess.Popen(["iperf", "-s", "-B", self.pc_nic_ip,
                                                "-t", str(TEST_TIME), "-i", "1", "-f", "m"],
                                               stdout=f, stderr=f)
                    self.dut.write("iperf -c {} -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))
                else:
                    process = subprocess.Popen(["iperf", "-s", "-u", "-B", self.pc_nic_ip,
                                                "-t", str(TEST_TIME), "-i", "1", "-f", "m"],
                                               stdout=f, stderr=f)
                    self.dut.write("iperf -c {} -u -i 1 -t {}".format(self.pc_nic_ip, TEST_TIME))

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
                pc_raw_data = server_raw_data = f.read()
        else:
            with open(PC_IPERF_TEMP_LOG_FILE, "w") as f:
                if proto == "tcp":
                    self.dut.write("iperf -s -i 1 -t {}".format(TEST_TIME))
                    process = subprocess.Popen(["iperf", "-c", dut_ip,
                                                "-t", str(TEST_TIME), "-f", "m"],
                                               stdout=f, stderr=f)
                else:
                    self.dut.write("iperf -s -u -i 1 -t {}".format(TEST_TIME))
                    process = subprocess.Popen(["iperf", "-c", dut_ip, "-u", "-b", "100M",
                                                "-t", str(TEST_TIME), "-f", "m"],
                                               stdout=f, stderr=f)

                for _ in range(TEST_TIMEOUT):
                    if process.poll() is not None:
                        break
                    time.sleep(1)
                else:
                    process.terminate()

            server_raw_data = self.dut.read()
            with open(PC_IPERF_TEMP_LOG_FILE, "r") as f:
                pc_raw_data = f.read()

        # append the PC side iperf log to the markdown log file
        with open(self.pc_iperf_log_file, "a+") as f:
            f.write("## [{}] `{}`\r\n##### {}"
                    .format(self.config_name,
                            "{}_{}".format(proto, direction),
                            time.strftime("%m-%d %H:%M:%S", time.localtime(time.time()))))
            f.write('\r\n```\r\n\r\n' + pc_raw_data + '\r\n```\r\n')
        self.dut.write("heap")
        heap_size = self.dut.expect(re.compile(r"min heap size: (\d+)\D"))[0]

        # return server raw data (for parsing test results), RSSI and min heap size
        return server_raw_data, rssi, heap_size

    def run_test(self, proto, direction, atten_val):
        """
        run test for one type, with specified atten_value and save the test result

        :param proto: tcp or udp
        :param direction: tx or rx
        :param atten_val: attenuate value
        """
        rssi = FAILED_TO_SCAN_RSSI
        heap_size = INVALID_HEAP_SIZE
        try:
            server_raw_data, rssi, heap_size = self._test_once(proto, direction)
            throughput = self._save_test_result("{}_{}".format(proto, direction),
                                                server_raw_data, atten_val,
                                                rssi, heap_size)
            Utility.console_log("[{}][{}_{}][{}][{}]: {:.02f}"
                                .format(self.config_name, proto, direction, rssi, self.ap_ssid, throughput))
        except Exception as e:
            self._save_test_result("{}_{}".format(proto, direction), "", atten_val, rssi, heap_size)
            Utility.console_log("Failed during test: {}".format(e))

    def run_all_cases(self, atten_val):
        """
        run the test for all four types (tcp_tx, tcp_rx, udp_tx, udp_rx).

        :param atten_val: attenuate value
        """
        self.run_test("tcp", "tx", atten_val)
        self.run_test("tcp", "rx", atten_val)
        self.run_test("udp", "tx", atten_val)
        self.run_test("udp", "rx", atten_val)

    def wait_ap_power_on(self):
        """
        The AP takes some time to power on, and the delay differs between APs.
        This method scans repeatedly to check whether the AP has powered on.

        :return: True or False
        """
        self.dut.write("restart")
        self.dut.expect("esp32>")
        for _ in range(WAIT_AP_POWER_ON_TIMEOUT // SCAN_TIMEOUT):
            try:
                self.dut.write("scan {}".format(self.ap_ssid))
                self.dut.expect(re.compile(r"\[{}]\[rssi=(-\d+)]".format(self.ap_ssid)),
                                timeout=SCAN_TIMEOUT)
                ret = True
                break
            except DUT.ExpectTimeout:
                pass
        else:
            ret = False
        return ret
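

# A minimal sketch of how IperfTestUtility is driven (values are illustrative; the
# real test cases below take them from the test environment):
#
#     utility = IperfTestUtility(dut, "99", "my_ap", "my_password",
#                                "192.168.1.10", "/tmp/pc_iperf_log.md")
#     utility.run_all_cases(atten_val=0)  # tcp/udp x tx/rx, one run each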


def build_iperf_with_config(config_name):
    """
    we need to build the iperf example with different configurations.

    :param config_name: sdkconfig we want to build
    """

    # switch to the iperf example path before building when we're running the test with Runner
    example_path = os.path.dirname(__file__)
    cwd = os.getcwd()
    if cwd != example_path and example_path:
        os.chdir(example_path)
    try:
        subprocess.check_call("make clean > /dev/null", shell=True)
        subprocess.check_call(["cp", "sdkconfig.defaults.{}".format(config_name), "sdkconfig.defaults"])
        subprocess.check_call(["rm", "-f", "sdkconfig"])
        subprocess.check_call("make defconfig > /dev/null", shell=True)
        # save sdkconfig to generate the config comparison report
        subprocess.check_call(["cp", "sdkconfig", "sdkconfig.{}".format(config_name)])
        subprocess.check_call("make -j5 > /dev/null", shell=True)
        subprocess.check_call("make print_flash_cmd | tail -n 1 > build/download.config", shell=True)
    finally:
        os.chdir(cwd)
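

# For reference, the build above is roughly equivalent to this shell sequence, run from
# the example directory (GNU Make build system; <name> is the config suffix):
#
#     make clean
#     cp sdkconfig.defaults.<name> sdkconfig.defaults && rm -f sdkconfig
#     make defconfig && cp sdkconfig sdkconfig.<name>
#     make -j5
#     make print_flash_cmd | tail -n 1 > build/download.config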


@IDF.idf_example_test(env_tag="Example_ShieldBox_Basic", category="stress")
def test_wifi_throughput_with_different_configs(env, extra_data):
    """
    steps: |
      1. build iperf with specified configs
      2. test throughput for all routers
    """
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
    ap_info = {
        "ssid": env.get_variable("ap_ssid"),
        "password": env.get_variable("ap_password"),
    }

    # decode() is needed on Python 3, where check_output() returns bytes
    config_names_raw = subprocess.check_output(["ls", os.path.dirname(os.path.abspath(__file__))]).decode()

    test_result = dict()
    sdkconfig_files = dict()

    for config_name in CONFIG_NAME_PATTERN.findall(config_names_raw):
        # 1. build config
        build_iperf_with_config(config_name)
        sdkconfig_files[config_name] = os.path.join(os.path.dirname(__file__),
                                                    "sdkconfig.{}".format(config_name))

        # 2. get DUT and download
        dut = env.get_dut("iperf", "examples/wifi/iperf")
        dut.start_app()
        dut.expect("esp32>")

        # 3. run test for each required att value
        test_result[config_name] = {
            "tcp_tx": TestResult("tcp", "tx", config_name),
            "tcp_rx": TestResult("tcp", "rx", config_name),
            "udp_tx": TestResult("udp", "tx", config_name),
            "udp_rx": TestResult("udp", "rx", config_name),
        }

        test_utility = IperfTestUtility(dut, config_name, ap_info["ssid"],
                                        ap_info["password"], pc_nic_ip, pc_iperf_log_file,
                                        test_result[config_name])

        for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
            test_utility.run_all_cases(0)

        for result_type in test_result[config_name]:
            summary = str(test_result[config_name][result_type])
            if summary:
                Utility.console_log(summary, color="orange")

        # 4. check test results
        env.close_dut("iperf")

    # 5. generate report
    report = ThroughputForConfigsReport(os.path.join(env.log_path, "ThroughputForConfigsReport"),
                                        ap_info["ssid"], test_result, sdkconfig_files)
    report.generate_report()


@IDF.idf_example_test(env_tag="Example_ShieldBox", category="stress")
def test_wifi_throughput_vs_rssi(env, extra_data):
    """
    steps: |
      1. build with best performance config
      2. switch on one router
      3. set attenuator value from 0-60 for each router
      4. test TCP tx rx and UDP tx rx throughput
    """
    att_port = env.get_variable("attenuator_port")
    ap_list = env.get_variable("ap_list")
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    apc_ip = env.get_variable("apc_ip")
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")

    test_result = {
        "tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
        "tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
        "udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
        "udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
    }

    # 1. build config
    build_iperf_with_config(BEST_PERFORMANCE_CONFIG)

    # 2. get DUT and download
    dut = env.get_dut("iperf", "examples/wifi/iperf")
    dut.start_app()
    dut.expect("esp32>")

    # 3. run test for each required att value
    for ap_info in ap_list:
        test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"], ap_info["password"],
                                        pc_nic_ip, pc_iperf_log_file, test_result)

        # turn off the rest of the outlets, power on the outlet of the AP under test
        PowerControl.Control.control_rest(apc_ip, ap_info["outlet"], "OFF")
        PowerControl.Control.control(apc_ip, {ap_info["outlet"]: "ON"})
        Attenuator.set_att(att_port, 0)

        if not test_utility.wait_ap_power_on():
            Utility.console_log("[{}] failed to power on, skip testing this AP"
                                .format(ap_info["ssid"]), color="red")
            continue

        for atten_val in ATTEN_VALUE_LIST:
            assert Attenuator.set_att(att_port, atten_val) is True
            test_utility.run_all_cases(atten_val)

    # 4. check test results
    env.close_dut("iperf")

    # 5. generate report
    report = ThroughputVsRssiReport(os.path.join(env.log_path, "ThroughputVsRssiReport"),
                                    test_result)
    report.generate_report()


@IDF.idf_example_test(env_tag="Example_ShieldBox_Basic")
def test_wifi_throughput_basic(env, extra_data):
    """
    steps: |
      1. test TCP tx rx and UDP tx rx throughput
      2. compare with the pre-defined pass standard
    """
    pc_nic_ip = env.get_pc_nic_info("pc_nic", "ipv4")["addr"]
    pc_iperf_log_file = os.path.join(env.log_path, "pc_iperf_log.md")
    ap_info = {
        "ssid": env.get_variable("ap_ssid"),
        "password": env.get_variable("ap_password"),
    }

    # 1. build iperf with best config
    build_iperf_with_config(BEST_PERFORMANCE_CONFIG)

    # 2. get DUT
    dut = env.get_dut("iperf", "examples/wifi/iperf")
    dut.start_app()
    dut.expect("esp32>")

    # 3. preparing
    test_result = {
        "tcp_tx": TestResult("tcp", "tx", BEST_PERFORMANCE_CONFIG),
        "tcp_rx": TestResult("tcp", "rx", BEST_PERFORMANCE_CONFIG),
        "udp_tx": TestResult("udp", "tx", BEST_PERFORMANCE_CONFIG),
        "udp_rx": TestResult("udp", "rx", BEST_PERFORMANCE_CONFIG),
    }

    test_utility = IperfTestUtility(dut, BEST_PERFORMANCE_CONFIG, ap_info["ssid"],
                                    ap_info["password"], pc_nic_ip, pc_iperf_log_file, test_result)

    # 4. run test for TCP Tx, Rx and UDP Tx, Rx
    for _ in range(RETRY_COUNT_FOR_BEST_PERFORMANCE):
        test_utility.run_all_cases(0)

    # 5. log performance and compare with pass standard
    for throughput_type in test_result:
        IDF.log_performance("{}_throughput".format(throughput_type),
                            "{:.02f} Mbps".format(test_result[throughput_type].get_best_throughput()))

    # check after logging, otherwise the test exits on the first failed check
    # and the remaining performance values would not be logged
    for throughput_type in test_result:
        IDF.check_performance("{}_throughput".format(throughput_type),
                              test_result[throughput_type].get_best_throughput())

    env.close_dut("iperf")


if __name__ == '__main__':
    test_wifi_throughput_basic(env_config_file="EnvConfig.yml")
    test_wifi_throughput_with_different_configs(env_config_file="EnvConfig.yml")
    test_wifi_throughput_vs_rssi(env_config_file="EnvConfig.yml")