example_test.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # SPDX-FileCopyrightText: 2016-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import logging
  4. import os
  5. import re
  6. from threading import Thread
  7. import ttfw_idf
  8. from tiny_test_fw import DUT
  # Logging configuration for this test module.
  LOG_LEVEL = logging.DEBUG
  LOGGER_NAME = 'modbus_test'

  # Allowed options for the test
  TEST_READ_MAX_ERR_COUNT = 3  # Maximum allowed read errors during initialization
  TEST_THREAD_JOIN_TIMEOUT = 60  # Test thread join timeout in seconds
  TEST_EXPECT_STR_TIMEOUT = 30  # Test expect timeout in seconds

  # Example application directory names; the test also passes them to the
  # DUT threads as their names.
  TEST_MASTER_TCP = 'mb_tcp_master'
  TEST_SLAVE_TCP = 'mb_tcp_slave'

  # Test stage identifiers. They are the keys of the pattern dictionaries below
  # and the values stored in DutTestThread.test_stage. The numeric order matters
  # because the thread compares the current stage against STACK_START with >=.
  STACK_DEFAULT = 0
  STACK_IPV4 = 1
  STACK_IPV6 = 2
  STACK_INIT = 3
  STACK_CONNECT = 4
  STACK_START = 5
  STACK_PAR_OK = 6
  STACK_PAR_FAIL = 7
  STACK_DESTROY = 8

  # Regular expressions (as strings, compiled by the users) for the slave DUT
  # console output, keyed by test stage.
  pattern_dict_slave = {
      STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
      STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
      STACK_INIT: (r'.*I \(([0-9]+)\) MB_TCP_SLAVE_PORT: (Protocol stack initialized).'),
      STACK_CONNECT: (r'.*I\s\(([0-9]+)\) MB_TCP_SLAVE_PORT: Socket \(#[0-9]+\), accept client connection from address: '
                      r'([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
      STACK_START: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Start modbus test).*'),
      STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: ([A-Z]+ [A-Z]+) \([a-zA-Z0-9_]+ us\),\s'
                     r'ADDR:([0-9]+), TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
      # The unit used to be written as a bare `[ms]`, which the regex engine
      # reads as a character class (one 'm' or one 's'), so the literal text
      # "[ms]" could never match. The literal form is now accepted; the old
      # single-character form is kept as an alternative for compatibility.
      # NOTE(review): confirm the exact unit text printed by the slave firmware.
      STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) SLAVE_TEST: Response time exceeds configured [0-9]+ (?:\[ms\]|[ms]), ignore packet.*'),
      STACK_DESTROY: (r'.*I\s\(([0-9]+)\) SLAVE_TEST: (Modbus controller destroyed).'),
  }

  # Regular expressions for the master DUT console output, keyed by test stage.
  pattern_dict_master = {
      STACK_IPV4: (r'.*I \([0-9]+\) example_common: - IPv4 address: ([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}).*'),
      STACK_IPV6: (r'.*I \([0-9]+\) example_common: - IPv6 address: (([A-Fa-f0-9]{1,4}::?){1,7}[A-Fa-f0-9]{1,4}).*'),
      STACK_INIT: (r'.*I \(([0-9]+)\) MASTER_TEST: (Modbus master stack initialized).*'),
      STACK_CONNECT: (r'.*.*I\s\(([0-9]+)\) MB_TCP_MASTER_PORT: (Connected [0-9]+ slaves), start polling.*'),
      STACK_START: (r'.*I \(([0-9]+)\) MASTER_TEST: (Start modbus test).*'),
      STACK_PAR_OK: (r'.*I\s\(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+ ([a-zA-Z0-9_]+)'
                     r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.]+ \(0x[a-zA-Z0-9]+\) read successful.*'),
      STACK_PAR_FAIL: (r'.*E \(([0-9]+)\) MASTER_TEST: Characteristic #[0-9]+\s\(([a-zA-Z0-9_]+)\)\s'
                       r'read fail, err = [0-9]+ \([_A-Z]+\).*'),
      STACK_DESTROY: (r'.*I\s\(([0-9]+)\) MASTER_TEST: (Destroy master).*'),
  }

  # Module-level logger shared by the thread class and the test function.
  logger = logging.getLogger(LOGGER_NAME)
  class DutTestThread(Thread):
      """Thread that follows the console output of one DUT through the test stages.

      The object is also a context manager: entering it resets the DUT and
      starts raw data capture; leaving it only logs a pending exception and
      does not suppress it.

      Attributes (all set in ``__init__``):
          tname: Name used in log messages.
          dut: DUT object the thread talks to.
          expected: Mapping of STACK_* stage id to the regex string for it.
          data: Result of ``dut.stop_capture_raw_data()``; set at the end of
              ``run()``, None until then.
          ip_addr: Slave IP address to send to the DUT, or None to skip that step.
          test_finish: True once the destroy stage is seen or an expect times out.
          param_fail_count: Number of matched "parameter fail" lines.
          param_ok_count: Number of "parameter ok" lines matched while the
              current stage was STACK_START or later.
          test_stage: Most recently matched STACK_* stage.
      """

      def __init__(self, dut=None, name=None, ip_addr=None, expect=None):
          """Store the thread parameters and reset the counters.

          Args:
              dut: DUT object to drive.
              name: Name used in log messages (stored as ``tname``).
              ip_addr: Slave IP address to send over stdin, or None.
              expect: Mapping of STACK_* stage id to regex string.
          """
          self.tname = name
          self.dut = dut
          self.expected = expect
          self.data = None
          self.ip_addr = ip_addr
          self.test_finish = False
          self.param_fail_count = 0
          self.param_ok_count = 0
          self.test_stage = STACK_DEFAULT
          super(DutTestThread, self).__init__()

      def __enter__(self):
          """Reset the DUT, start capturing its raw output and return self."""
          logger.debug('Restart %s.', self.tname)
          # Reset DUT first
          self.dut.reset()
          # Capture output from the DUT
          self.dut.start_capture_raw_data(capture_id=self.dut.name)
          return self

      def __exit__(self, exc_type, exc_value, traceback):
          """Log an exception leaving the ``with`` body; it is not suppressed."""
          if exc_type is not None or exc_value is not None:
              logger.info('Thread %s rised an exception type: %s, value: %s', self.tname, str(exc_type), str(exc_value))

      def run(self):
          """Thread body: optionally send the slave IP, then follow the test stages.

          Raises:
              RuntimeError: If the DUT exceptions contain 'Guru Meditation Error:'
                  or the IP prompt index is wrong (see ``set_ip``).
          """
          # Initialize slave IP for master board
          if self.ip_addr is not None:
              self.set_ip(0)
          # Check expected strings in the listing
          self.test_start(TEST_EXPECT_STR_TIMEOUT)
          # Check DUT exceptions
          # NOTE(review): this is a membership test; if get_exceptions() returns a
          # list of strings rather than one string it only matches an element that
          # is exactly this text - confirm against the DUT implementation.
          dut_exceptions = self.dut.get_exceptions()
          if 'Guru Meditation Error:' in dut_exceptions:
              raise RuntimeError('%s generated an exception(s): %s\n' % (str(self.dut), dut_exceptions))
          # Reached only when no exception was raised above: keep the captured output
          self.data = self.dut.stop_capture_raw_data(capture_id=self.dut.name)

      def set_ip(self, index=0):
          """Send the slave IP address to the master application over stdin.

          Args:
              index: IP index the DUT prompt is expected to ask for.

          Raises:
              RuntimeError: If the prompt asks for a different index.
          """
          prompt = re.compile(r'.*Waiting IP([0-9]{1,2}) from stdin.*')
          # Read all data from previous restart to get prompt correctly
          self.dut.read()
          result = self.dut.expect(prompt, timeout=TEST_EXPECT_STR_TIMEOUT)
          if int(result[0]) != index:
              raise RuntimeError('Incorrect index of IP=%s for %s\n' % (result[0], str(self.dut)))
          # Use the same slave IP address for all characteristics during the test
          for ip_key in ('IP0=', 'IP1=', 'IP2='):
              self.dut.write(ip_key + self.ip_addr, '\n', False)
          logger.debug('Set IP address=%s for %s', self.ip_addr, self.tname)
          confirmation = re.compile(r'.*IP\([0-9]+\) = \[([0-9a-zA-Z\.\:]+)\] set from stdin.*')
          # Only wait for the confirmation; the matched value is not needed
          self.dut.expect(confirmation, timeout=TEST_EXPECT_STR_TIMEOUT)
          logger.debug('Thread %s initialized with slave IP=%s.', self.tname, self.ip_addr)

      def test_start(self, timeout_value):
          """Match DUT output against the stage patterns until the test finishes.

          The loop ends when the destroy pattern is matched or when no pattern
          matches within ``timeout_value`` seconds.

          Args:
              timeout_value: Timeout in seconds passed to each ``expect_any`` call.
          """
          def handle_get_ip4(data):
              """ Handle get_ip v4
              """
              logger.debug('%s[STACK_IPV4]: %s', self.tname, str(data))
              self.test_stage = STACK_IPV4

          def handle_get_ip6(data):
              """ Handle get_ip v6
              """
              logger.debug('%s[STACK_IPV6]: %s', self.tname, str(data))
              self.test_stage = STACK_IPV6

          def handle_init(data):
              """ Handle init
              """
              logger.debug('%s[STACK_INIT]: %s', self.tname, str(data))
              self.test_stage = STACK_INIT

          def handle_connect(data):
              """ Handle connect
              """
              logger.debug('%s[STACK_CONNECT]: %s', self.tname, str(data))
              self.test_stage = STACK_CONNECT

          def handle_test_start(data):
              """ Handle test start
              """
              logger.debug('%s[STACK_START]: %s', self.tname, str(data))
              self.test_stage = STACK_START

          def handle_par_ok(data):
              """ Handle parameter ok
              """
              logger.debug('%s[READ_PAR_OK]: %s', self.tname, str(data))
              # Successful reads seen before the test start stage are not counted
              if self.test_stage >= STACK_START:
                  self.param_ok_count += 1
              self.test_stage = STACK_PAR_OK

          def handle_par_fail(data):
              """ Handle parameter fail
              """
              logger.debug('%s[READ_PAR_FAIL]: %s', self.tname, str(data))
              self.param_fail_count += 1
              self.test_stage = STACK_PAR_FAIL

          def handle_destroy(data):
              """ Handle destroy
              """
              logger.debug('%s[DESTROY]: %s', self.tname, str(data))
              self.test_stage = STACK_DESTROY
              self.test_finish = True

          stage_handlers = (
              (STACK_IPV4, handle_get_ip4),
              (STACK_IPV6, handle_get_ip6),
              (STACK_INIT, handle_init),
              (STACK_CONNECT, handle_connect),
              (STACK_START, handle_test_start),
              (STACK_PAR_OK, handle_par_ok),
              (STACK_PAR_FAIL, handle_par_fail),
              (STACK_DESTROY, handle_destroy),
          )
          # The patterns do not change between iterations: compile them once
          # instead of on every pass through the loop.
          expect_args = [(re.compile(self.expected[stage]), handler) for stage, handler in stage_handlers]

          while not self.test_finish:
              try:
                  self.dut.expect_any(*expect_args, timeout=timeout_value)
              except DUT.ExpectTimeout:
                  logger.debug('%s, expect timeout on stage #%d (%s seconds)', self.tname, self.test_stage, timeout_value)
                  self.test_finish = True
  def test_check_mode(dut=None, mode_str=None, value=None):
      """Tell whether an sdkconfig option of the DUT application equals ``value``.

      Args:
          dut: DUT object whose ``app.get_sdkconfig()`` mapping is inspected.
          mode_str: Name of the sdkconfig option to look up.
          value: Value the option is compared with.

      Returns:
          True if the option is found and equal to ``value``. False if it differs
          or if any exception occurs during the lookup (an error is logged then).
      """
      try:
          sdkconfig = dut.app.get_sdkconfig()
          current = sdkconfig[mode_str]
          logger.debug('%s {%s} = %s.\n', str(dut), mode_str, current)
          matches = (value == current)
          return matches
      except Exception:
          # Any failure is reported as "option not found" and treated as a mismatch
          logger.error('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.', str(dut), mode_str)
          return False
  @ttfw_idf.idf_example_test(env_tag='Example_Modbus_TCP', target=['esp32'])
  def test_modbus_tcp_communication(env, comm_mode):
      """Run the Modbus TCP master and slave examples and check the parameter reads.

      The slave IP address is read from the slave console output and handed to
      the master thread, then each DUT is followed by its own DutTestThread.
      The test fails when a thread is still alive after the join timeout, when
      either side reports more than TEST_READ_MAX_ERR_COUNT read errors, or
      when either side reports no successful read at all.

      Args:
          env: Test environment; ``get_dut()`` and ``log_path`` are used.
          comm_mode: Not used by this function.

      Raises:
          RuntimeError: On a configuration mismatch or any of the failures above.
      """
      rel_project_path = os.path.join('examples', 'protocols', 'modbus', 'tcp')
      # Get device under test. Both duts must be able to be connected to WiFi router
      dut_master = env.get_dut('modbus_tcp_master', os.path.join(rel_project_path, TEST_MASTER_TCP))
      dut_slave = env.get_dut('modbus_tcp_slave', os.path.join(rel_project_path, TEST_SLAVE_TCP))
      log_file = os.path.join(env.log_path, 'modbus_tcp_test.log')
      print('Logging file name: %s' % log_file)

      # Handlers attached by this call; they are detached again in ``finally`` so
      # that another invocation in the same process does not duplicate output.
      log_handlers = []
      try:
          logger.setLevel(LOG_LEVEL)
          # set format of output for both handlers
          formatter = logging.Formatter('%(levelname)s:%(message)s')

          # create file handler which logs even debug messages
          fh = logging.FileHandler(log_file)
          log_handlers.append(fh)
          fh.setLevel(logging.DEBUG)
          fh.setFormatter(formatter)
          logger.addHandler(fh)

          # create console handler
          ch = logging.StreamHandler()
          log_handlers.append(ch)
          ch.setLevel(logging.INFO)
          ch.setFormatter(formatter)
          logger.addHandler(ch)

          # Check Kconfig configuration options for each built example
          tcp_enabled = (test_check_mode(dut_master, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y') and
                         test_check_mode(dut_slave, 'CONFIG_FMB_COMM_MODE_TCP_EN', 'y'))
          if not tcp_enabled:
              message = 'ENV_TEST_FAILURE: IP resolver mode do not match in the master and slave implementation.\n'
              logger.error(message)
              raise RuntimeError(message)
          slave_name = TEST_SLAVE_TCP
          master_name = TEST_MASTER_TCP

          if not test_check_mode(dut_master, 'CONFIG_MB_SLAVE_IP_FROM_STDIN', 'y'):
              raise RuntimeError('ENV_TEST_FAILURE: Slave IP resolver is not configured correctly.\n')
          logger.info('ENV_TEST_INFO: Set slave IP address through STDIN.\n')

          # Flash app onto DUT.
          # TODO: Debug case when the slave flashed before master then expect
          # does not work correctly for no reason.
          dut_slave.start_app()
          dut_master.start_app()

          # The slave prints its address in the format selected by the Kconfig option
          if test_check_mode(dut_master, 'CONFIG_EXAMPLE_CONNECT_IPV6', 'y'):
              address_pattern = pattern_dict_slave[STACK_IPV6]
          else:
              address_pattern = pattern_dict_slave[STACK_IPV4]
          address = dut_slave.expect(re.compile(address_pattern), TEST_EXPECT_STR_TIMEOUT)
          if address is None:
              raise RuntimeError('ENV_TEST_FAILURE: Slave IP address is not found in the output. Check network settings.\n')
          print('Found IP slave address: %s' % address[0])

          # Create thread for each dut
          with DutTestThread(dut=dut_master, name=master_name, ip_addr=address[0], expect=pattern_dict_master) as dut_master_thread:
              with DutTestThread(dut=dut_slave, name=slave_name, ip_addr=None, expect=pattern_dict_slave) as dut_slave_thread:
                  # Start each thread
                  dut_slave_thread.start()
                  dut_master_thread.start()

                  # Wait for threads to complete
                  dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
                  dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)

                  if dut_slave_thread.is_alive():
                      logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
                                   dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
                      raise RuntimeError('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
                                         (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))

                  if dut_master_thread.is_alive():
                      logger.error('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n',
                                   dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT)
                      raise RuntimeError('TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
                                         (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))

          logger.info('TEST_INFO: %s error count = %d, %s error count = %d.\n',
                      dut_master_thread.tname, dut_master_thread.param_fail_count,
                      dut_slave_thread.tname, dut_slave_thread.param_fail_count)
          logger.info('TEST_INFO: %s ok count = %d, %s ok count = %d.\n',
                      dut_master_thread.tname, dut_master_thread.param_ok_count,
                      dut_slave_thread.tname, dut_slave_thread.param_ok_count)

          too_many_errors = (dut_master_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT or
                             dut_slave_thread.param_fail_count > TEST_READ_MAX_ERR_COUNT)
          nothing_read = (dut_slave_thread.param_ok_count == 0 or
                          dut_master_thread.param_ok_count == 0)
          if too_many_errors or nothing_read:
              raise RuntimeError('TEST_FAILURE: %s parameter read error(ok) count = %d(%d), %s parameter read error(ok) count = %d(%d).\n' %
                                 (dut_master_thread.tname, dut_master_thread.param_fail_count, dut_master_thread.param_ok_count,
                                  dut_slave_thread.tname, dut_slave_thread.param_fail_count, dut_slave_thread.param_ok_count))
          logger.info('TEST_SUCCESS: The Modbus parameter test is completed successfully.\n')
      finally:
          try:
              dut_master.close()
              dut_slave.close()
          finally:
              # Run the logging cleanup even if closing a DUT raised
              for handler in log_handlers:
                  logger.removeHandler(handler)
                  handler.close()
              logging.shutdown()
  # Script entry point: run the test when the file is executed directly.
  # NOTE(review): the function is called without arguments here; presumably the
  # idf_example_test decorator supplies `env` and `comm_mode` - confirm.
  if __name__ == '__main__':
      test_modbus_tcp_communication()