example_test.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # Need Python 3 string formatting functions
  2. from __future__ import print_function
  3. import os
  4. import sys
  5. from threading import Thread
  6. try:
  7. import IDF
  8. from IDF.IDFDUT import ESP32DUT
  9. except ImportError:
  10. # The test cause is dependent on the Tiny Test Framework. Ensure the
  11. # `TEST_FW_PATH` environment variable is set to `$IDF_PATH/tools/tiny-test-fw`
  12. test_fw_path = os.getenv("TEST_FW_PATH")
  13. if test_fw_path and test_fw_path not in sys.path:
  14. sys.path.insert(0, test_fw_path)
  15. import IDF
  16. # Define tuple of strings to expect for each DUT.
  17. master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
  18. slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
  19. listen_only_expect = ("CAN Listen Only: Driver installed", "CAN Listen Only: Driver uninstalled")
  20. def dut_thread_callback(**kwargs):
  21. # Parse keyword arguments
  22. dut = kwargs['dut'] # Get DUT from kwargs
  23. expected = kwargs['expected']
  24. result = kwargs['result'] # Get result[out] from kwargs. MUST be of mutable type e.g. list
  25. # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
  26. dut.reset()
  27. for string in expected:
  28. dut.expect(string, 20)
  29. # Mark thread has run to completion without any exceptions
  30. result[0] = True
  31. @IDF.idf_example_test(env_tag='Example_CAN2', ignore=True)
  32. def test_can_network_example(env, extra_data):
  33. # Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
  34. dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master", dut_class=ESP32DUT)
  35. dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave", dut_class=ESP32DUT)
  36. dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only", dut_class=ESP32DUT)
  37. # Flash app onto each DUT, each DUT is reset again at the start of each thread
  38. dut_master.start_app()
  39. dut_slave.start_app()
  40. dut_listen_only.start_app()
  41. # Create dict of keyword arguments for each dut
  42. results = [[False], [False], [False]]
  43. master_kwargs = {"dut": dut_master, "result": results[0], "expected": master_expect}
  44. slave_kwargs = {"dut": dut_slave, "result": results[1], "expected": slave_expect}
  45. listen_only_kwargs = {"dut": dut_listen_only, "result": results[2], "expected": listen_only_expect}
  46. # Create thread for each dut
  47. dut_master_thread = Thread(target=dut_thread_callback, name="Master Thread", kwargs=master_kwargs)
  48. dut_slave_thread = Thread(target=dut_thread_callback, name="Slave Thread", kwargs=slave_kwargs)
  49. dut_listen_only_thread = Thread(target=dut_thread_callback, name="Listen Only Thread", kwargs=listen_only_kwargs)
  50. # Start each thread
  51. dut_listen_only_thread.start()
  52. dut_master_thread.start()
  53. dut_slave_thread.start()
  54. # Wait for threads to complete
  55. dut_listen_only_thread.join()
  56. dut_master_thread.join()
  57. dut_slave_thread.join()
  58. # check each thread ran to completion
  59. for result in results:
  60. if result[0] is not True:
  61. raise Exception("One or more threads did not run successfully")
# Allow running this example test directly as a script.
# NOTE(review): the test is called without `env` / `extra_data` here --
# presumably the IDF.idf_example_test decorator supplies them; confirm
# against the Tiny Test Framework.
if __name__ == '__main__':
    test_can_network_example()