AdvInterval.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import re
  2. import subprocess
  3. import time
  4. from TCAction import PerformanceTCBase
  5. from TCAction import TCActionBase
  6. from NativeLog import NativeLog
  7. from comm.NIC import Adapter
  8. MIN_PACKETS_TO_CHECK = 10
  9. class TestCase(PerformanceTCBase.PerformanceTCBase):
  10. INTERVAL_PATTERN = re.compile("(0x[0-9A-Fa-f]+)-(0x[0-9A-Fa-f]+)")
  11. def __init__(self, test_case, test_env, timeout, log_path):
  12. PerformanceTCBase.PerformanceTCBase.__init__(self, test_case, test_env, timeout, log_path)
  13. self.interval_list = []
  14. self.deviation = 0
  15. # load param from excel
  16. cmd_set = test_case["cmd set"]
  17. for i in range(1, len(cmd_set)):
  18. if cmd_set[i][0] != "dummy":
  19. cmd_string = "self." + cmd_set[i][0]
  20. exec cmd_string
  21. self.result_cntx = TCActionBase.ResultCheckContext(self, test_env, self.tc_name)
  22. pass
  23. def process_packets(self, packets, interval):
  24. def convert_ts(ts):
  25. return float("%s.%s" % (ts["second"], ts["microsecond"]))
  26. def filter_packets():
  27. _filtered_packets = []
  28. mac_address = self.get_parameter("dut1_bt_mac")
  29. for _packet in packets:
  30. packet_str = str(_packet)
  31. if "[le_sub_event_code]: LEAdvReport" in packet_str \
  32. and "[address_0]: %s" % mac_address in packet_str:
  33. _filtered_packets.append(_packet)
  34. # sort by time
  35. _filtered_packets.sort(key=lambda x: convert_ts(x.ts))
  36. return _filtered_packets
  37. filtered_packets = filter_packets()
  38. # add captured packets to log
  39. for packet in filtered_packets:
  40. self.result_check("BLENIC", str(packet))
  41. # flush data cache to free memory
  42. self.flush_data("BLENIC")
  43. # scan will switch channel, therefore need to check if there're successive fails
  44. succeed_packets = 0
  45. # process packets
  46. # unit: ms; allow deviation for interval
  47. allowed_range = [float(interval[0] * (1 - self.deviation) * 0.625) / 1000,
  48. float(interval[1] * (1 + self.deviation) * 0.625) / 1000]
  49. NativeLog.add_trace_info("[BLE][AdvInterval] allowed_interval_range is %s" % allowed_range)
  50. for i in range(len(filtered_packets) - 1):
  51. _p1 = filtered_packets[i]
  52. _p2 = filtered_packets[i+1]
  53. interval = convert_ts(_p2.ts) - convert_ts(_p1.ts)
  54. if allowed_range[0] < interval < allowed_range[1]:
  55. succeed_packets += 1
  56. else:
  57. pass
  58. result = True if succeed_packets >= MIN_PACKETS_TO_CHECK else False
  59. return result
  60. def execute(self):
  61. TCActionBase.TCActionBase.execute(self)
  62. test_result = "Succeed"
  63. # open capture device
  64. adapter = Adapter.Adapter(self.get_parameter("bt_capture_nic"), "capture", capture_type="bluetooth")
  65. ret = adapter.start_capture()
  66. if ret != "LIBPCAP_SUCCEED":
  67. NativeLog.add_trace_critical("Can't start capture packets: %s" % ret)
  68. return
  69. def run_test_once(interval):
  70. # flush all packets
  71. adapter.get_packets()
  72. # config ble adv data
  73. self.serial_write_line("SSC1", "bleadv -D -z stop")
  74. self.check_response("SSC1", "+BLEADV")
  75. self.serial_write_line("SSC1", "bleadv -L -c 0 -t 3")
  76. self.check_response("SSC1", "+BLEADV")
  77. # set adv param and start adv
  78. self.serial_write_line("SSC1", "bleadv -D -z start -i 0x%04X-0x%04X -h 1" % (interval[0], interval[1]))
  79. self.check_response("SSC1", "+BLEADV:OK")
  80. # set scan window = scan interval = 2s, scan for 6s, each channel scan 2 second.
  81. subprocess.check_output("hcitool cmd 0x08 0x000b 0x00 0x80 0x0c 0x80 0x0c 0x00 0x00\n", shell=True)
  82. subprocess.check_output("hcitool cmd 0x08 0x000c 0x01 0x00\n", shell=True)
  83. time.sleep(6)
  84. subprocess.check_output("hcitool cmd 0x08 0x000c 0x00 0x00\n", shell=True)
  85. packets = adapter.get_packets()
  86. return self.process_packets(packets, interval)
  87. for _interval in self.interval_list:
  88. match = self.INTERVAL_PATTERN.search(_interval)
  89. if match is not None:
  90. if run_test_once([int(match.group(1), base=16), int(match.group(2), base=16)]) is False:
  91. NativeLog.add_trace_critical("Test fail for interval: %s." % _interval)
  92. test_result = "Fail"
  93. else:
  94. NativeLog.add_trace_critical("interval string format not correct: %s." % _interval)
  95. test_result = "Fail"
  96. pass
  97. self.set_result(test_result)
  98. def main():
  99. pass
  100. if __name__ == '__main__':
  101. pass