TCPConnection.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. import random
  2. import re
  3. import socket
  4. import threading
  5. import time
  6. import TCPConnectionUtility
  7. from NativeLog import NativeLog
  8. from TCAction import PerformanceTCBase
  9. DELAY_RANGE = [10, 3000]
  10. CONNECTION_STRUCTURE = ("Connection handler", "PC socket", "Target socket id",
  11. "Target port", "PC port", "PC state", "Target state")
  12. # max fail count for one connection during test
  13. MAX_FAIL_COUNT = 10
  14. class CheckerBase(threading.Thread):
  15. CHECK_ITEM = ("CONDITION", "NOTIFIER", "ID", "DATA")
  16. SLEEP_TIME = 0.1 # sleep 100ms between each check action
  17. def __init__(self):
  18. threading.Thread.__init__(self)
  19. self.setDaemon(True)
  20. self.exit_event = threading.Event()
  21. self.sync_lock = threading.Lock()
  22. self.check_item_list = []
  23. self.check_item_id = 0
  24. def run(self):
  25. while self.exit_event.isSet() is False:
  26. self.process()
  27. pass
  28. def process(self):
  29. pass
  30. def add_check_item(self, condition, notifier):
  31. with self.sync_lock:
  32. check_item_id = self.check_item_id
  33. self.check_item_id += 1
  34. self.check_item_list.append(dict(zip(self.CHECK_ITEM, (condition, notifier, check_item_id, str()))))
  35. return check_item_id
  36. def remove_check_item(self, check_item_id):
  37. ret = None
  38. with self.sync_lock:
  39. check_items = filter(lambda x: x["ID"] == check_item_id, self.check_item_list)
  40. if len(check_items) > 0:
  41. self.check_item_list.remove(check_items[0])
  42. ret = check_items[0]["DATA"]
  43. return ret
  44. def exit(self):
  45. self.exit_event.set()
  46. pass
  47. # check on serial port
  48. class SerialPortChecker(CheckerBase):
  49. def __init__(self, serial_reader):
  50. CheckerBase.__init__(self)
  51. self.serial_reader = serial_reader
  52. pass
  53. # check condition for serial is compiled regular expression pattern
  54. @staticmethod
  55. def do_check(check_item, data):
  56. match = check_item["CONDITION"].search(data)
  57. if match is not None:
  58. pos = data.find(match.group()) + len(match.group())
  59. # notify user
  60. check_item["NOTIFIER"]("serial", match)
  61. else:
  62. pos = -1
  63. return pos
  64. def process(self):
  65. # do check
  66. with self.sync_lock:
  67. # read data
  68. new_data = self.serial_reader()
  69. # NativeLog.add_trace_info("[debug][read data] %s" % new_data)
  70. # do check each item
  71. for check_item in self.check_item_list:
  72. # NativeLog.add_trace_info("[debug][read data][ID][%s]" % check_item["ID"])
  73. check_item["DATA"] += new_data
  74. self.do_check(check_item, check_item["DATA"])
  75. time.sleep(self.SLEEP_TIME)
  76. # handle PC TCP server accept and notify user
  77. class TCPServerChecker(CheckerBase):
  78. def __init__(self, server_sock):
  79. CheckerBase.__init__(self)
  80. self.server_sock = server_sock
  81. server_sock.settimeout(self.SLEEP_TIME)
  82. self.accepted_socket_list = []
  83. # check condition for tcp accepted sock is tcp source port
  84. @staticmethod
  85. def do_check(check_item, data):
  86. for sock_addr_pair in data:
  87. addr = sock_addr_pair[1]
  88. if addr[1] == check_item["CONDITION"]:
  89. # same port, so this is the socket that matched, notify and remove it from list
  90. check_item["NOTIFIER"]("tcp", sock_addr_pair[0])
  91. data.remove(sock_addr_pair)
  92. def process(self):
  93. # do accept
  94. try:
  95. client_sock, addr = self.server_sock.accept()
  96. self.accepted_socket_list.append((client_sock, addr))
  97. except socket.error:
  98. pass
  99. # do check
  100. with self.sync_lock:
  101. check_item_list = self.check_item_list
  102. for check_item in check_item_list:
  103. self.do_check(check_item, self.accepted_socket_list)
  104. pass
  105. # this thread handles one tcp connection.
  106. class ConnectionHandler(threading.Thread):
  107. CHECK_FREQ = CheckerBase.SLEEP_TIME/2
  108. def __init__(self, utility, serial_checker, tcp_checker, connect_method, disconnect_method, test_case):
  109. threading.Thread.__init__(self)
  110. self.setDaemon(True)
  111. self.utility = utility
  112. self.connect_method = connect_method
  113. self.disconnect_method = disconnect_method
  114. self.exit_event = threading.Event()
  115. # following members are used in communication with checker threads
  116. self.serial_checker = serial_checker
  117. self.tcp_checker = tcp_checker
  118. self.serial_notify_event = threading.Event()
  119. self.tcp_notify_event = threading.Event()
  120. self.serial_result = None
  121. self.tcp_result = None
  122. self.serial_check_item_id = None
  123. self.tcp_check_item_id = None
  124. self.data_cache = None
  125. self.fail_count = 0
  126. self.test_case = test_case
  127. pass
  128. def log_error(self):
  129. self.fail_count += 1
  130. if self.fail_count > MAX_FAIL_COUNT:
  131. self.test_case.error_detected()
  132. def new_connection_structure(self):
  133. connection = dict.fromkeys(CONNECTION_STRUCTURE, None)
  134. connection["Connection handler"] = self
  135. return connection
  136. def run(self):
  137. while self.exit_event.isSet() is False:
  138. connection = self.new_connection_structure()
  139. # do connect
  140. connect_method_choice = random.choice(self.connect_method)
  141. if self.utility.execute_tcp_method(connect_method_choice, connection) is False:
  142. self.log_error()
  143. # check if established
  144. if self.utility.is_established_state(connection) is True:
  145. time.sleep(float(random.randint(DELAY_RANGE[0], DELAY_RANGE[1]))/1000)
  146. # do disconnect if established
  147. disconnect_method_choice = random.choice(self.disconnect_method)
  148. if self.utility.execute_tcp_method(disconnect_method_choice, connection) is False:
  149. self.log_error()
  150. # make sure target socket closed
  151. self.utility.close_connection(connection)
  152. time.sleep(float(random.randint(DELAY_RANGE[0], DELAY_RANGE[1]))/1000)
  153. pass
  154. # serial_condition: re string
  155. # tcp_condition: target local port
  156. def add_checkers(self, serial_condition=None, tcp_condition=None):
  157. # cleanup
  158. self.serial_result = None
  159. self.tcp_result = None
  160. self.serial_notify_event.clear()
  161. self.tcp_notify_event.clear()
  162. # serial_checker
  163. if serial_condition is not None:
  164. pattern = re.compile(serial_condition)
  165. self.serial_check_item_id = self.serial_checker.add_check_item(pattern, self.notifier)
  166. else:
  167. # set event so that serial check always pass
  168. self.serial_notify_event.set()
  169. if tcp_condition is not None:
  170. self.tcp_check_item_id = self.tcp_checker.add_check_item(tcp_condition, self.notifier)
  171. else:
  172. # set event so that tcp check always pass
  173. self.tcp_notify_event.set()
  174. # NativeLog.add_trace_info("[Debug] add check item %s, connection is %s" % (self.serial_check_item_id, self))
  175. pass
  176. def get_checker_results(self, timeout=5):
  177. time1 = time.time()
  178. while time.time() - time1 < timeout:
  179. # if one type of checker is not set, its event will be set in add_checkers
  180. if self.serial_notify_event.isSet() is True and self.tcp_notify_event.isSet() is True:
  181. break
  182. time.sleep(self.CHECK_FREQ)
  183. # do cleanup
  184. # NativeLog.add_trace_info("[Debug] remove check item %s, connection is %s" % (self.serial_check_item_id, self))
  185. self.data_cache = self.serial_checker.remove_check_item(self.serial_check_item_id)
  186. self.tcp_checker.remove_check_item(self.tcp_check_item_id)
  187. # self.serial_check_item_id = None
  188. # self.tcp_check_item_id = None
  189. return self.serial_result, self.tcp_result
  190. def notifier(self, typ, result):
  191. if typ == "serial":
  192. self.serial_notify_event.set()
  193. self.serial_result = result
  194. elif typ == "tcp":
  195. self.tcp_notify_event.set()
  196. self.tcp_result = result
  197. def exit(self):
  198. self.exit_event.set()
  199. pass
  200. class TestCase(PerformanceTCBase.PerformanceTCBase):
  201. def __init__(self, test_case, test_env, timeout=120, log_path=None):
  202. PerformanceTCBase.PerformanceTCBase.__init__(self, test_case, test_env,
  203. timeout=timeout, log_path=log_path)
  204. self.max_connection = 5
  205. self.execute_time = 120 # execute time default 120 minutes
  206. self.pc_ip = "pc_ip"
  207. self.target_ip = "target_ip"
  208. self.connect_method = ["C_01"]
  209. self.disconnect_method = ["D_05"]
  210. cmd_set = test_case["cmd set"]
  211. # load param from excel
  212. for i in range(1, len(cmd_set)):
  213. if cmd_set[i][0] != "dummy":
  214. cmd_string = "self." + cmd_set[i][0]
  215. exec cmd_string
  216. self.error_event = threading.Event()
  217. self.serial_lock = threading.Lock()
  218. pass
  219. def serial_reader(self):
  220. return self.serial_read_data("SSC1")
  221. def send_ssc_command(self, data):
  222. with self.serial_lock:
  223. time.sleep(0.05)
  224. self.serial_write_line("SSC1", data)
  225. def error_detected(self):
  226. self.error_event.set()
  227. def process(self):
  228. # parameters
  229. max_connection = self.max_connection
  230. execute_time = self.execute_time * 60
  231. pc_ip = self.get_parameter(self.pc_ip)
  232. target_ip = self.get_parameter(self.target_ip)
  233. connect_method = self.connect_method
  234. disconnect_method = self.disconnect_method
  235. server_port = random.randint(30000, 50000)
  236. # step 1, create TCP server on target and PC
  237. # create TCP server on target
  238. self.serial_write_line("SSC1", "soc -B -t TCP -p %s" % server_port)
  239. match = self.check_regular_expression("SSC1", re.compile("BIND:(\d+),OK"))
  240. if match is None:
  241. NativeLog.add_prompt_trace("Failed to create TCP server on target")
  242. return
  243. target_sock_id = match.group(1)
  244. self.serial_write_line("SSC1", "soc -L -s %s" % target_sock_id)
  245. if self.check_response("SSC1", "+LISTEN:%s,OK" % target_sock_id) is False:
  246. NativeLog.add_prompt_trace("Failed to create TCP server on target")
  247. return
  248. # create TCP server on PC
  249. try:
  250. server_sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
  251. server_sock.bind((pc_ip, server_port))
  252. server_sock.listen(5)
  253. except StandardError:
  254. NativeLog.add_prompt_trace("Failed to create TCP server on PC")
  255. return
  256. # step 2, create checker
  257. serial_port_checker = SerialPortChecker(self.serial_reader)
  258. tcp_server_checker = TCPServerChecker(server_sock)
  259. serial_port_checker.start()
  260. tcp_server_checker.start()
  261. # step 3, create 5 thread and do connection
  262. utility = TCPConnectionUtility.Utility(self, server_port, server_port, pc_ip, target_ip)
  263. work_thread = []
  264. for i in range(max_connection):
  265. t = ConnectionHandler(utility, serial_port_checker, tcp_server_checker,
  266. connect_method, disconnect_method, self)
  267. work_thread.append(t)
  268. t.start()
  269. # step 4, wait and exit
  270. self.error_event.wait(execute_time)
  271. # close all threads
  272. for t in work_thread:
  273. t.exit()
  274. t.join()
  275. serial_port_checker.exit()
  276. tcp_server_checker.exit()
  277. serial_port_checker.join()
  278. tcp_server_checker.join()
  279. if self.error_event.isSet() is False:
  280. # no error detected
  281. self.set_result("Succeed")
  282. pass
  283. def main():
  284. pass
  285. if __name__ == '__main__':
  286. main()