# TCPConnection.py 12 KB


import random
import re
import socket
import threading
import time

import TCPConnectionUtility
from NativeLog import NativeLog
from TCAction import PerformanceTCBase
  9. DELAY_RANGE = [10, 3000]
  10. CONNECTION_STRUCTURE = ("Connection handler", "PC socket", "Target socket id",
  11. "Target port", "PC port", "PC state", "Target state")
  12. # max fail count for one connection during test
  13. MAX_FAIL_COUNT = 10
  14. class CheckerBase(threading.Thread):
  15. CHECK_ITEM = ("CONDITION", "NOTIFIER", "ID", "DATA")
  16. SLEEP_TIME = 0.1 # sleep 100ms between each check action
  17. def __init__(self):
  18. threading.Thread.__init__(self)
  19. self.setDaemon(True)
  20. self.exit_event = threading.Event()
  21. self.sync_lock = threading.Lock()
  22. self.check_item_list = []
  23. self.check_item_id = 0
  24. def run(self):
  25. while self.exit_event.isSet() is False:
  26. self.process()
  27. pass
  28. def process(self):
  29. pass
  30. def add_check_item(self, condition, notifier):
  31. with self.sync_lock:
  32. check_item_id = self.check_item_id
  33. self.check_item_id += 1
  34. self.check_item_list.append(dict(zip(self.CHECK_ITEM, (condition, notifier, check_item_id, str()))))
  35. return check_item_id
  36. def remove_check_item(self, check_item_id):
  37. ret = None
  38. with self.sync_lock:
  39. check_items = filter(lambda x: x["ID"] == check_item_id, self.check_item_list)
  40. if len(check_items) > 0:
  41. self.check_item_list.remove(check_items[0])
  42. ret = check_items[0]["DATA"]
  43. return ret
  44. def exit(self):
  45. self.exit_event.set()
  46. pass
  47. # check on serial port
  48. class SerialPortChecker(CheckerBase):
  49. def __init__(self, serial_reader):
  50. CheckerBase.__init__(self)
  51. self.serial_reader = serial_reader
  52. pass
  53. # check condition for serial is compiled regular expression pattern
  54. @staticmethod
  55. def do_check(check_item, data):
  56. match = check_item["CONDITION"].search(data)
  57. if match is not None:
  58. pos = data.find(match.group()) + len(match.group())
  59. # notify user
  60. check_item["NOTIFIER"]("serial", match)
  61. else:
  62. pos = -1
  63. return pos
  64. def process(self):
  65. # do check
  66. with self.sync_lock:
  67. # read data
  68. new_data = self.serial_reader()
  69. # NativeLog.add_trace_info("[debug][read data] %s" % new_data)
  70. # do check each item
  71. for check_item in self.check_item_list:
  72. # NativeLog.add_trace_info("[debug][read data][ID][%s]" % check_item["ID"])
  73. check_item["DATA"] += new_data
  74. self.do_check(check_item, check_item["DATA"])
  75. time.sleep(self.SLEEP_TIME)
  76. # handle PC TCP server accept and notify user
  77. class TCPServerChecker(CheckerBase):
  78. def __init__(self, server_sock):
  79. CheckerBase.__init__(self)
  80. self.server_sock = server_sock
  81. server_sock.settimeout(self.SLEEP_TIME)
  82. self.accepted_socket_list = []
  83. # check condition for tcp accepted sock is tcp source port
  84. @staticmethod
  85. def do_check(check_item, data):
  86. for sock_addr_pair in data:
  87. addr = sock_addr_pair[1]
  88. if addr[1] == check_item["CONDITION"]:
  89. # same port, so this is the socket that matched, notify and remove it from list
  90. check_item["NOTIFIER"]("tcp", sock_addr_pair[0])
  91. data.remove(sock_addr_pair)
  92. def process(self):
  93. # do accept
  94. try:
  95. client_sock, addr = self.server_sock.accept()
  96. self.accepted_socket_list.append((client_sock, addr))
  97. except socket.error:
  98. pass
  99. # do check
  100. with self.sync_lock:
  101. check_item_list = self.check_item_list
  102. for check_item in check_item_list:
  103. self.do_check(check_item, self.accepted_socket_list)
  104. pass
  105. # this thread handles one tcp connection.
  106. class ConnectionHandler(threading.Thread):
  107. CHECK_FREQ = CheckerBase.SLEEP_TIME/2
  108. def __init__(self, utility, serial_checker, tcp_checker, connect_method, disconnect_method, test_case):
  109. threading.Thread.__init__(self)
  110. self.setDaemon(True)
  111. self.utility = utility
  112. self.connect_method = connect_method
  113. self.disconnect_method = disconnect_method
  114. self.exit_event = threading.Event()
  115. # following members are used in communication with checker threads
  116. self.serial_checker = serial_checker
  117. self.tcp_checker = tcp_checker
  118. self.serial_notify_event = threading.Event()
  119. self.tcp_notify_event = threading.Event()
  120. self.serial_result = None
  121. self.tcp_result = None
  122. self.serial_check_item_id = None
  123. self.tcp_check_item_id = None
  124. self.data_cache = None
  125. self.fail_count = 0
  126. self.test_case = test_case
  127. pass
  128. def log_error(self):
  129. self.fail_count += 1
  130. if self.fail_count > MAX_FAIL_COUNT:
  131. self.test_case.error_detected()
  132. def new_connection_structure(self):
  133. connection = dict.fromkeys(CONNECTION_STRUCTURE, None)
  134. connection["Connection handler"] = self
  135. return connection
  136. def run(self):
  137. while self.exit_event.isSet() is False:
  138. connection = self.new_connection_structure()
  139. # do connect
  140. connect_method_choice = random.choice(self.connect_method)
  141. if self.utility.execute_tcp_method(connect_method_choice, connection) is False:
  142. self.log_error()
  143. # check if established
  144. if self.utility.is_established_state(connection) is True:
  145. time.sleep(float(random.randint(DELAY_RANGE[0], DELAY_RANGE[1]))/1000)
  146. # do disconnect if established
  147. disconnect_method_choice = random.choice(self.disconnect_method)
  148. if self.utility.execute_tcp_method(disconnect_method_choice, connection) is False:
  149. self.log_error()
  150. # make sure target socket closed
  151. self.utility.close_connection(connection)
  152. time.sleep(float(random.randint(DELAY_RANGE[0], DELAY_RANGE[1]))/1000)
  153. pass
  154. # serial_condition: re string
  155. # tcp_condition: target local port
  156. def add_checkers(self, serial_condition=None, tcp_condition=None):
  157. # cleanup
  158. self.serial_result = None
  159. self.tcp_result = None
  160. self.serial_notify_event.clear()
  161. self.tcp_notify_event.clear()
  162. # serial_checker
  163. if serial_condition is not None:
  164. pattern = re.compile(serial_condition)
  165. self.serial_check_item_id = self.serial_checker.add_check_item(pattern, self.notifier)
  166. else:
  167. # set event so that serial check always pass
  168. self.serial_notify_event.set()
  169. if tcp_condition is not None:
  170. self.tcp_check_item_id = self.tcp_checker.add_check_item(tcp_condition, self.notifier)
  171. else:
  172. # set event so that tcp check always pass
  173. self.tcp_notify_event.set()
  174. # NativeLog.add_trace_info("[Debug] add check item %s, connection is %s" % (self.serial_check_item_id, self))
  175. pass
  176. def get_checker_results(self, timeout=5):
  177. time1 = time.time()
  178. while time.time() - time1 < timeout:
  179. # if one type of checker is not set, its event will be set in add_checkers
  180. if self.serial_notify_event.isSet() is True and self.tcp_notify_event.isSet() is True:
  181. break
  182. time.sleep(self.CHECK_FREQ)
  183. # do cleanup
  184. # NativeLog.add_trace_info("[Debug] remove check item %s, connection is %s" % (self.serial_check_item_id, self))
  185. self.data_cache = self.serial_checker.remove_check_item(self.serial_check_item_id)
  186. self.tcp_checker.remove_check_item(self.tcp_check_item_id)
  187. # self.serial_check_item_id = None
  188. # self.tcp_check_item_id = None
  189. return self.serial_result, self.tcp_result
  190. def notifier(self, typ, result):
  191. if typ == "serial":
  192. self.serial_notify_event.set()
  193. self.serial_result = result
  194. elif typ == "tcp":
  195. self.tcp_notify_event.set()
  196. self.tcp_result = result
  197. def exit(self):
  198. self.exit_event.set()
  199. pass
  200. class TestCase(PerformanceTCBase.PerformanceTCBase):
  201. def __init__(self, test_case, test_env, timeout=120, log_path=None):
  202. PerformanceTCBase.PerformanceTCBase.__init__(self, test_case, test_env,
  203. timeout=timeout, log_path=log_path)
  204. self.max_connection = 5
  205. self.execute_time = 120 # execute time default 120 minutes
  206. self.pc_ip = "pc_ip"
  207. self.target_ip = "target_ip"
  208. self.connect_method = ["C_01"]
  209. self.disconnect_method = ["D_05"]
  210. cmd_set = test_case["cmd set"]
  211. # load param from excel
  212. for i in range(1, len(cmd_set)):
  213. if cmd_set[i][0] != "dummy":
  214. cmd_string = "self." + cmd_set[i][0]
  215. exec cmd_string
  216. self.error_event = threading.Event()
  217. self.serial_lock = threading.Lock()
  218. pass
  219. def serial_reader(self):
  220. return self.serial_read_data("SSC1")
  221. def send_ssc_command(self, data):
  222. with self.serial_lock:
  223. time.sleep(0.05)
  224. self.serial_write_line("SSC1", data)
  225. def error_detected(self):
  226. self.error_event.set()
  227. def process(self):
  228. # parameters
  229. max_connection = self.max_connection
  230. execute_time = self.execute_time * 60
  231. pc_ip = self.get_parameter(self.pc_ip)
  232. target_ip = self.get_parameter(self.target_ip)
  233. connect_method = self.connect_method
  234. disconnect_method = self.disconnect_method
  235. server_port = random.randint(30000, 50000)
  236. # step 1, create TCP server on target and PC
  237. # create TCP server on target
  238. self.serial_write_line("SSC1", "soc -B -t TCP -p %s" % server_port)
  239. match = self.check_regular_expression("SSC1", re.compile("BIND:(\d+),OK"))
  240. if match is None:
  241. NativeLog.add_prompt_trace("Failed to create TCP server on target")
  242. return
  243. target_sock_id = match.group(1)
  244. self.serial_write_line("SSC1", "soc -L -s %s" % target_sock_id)
  245. if self.check_response("SSC1", "+LISTEN:%s,OK" % target_sock_id) is False:
  246. NativeLog.add_prompt_trace("Failed to create TCP server on target")
  247. return
  248. # create TCP server on PC
  249. try:
  250. server_sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
  251. server_sock.bind((pc_ip, server_port))
  252. server_sock.listen(5)
  253. except StandardError:
  254. NativeLog.add_prompt_trace("Failed to create TCP server on PC")
  255. return
  256. # step 2, create checker
  257. serial_port_checker = SerialPortChecker(self.serial_reader)
  258. tcp_server_checker = TCPServerChecker(server_sock)
  259. serial_port_checker.start()
  260. tcp_server_checker.start()
  261. # step 3, create 5 thread and do connection
  262. utility = TCPConnectionUtility.Utility(self, server_port, server_port, pc_ip, target_ip)
  263. work_thread = []
  264. for i in range(max_connection):
  265. t = ConnectionHandler(utility, serial_port_checker, tcp_server_checker,
  266. connect_method, disconnect_method, self)
  267. work_thread.append(t)
  268. t.start()
  269. # step 4, wait and exit
  270. self.error_event.wait(execute_time)
  271. # close all threads
  272. for t in work_thread:
  273. t.exit()
  274. t.join()
  275. serial_port_checker.exit()
  276. tcp_server_checker.exit()
  277. serial_port_checker.join()
  278. tcp_server_checker.join()
  279. if self.error_event.isSet() is False:
  280. # no error detected
  281. self.set_result("Succeed")
  282. pass
  283. def main():
  284. pass
  285. if __name__ == '__main__':
  286. main()