example_test.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. #Need Python 3 string formatting functions
  2. from __future__ import print_function
  3. import re
  4. import os
  5. import sys
  6. import time
  7. from threading import Thread
  8. # The test cause is dependent on the Tiny Test Framework. Ensure the
  9. # `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
  10. test_fw_path = os.getenv("TEST_FW_PATH")
  11. if test_fw_path and test_fw_path not in sys.path:
  12. sys.path.insert(0, test_fw_path)
  13. import TinyFW
  14. import IDF
  15. #Define tuple of strings to expect for each DUT.
  16. master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
  17. slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
  18. listen_only_expect = ("CAN Listen Only: Driver installed", "Listen Only: Driver uninstalled")
  19. def dut_thread_callback(**kwargs):
  20. #Parse keyword arguments
  21. dut = kwargs['dut'] #Get DUT from kwargs
  22. expected = kwargs['expected']
  23. result = kwargs['result'] #Get result[out] from kwargs. MUST be of mutable type e.g. list
  24. #Must reset again as flashing during start_app will reset multiple times, causing unexpected results
  25. dut.reset()
  26. for string in expected:
  27. dut.expect(string, 20)
  28. #Mark thread has run to completion without any exceptions
  29. result[0] = True
  30. @IDF.idf_example_test(env_tag='Example_CAN')
  31. def test_can_network_example(env, extra_data):
  32. #Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
  33. dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master")
  34. dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave")
  35. dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only")
  36. #Flash app onto each DUT, each DUT is reset again at the start of each thread
  37. dut_master.start_app()
  38. dut_slave.start_app()
  39. dut_listen_only.start_app()
  40. #Create dict of keyword arguments for each dut
  41. results = [[False], [False], [False]]
  42. master_kwargs = {"dut" : dut_master, "result" : results[0], "expected" : master_expect}
  43. slave_kwargs = {"dut" : dut_slave, "result" : results[1], "expected" : slave_expect}
  44. listen_only_kwargs = {"dut" : dut_listen_only, "result" : results[2], "expected" : listen_only_expect}
  45. #Create thread for each dut
  46. dut_master_thread = Thread(target = dut_thread_callback, name = "Master Thread", kwargs = master_kwargs)
  47. dut_slave_thread = Thread(target = dut_thread_callback, name = "Slave Thread", kwargs = slave_kwargs)
  48. dut_listen_only_thread = Thread(target = dut_thread_callback, name = "Listen Only Thread", kwargs = listen_only_kwargs)
  49. #Start each thread
  50. dut_listen_only_thread.start()
  51. dut_master_thread.start()
  52. dut_slave_thread.start()
  53. #Wait for threads to complete
  54. dut_listen_only_thread.join()
  55. dut_master_thread.join()
  56. dut_slave_thread.join()
  57. #check each thread ran to completion
  58. for result in results:
  59. if result[0] != True:
  60. raise Exception("One or more threads did not run successfully")
  61. if __name__ == '__main__':
  62. test_can_network_example()