UDPBroadcast.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. import time
  2. import random
  3. import threading
  4. import socket
  5. import re
  6. from TCAction import PerformanceTCBase
  7. from NativeLog import NativeLog
  8. from Utility import Encoding
  9. class SendThread(threading.Thread):
  10. def __init__(self, sock, send_len, target_addr, delay):
  11. threading.Thread.__init__(self)
  12. self.sock = sock
  13. self.send_len = send_len
  14. self.target_addr = target_addr
  15. self.delay = delay
  16. self.exit_event = threading.Event()
  17. self.send_count = 0
  18. pass
  19. def exit(self):
  20. self.exit_event.set()
  21. def run(self):
  22. data = "A" * self.send_len
  23. if self.sock is None:
  24. return
  25. while True:
  26. if self.exit_event.isSet() is True:
  27. break
  28. try:
  29. self.sock.sendto(data, self.target_addr)
  30. except StandardError:
  31. break
  32. self.send_count += 1
  33. time.sleep(self.delay * 0.001)
  34. pass
  35. def get_send_count(self):
  36. return self.send_count
  37. class RecvThread(threading.Thread):
  38. def __init__(self, sock):
  39. threading.Thread.__init__(self)
  40. self.sock = sock
  41. self.exit_event = threading.Event()
  42. self.calc_event = threading.Event()
  43. self.bytes_recv = 0
  44. self.Max = 0.0
  45. def start_calc(self):
  46. self.calc_event.set()
  47. def stop_calc(self):
  48. self.calc_event.clear()
  49. self.exit_event.set()
  50. def run(self):
  51. if self.sock is None:
  52. return
  53. while True:
  54. if self.exit_event.isSet() is True:
  55. break
  56. try:
  57. data, addr = self.sock.recvfrom(2048)
  58. except StandardError:
  59. break
  60. if self.calc_event.isSet() is True:
  61. self.bytes_recv += len(data)
  62. if len(data) == 0:
  63. start = time.time()
  64. while True:
  65. try:
  66. data, addr = self.sock.recvfrom(2048)
  67. except StandardError:
  68. break
  69. if len(data) > 0:
  70. if self.calc_event.isSet() is True:
  71. self.bytes_recv += len(data)
  72. end = time.time()
  73. break
  74. if end - start > self.Max:
  75. self.Max = end - start
  76. def get_bytes_recv(self):
  77. return self.bytes_recv
  78. pass
  79. def get_Max_time(self):
  80. return self.Max
  81. pass
  82. class device_check(threading.Thread):
  83. def __init__(self, port):
  84. threading.Thread.__init__(self)
  85. self.Max = 0.0
  86. self.port = port
  87. self.recv_data_cache = ""
  88. self.cache_lock = threading.Lock()
  89. self.exit_event = threading.Event()
  90. def data_recv_callback(self, data):
  91. with self.cache_lock:
  92. self.recv_data_cache += data
  93. pass
  94. def exit(self):
  95. self.exit_event.set()
  96. pass
  97. def run(self):
  98. while self.exit_event.isSet() is False:
  99. while True:
  100. if self.recv_data_cache:
  101. match = re.search("\+RECVFROM:\d+,\d+,\d+\.\d+\.\d+\.\d+,\d+", self.recv_data_cache)
  102. if match is not None:
  103. self.recv_data_cache = self.recv_data_cache[len(match.group()):]
  104. else:
  105. start = time.time()
  106. end = 0.0
  107. while True:
  108. res = re.search("\+RECVFROM:\d+,\d+,\d+\.\d+\.\d+\.\d+,\d+", self.recv_data_cache)
  109. if res is not None:
  110. self.recv_data_cache = self.recv_data_cache[len(res.group()):]
  111. end = time.time()
  112. break
  113. if end - start > self.Max:
  114. self.Max = end - start
  115. pass
  116. def get_max_time(self):
  117. return self.Max
  118. class TestCase(PerformanceTCBase.PerformanceTCBase):
  119. def __init__(self, test_case, test_env, timeout, log_path):
  120. PerformanceTCBase.PerformanceTCBase.__init__(self, test_case, test_env, timeout, log_path)
  121. self.send_len = 0
  122. self.pc_send = 0
  123. self.target_send = 0
  124. self.test_time = 0
  125. self.delay = 0
  126. # load param from excel
  127. cmd_set = test_case["cmd set"]
  128. for i in range(1, len(cmd_set)):
  129. if cmd_set[i][0] != "dummy":
  130. cmd_string = "self." + cmd_set[i][0]
  131. exec cmd_string
  132. self.recv_cb_lock = threading.Lock()
  133. self.recv_cb = dict.fromkeys(["SSC1"])
  134. pass
  135. def register_recv_callback(self, port_name, callback):
  136. with self.recv_cb_lock:
  137. if self.recv_cb[port_name] is None:
  138. self.recv_cb[port_name] = [callback]
  139. else:
  140. self.recv_cb[port_name].append(callback)
  141. pass
  142. def process(self):
  143. try:
  144. # configurable params
  145. send_len = self.send_len
  146. pc_send = self.pc_send
  147. target_send = self.target_send
  148. test_time = self.test_time
  149. delay = self.delay
  150. ap_ssid = self.get_parameter("ap_ssid")
  151. ap_password = self.get_parameter("ap_password")
  152. pc_ip = self.get_parameter("pc_ip")
  153. target_ip = self.get_parameter("target_ip")
  154. except StandardError, e:
  155. NativeLog.add_trace_critical("Error configuration for UDP script, error is %s" % e)
  156. raise StandardError("Error configuration")
  157. udp_port = random.randint(40000, 50000)
  158. # reboot before executing
  159. self.flush_data("SSC1")
  160. self.serial_write_line("SSC1", "reboot")
  161. if self.check_response("SSC1", "ready!!!", 5) is False:
  162. NativeLog.add_trace_critical("Fail to reboot")
  163. return
  164. # set target as STA mode
  165. self.flush_data("SSC1")
  166. self.serial_write_line("SSC1", "op -S -o 1")
  167. if self.check_response("SSC1", "+MODE:OK", 5) is False:
  168. NativeLog.add_trace_critical("Fail to set mode")
  169. return
  170. # connect to AP
  171. self.flush_data("SSC1")
  172. self.serial_write_line("SSC1", "sta -C -s %s -p %s" % (ap_ssid, ap_password))
  173. if self.check_response("SSC1", "+JAP:CONNECTED", 30) is False:
  174. NativeLog.add_trace_critical("Fail to JAP")
  175. return
  176. # disable recv print on target
  177. self.flush_data("SSC1")
  178. self.serial_write_line("SSC1", "soc -R -o 0")
  179. if self.check_response("SSC1", "+RECVPRINT", 5) is False:
  180. NativeLog.add_trace_critical("Fail to disable recv print")
  181. return
  182. # get broadcast ip
  183. res = re.search("(\d+\.\d+\.\d+\.)\d+", pc_ip)
  184. if res is not None:
  185. udp = res.group(1)
  186. broadcast_ip = udp + "255"
  187. else:
  188. NativeLog.add_trace_critical("No ip addr found")
  189. return
  190. # close all connection on target
  191. self.flush_data("SSC1")
  192. self.serial_write_line("SSC1", "soc -T")
  193. if self.check_response("SSC1", "+CLOSEALL", 5) is False:
  194. NativeLog.add_trace_critical("Fail to close sock")
  195. return
  196. # create socket on pc
  197. pc_sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
  198. pc_sock.bind((pc_ip, udp_port))
  199. pc_sock.settimeout(1)
  200. # create socket on target
  201. self.flush_data("SSC1")
  202. self.serial_write_line("SSC1", "soc -B -t UDP -i %s -p %s" % (target_ip, udp_port))
  203. if self.check_response("SSC1", "+BIND:0,OK,", 5) is False:
  204. NativeLog.add_trace_critical("Fail to bind")
  205. return
  206. thread_dict = dict.fromkeys(["SSC1"])
  207. thread_dict["SSC1"] = dict(zip(["check"], [None]))
  208. thread_dict["SSC1"]["check"] = device_check(self.test_env.get_port_by_name("SSC1"))
  209. self.register_recv_callback("SSC1", thread_dict["SSC1"]["check"].data_recv_callback)
  210. send_thread = SendThread(pc_sock if pc_send is True else None, send_len, (broadcast_ip, udp_port), delay)
  211. send_thread.start()
  212. recv_thread = RecvThread(pc_sock if target_send is True else None)
  213. recv_thread.start()
  214. # start calculate
  215. recv_thread.start_calc()
  216. thread_dict["SSC1"]["check"].start()
  217. send_count = 0
  218. if target_send is True:
  219. # do send from target
  220. start = time.time()
  221. while time.time() - start < test_time * 60:
  222. self.flush_data("SSC1")
  223. self.serial_write_line("SSC1", "soc -S -s 0 -l %s -n 1000 -i %s -p %s -j %s" % (
  224. send_len, broadcast_ip, udp_port, delay))
  225. if self.check_response("SSC1", "+SEND:0,OK", 300) is False:
  226. NativeLog.add_trace_critical("Fail to send")
  227. return
  228. send_count += 1000
  229. else:
  230. time.sleep(test_time * 60)
  231. send_thread.exit()
  232. send_thread.join()
  233. # stop throughput calculate
  234. while True:
  235. if recv_thread.isAlive() is False:
  236. recv_thread.stop_calc()
  237. recv_thread.join()
  238. break
  239. Max = 0.0
  240. recv_count = 0
  241. if pc_send is True:
  242. send_count = send_thread.get_send_count()
  243. start = time.time()
  244. rx_data_len = 0
  245. suc_time = 0
  246. while time.time() - start < 30:
  247. self.flush_data("SSC1")
  248. self.serial_write_line("SSC1", "soc -Q -s 0 -o 1")
  249. time.sleep(0.05)
  250. data = self.serial_read_data("SSC1")
  251. if data is not None:
  252. res = re.search("\+RECVLEN:(\d+)", data)
  253. if res is not None:
  254. if rx_data_len < int(res.group(1)):
  255. rx_data_len = int(res.group(1))
  256. time.sleep(0.5)
  257. else:
  258. suc_time += 1
  259. if suc_time > 5:
  260. break
  261. if (rx_data_len * 8 % send_len) > 0:
  262. recv_count = rx_data_len / send_len + 1
  263. else:
  264. recv_count = rx_data_len / send_len
  265. if recv_thread.get_bytes_recv() > 0:
  266. if (recv_thread.get_bytes_recv() % send_len) > 0:
  267. recv_count = recv_thread.get_bytes_recv() / send_len + 1
  268. else:
  269. recv_count = recv_thread.get_bytes_recv() / send_len
  270. Max = recv_thread.get_Max_time()
  271. thread_dict["SSC1"]["check"].exit()
  272. pc_sock.close()
  273. self.set_result("Succeed")
  274. NativeLog.add_trace_critical("send_count is %s, recv_count is %s" % (send_count, recv_count))
  275. NativeLog.add_trace_critical(
  276. "UDP Broadcast lose rate is %.2f%%" % (float(send_count - recv_count) / send_count * 100))
  277. NativeLog.add_trace_critical("UDP Broadcast lose test MAX time is %.4f" % Max)
  278. @Encoding.encode_utf8(3)
  279. def result_check(self, port_name, data):
  280. PerformanceTCBase.PerformanceTCBase.result_check(self, port_name, data)
  281. if port_name in self.recv_cb:
  282. with self.recv_cb_lock:
  283. callback_list = self.recv_cb[port_name]
  284. if callback_list is not None:
  285. for callback in callback_list:
  286. callback(data)
  287. pass
  288. def main():
  289. pass
  290. if __name__ == '__main__':
  291. main()