example_test.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # SPDX-FileCopyrightText: 2016-2021 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. # Need Python 3 string formatting functions
  4. from __future__ import print_function
  5. import logging
  6. import os
  7. import re
  8. from threading import Thread
  9. import ttfw_idf
  10. LOG_LEVEL = logging.DEBUG
  11. LOGGER_NAME = 'modbus_test'
  12. # Allowed parameter reads
  13. TEST_READ_MIN_COUNT = 10 # Minimum number of correct readings
  14. TEST_READ_MAX_ERR_COUNT = 2 # Maximum allowed read errors during initialization
  15. TEST_THREAD_EXPECT_TIMEOUT = 120 # Test theread expect timeout in seconds
  16. TEST_THREAD_JOIN_TIMEOUT = 180 # Test theread join timeout in seconds
  17. # Test definitions
  18. TEST_MASTER_RTU = 'master_rtu'
  19. TEST_SLAVE_RTU = 'slave_rtu'
  20. TEST_MASTER_ASCII = 'master_ascii'
  21. TEST_SLAVE_ASCII = 'slave_ascii'
  22. # Define tuple of strings to expect for each DUT.
  23. #
  24. master_expect = ('MASTER_TEST: Modbus master stack initialized...', 'MASTER_TEST: Start modbus test...', 'MASTER_TEST: Destroy master...')
  25. slave_expect = ('SLAVE_TEST: Modbus slave stack initialized.', 'SLAVE_TEST: Start modbus test...', 'SLAVE_TEST: Modbus controller destroyed.')
  26. # The dictionary for expected values in listing
  27. expect_dict_master_ok = {'START': (),
  28. 'READ_PAR_OK': (),
  29. 'ALARM_MSG': (u'7',)}
  30. expect_dict_master_err = {'READ_PAR_ERR': (u'263', u'ESP_ERR_TIMEOUT'),
  31. 'READ_STK_ERR': (u'107', u'ESP_ERR_TIMEOUT')}
  32. # The dictionary for regular expression patterns to check in listing
  33. pattern_dict_master_ok = {'START': (r'.*I \([0-9]+\) MASTER_TEST: Start modbus test...'),
  34. 'READ_PAR_OK': (r'.*I\s\([0-9]+\) MASTER_TEST: Characteristic #[0-9]+ [a-zA-Z0-9_]+'
  35. r'\s\([a-zA-Z\%\/]+\) value = [a-zA-Z0-9\.\s]*\(0x[a-zA-Z0-9]+\) read successful.'),
  36. 'ALARM_MSG': (r'.*I \([0-9]*\) MASTER_TEST: Alarm triggered by cid #([0-9]+).')}
  37. pattern_dict_master_err = {'READ_PAR_ERR_TOUT': (r'.*E \([0-9]+\) MASTER_TEST: Characteristic #[0-9]+'
  38. r'\s\([a-zA-Z0-9_]+\) read fail, err = [0-9]+ \([_A-Z]+\).'),
  39. 'READ_STK_ERR_TOUT': (r'.*E \([0-9]+\) MB_CONTROLLER_MASTER: [a-zA-Z0-9_]+\([0-9]+\):\s'
  40. r'SERIAL master get parameter failure error=\(0x([a-zA-Z0-9]+)\) \(([_A-Z]+)\).')}
  41. # The dictionary for expected values in listing
  42. expect_dict_slave_ok = {'START': (),
  43. 'READ_PAR_OK': (),
  44. 'DESTROY': ()}
  45. # The dictionary for regular expression patterns to check in listing
  46. pattern_dict_slave_ok = {'START': (r'.*I \([0-9]+\) SLAVE_TEST: Start modbus test...'),
  47. 'READ_PAR_OK': (r'.*I\s\([0-9]+\) SLAVE_TEST: [A-Z]+ READ \([a-zA-Z0-9_]+ us\),\s'
  48. r'ADDR:[0-9]+, TYPE:[0-9]+, INST_ADDR:0x[a-zA-Z0-9]+, SIZE:[0-9]+'),
  49. 'DESTROY': (r'.*I\s\([0-9]+\) SLAVE_TEST: Modbus controller destroyed.')}
  50. logger = logging.getLogger(LOGGER_NAME)
  51. class DutTestThread(Thread):
  52. def __init__(self, dut=None, name=None, expect=None):
  53. """ Initialize the thread parameters
  54. """
  55. self.tname = name
  56. self.dut = dut
  57. self.expected = expect
  58. self.result = False
  59. self.data = None
  60. super(DutTestThread, self).__init__()
  61. def run(self):
  62. """ The function implements thread functionality
  63. """
  64. # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
  65. self.dut.reset()
  66. # Capture output from the DUT
  67. self.dut.start_capture_raw_data()
  68. # Check expected strings in the listing
  69. for string in self.expected:
  70. self.dut.expect(string, TEST_THREAD_EXPECT_TIMEOUT)
  71. # Check DUT exceptions
  72. dut_exceptions = self.dut.get_exceptions()
  73. if 'Guru Meditation Error:' in dut_exceptions:
  74. raise Exception('%s generated an exception: %s\n' % (str(self.dut), dut_exceptions))
  75. # Mark thread has run to completion without any exceptions
  76. self.data = self.dut.stop_capture_raw_data()
  77. self.result = True
  78. def test_filter_output(data=None, start_pattern=None, end_pattern=None):
  79. """Use patters to filter output
  80. """
  81. start_index = str(data).find(start_pattern)
  82. end_index = str(data).find(end_pattern)
  83. logger.debug('Listing start index= %d, end=%d' % (start_index, end_index))
  84. if start_index == -1 or end_index == -1:
  85. return data
  86. return data[start_index:end_index + len(end_pattern)]
  87. def test_expect_re(data, pattern):
  88. """
  89. Check if re pattern is matched in data cache
  90. :param data: data to process
  91. :param pattern: compiled RegEx pattern
  92. :return: match groups if match succeed otherwise None
  93. """
  94. ret = None
  95. if isinstance(pattern, type(u'')):
  96. pattern = pattern.encode('utf-8')
  97. regex = re.compile(pattern)
  98. if isinstance(data, type(u'')):
  99. data = data.encode('utf-8')
  100. match = regex.search(data)
  101. if match:
  102. ret = tuple(None if x is None else x.decode() for x in match.groups())
  103. index = match.end()
  104. else:
  105. index = None
  106. return ret, index
  107. def test_check_output(data=None, check_dict=None, expect_dict=None):
  108. """ Check output for the test
  109. Check log using regular expressions:
  110. """
  111. global logger
  112. match_count = 0
  113. index = 0
  114. data_lines = data.splitlines()
  115. for key, pattern in check_dict.items():
  116. if key not in expect_dict:
  117. break
  118. # Check pattern in the each line
  119. for line in data_lines:
  120. group, index = test_expect_re(line, pattern)
  121. if index is not None:
  122. logger.debug('Found key{%s}=%s, line: \n%s' % (key, group, line))
  123. if expect_dict[key] == group:
  124. logger.debug('The result is correct for the key:%s, expected:%s == returned:%s' % (key, str(expect_dict[key]), str(group)))
  125. match_count += 1
  126. return match_count
  127. def test_check_mode(dut=None, mode_str=None, value=None):
  128. """ Check communication mode for dut
  129. """
  130. global logger
  131. try:
  132. opt = dut.app.get_sdkconfig()[mode_str]
  133. logger.info('%s {%s} = %s.\n' % (str(dut), mode_str, opt))
  134. return value == opt
  135. except Exception:
  136. logger.info('ENV_TEST_FAILURE: %s: Cannot find option %s in sdkconfig.' % (str(dut), mode_str))
  137. return False
  138. @ttfw_idf.idf_example_test(env_tag='Example_T2_RS485', target=['esp32'])
  139. def test_modbus_serial_communication(env, comm_mode):
  140. global logger
  141. # Get device under test. "dut1 - master", "dut2 - slave" must be properly connected through RS485 interface driver
  142. dut_master = env.get_dut('modbus_master', 'examples/protocols/modbus/serial/mb_master', dut_class=ttfw_idf.ESP32DUT)
  143. dut_slave = env.get_dut('modbus_slave', 'examples/protocols/modbus/serial/mb_slave', dut_class=ttfw_idf.ESP32DUT)
  144. try:
  145. logger.debug('Environment vars: %s\r\n' % os.environ)
  146. logger.debug('DUT slave sdkconfig: %s\r\n' % dut_slave.app.get_sdkconfig())
  147. logger.debug('DUT master sdkconfig: %s\r\n' % dut_master.app.get_sdkconfig())
  148. # Check Kconfig configuration options for each built example
  149. if test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_ASCII', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_ASCII', 'y'):
  150. logger.info('ENV_TEST_INFO: Modbus ASCII test mode selected in the configuration. \n')
  151. slave_name = TEST_SLAVE_ASCII
  152. master_name = TEST_MASTER_ASCII
  153. elif test_check_mode(dut_master, 'CONFIG_MB_COMM_MODE_RTU', 'y') and test_check_mode(dut_slave, 'CONFIG_MB_COMM_MODE_RTU', 'y'):
  154. logger.info('ENV_TEST_INFO: Modbus RTU test mode selected in the configuration. \n')
  155. slave_name = TEST_SLAVE_RTU
  156. master_name = TEST_MASTER_RTU
  157. else:
  158. logger.error("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
  159. raise Exception("ENV_TEST_FAILURE: Communication mode in master and slave configuration don't match.\n")
  160. # Check if slave address for example application is default one to be able to communicate
  161. if not test_check_mode(dut_slave, 'CONFIG_MB_SLAVE_ADDR', '1'):
  162. logger.error('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
  163. raise Exception('ENV_TEST_FAILURE: Slave address option is incorrect.\n')
  164. # Flash app onto each DUT
  165. dut_master.start_app()
  166. dut_slave.start_app()
  167. # Create thread for each dut
  168. dut_master_thread = DutTestThread(dut=dut_master, name=master_name, expect=master_expect)
  169. dut_slave_thread = DutTestThread(dut=dut_slave, name=slave_name, expect=slave_expect)
  170. # Start each thread
  171. dut_slave_thread.start()
  172. dut_master_thread.start()
  173. # Wait for threads to complete
  174. dut_slave_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
  175. dut_master_thread.join(timeout=TEST_THREAD_JOIN_TIMEOUT)
  176. if dut_slave_thread.isAlive():
  177. logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
  178. (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
  179. raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
  180. (dut_slave_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
  181. if dut_master_thread.isAlive():
  182. logger.error('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
  183. (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
  184. raise Exception('ENV_TEST_FAILURE: The thread %s is not completed successfully after %d seconds.\n' %
  185. (dut_master_thread.tname, TEST_THREAD_JOIN_TIMEOUT))
  186. finally:
  187. dut_master.close()
  188. dut_slave.close()
  189. # Check if test threads completed successfully and captured data
  190. if not dut_slave_thread.result or dut_slave_thread.data is None:
  191. logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
  192. raise Exception('The thread %s was not run successfully.' % dut_slave_thread.tname)
  193. if not dut_master_thread.result or dut_master_thread.data is None:
  194. logger.error('The thread %s was not run successfully.' % dut_slave_thread.tname)
  195. raise Exception('The thread %s was not run successfully.' % dut_master_thread.tname)
  196. # Filter output to get test messages
  197. master_output = test_filter_output(dut_master_thread.data, master_expect[0], master_expect[len(master_expect) - 1])
  198. if master_output is not None:
  199. logger.info('The data for master thread is captured.')
  200. logger.debug(master_output)
  201. slave_output = test_filter_output(dut_slave_thread.data, slave_expect[0], slave_expect[len(slave_expect) - 1])
  202. if slave_output is not None:
  203. logger.info('The data for slave thread is captured.')
  204. logger.debug(slave_output)
  205. # Check if parameters are read correctly by master
  206. match_count = test_check_output(master_output, pattern_dict_master_ok, expect_dict_master_ok)
  207. if match_count < TEST_READ_MIN_COUNT:
  208. logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
  209. raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
  210. logger.info('OK pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
  211. # If the test completed successfully (alarm triggered) but there are some errors during reading of parameters
  212. match_count = test_check_output(master_output, pattern_dict_master_err, expect_dict_master_err)
  213. if match_count > TEST_READ_MAX_ERR_COUNT:
  214. logger.error('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
  215. raise Exception('There are errors reading parameters from %s, %d' % (dut_master_thread.tname, match_count))
  216. logger.info('ERROR pattern test for %s, match_count=%d.' % (dut_master_thread.tname, match_count))
  217. match_count = test_check_output(slave_output, pattern_dict_slave_ok, expect_dict_slave_ok)
  218. if match_count < TEST_READ_MIN_COUNT:
  219. logger.error('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
  220. raise Exception('There are errors reading parameters from %s, %d' % (dut_slave_thread.tname, match_count))
  221. logger.info('OK pattern test for %s, match_count=%d.' % (dut_slave_thread.tname, match_count))
  222. if __name__ == '__main__':
  223. logger = logging.getLogger(LOGGER_NAME)
  224. # create file handler which logs even debug messages
  225. fh = logging.FileHandler('modbus_test.log')
  226. fh.setLevel(logging.DEBUG)
  227. logger.setLevel(logging.DEBUG)
  228. # create console handler
  229. ch = logging.StreamHandler()
  230. ch.setLevel(logging.INFO)
  231. # set format of output for both handlers
  232. formatter = logging.Formatter('%(levelname)s:%(message)s')
  233. ch.setFormatter(formatter)
  234. fh.setFormatter(formatter)
  235. logger.addHandler(fh)
  236. logger.addHandler(ch)
  237. logger.info('Start script %s.' % os.path.basename(__file__))
  238. test_modbus_serial_communication()
  239. logging.shutdown()