example_test.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
# Need Python 3 string formatting functions
from __future__ import print_function

from threading import Thread

import ttfw_idf
  5. # Define tuple of strings to expect for each DUT.
  6. master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
  7. slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
  8. listen_only_expect = ("CAN Listen Only: Driver installed", "CAN Listen Only: Driver uninstalled")
  9. def dut_thread_callback(**kwargs):
  10. # Parse keyword arguments
  11. dut = kwargs['dut'] # Get DUT from kwargs
  12. expected = kwargs['expected']
  13. result = kwargs['result'] # Get result[out] from kwargs. MUST be of mutable type e.g. list
  14. # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
  15. dut.reset()
  16. for string in expected:
  17. dut.expect(string, 20)
  18. # Mark thread has run to completion without any exceptions
  19. result[0] = True
  20. @ttfw_idf.idf_example_test(env_tag='Example_CAN2')
  21. def test_can_network_example(env, extra_data):
  22. # Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
  23. dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master")
  24. dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave")
  25. dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only")
  26. # Flash app onto each DUT, each DUT is reset again at the start of each thread
  27. dut_master.start_app()
  28. dut_slave.start_app()
  29. dut_listen_only.start_app()
  30. # Create dict of keyword arguments for each dut
  31. results = [[False], [False], [False]]
  32. master_kwargs = {"dut": dut_master, "result": results[0], "expected": master_expect}
  33. slave_kwargs = {"dut": dut_slave, "result": results[1], "expected": slave_expect}
  34. listen_only_kwargs = {"dut": dut_listen_only, "result": results[2], "expected": listen_only_expect}
  35. # Create thread for each dut
  36. dut_master_thread = Thread(target=dut_thread_callback, name="Master Thread", kwargs=master_kwargs)
  37. dut_slave_thread = Thread(target=dut_thread_callback, name="Slave Thread", kwargs=slave_kwargs)
  38. dut_listen_only_thread = Thread(target=dut_thread_callback, name="Listen Only Thread", kwargs=listen_only_kwargs)
  39. # Start each thread
  40. dut_listen_only_thread.start()
  41. dut_master_thread.start()
  42. dut_slave_thread.start()
  43. # Wait for threads to complete
  44. dut_listen_only_thread.join()
  45. dut_master_thread.join()
  46. dut_slave_thread.join()
  47. # check each thread ran to completion
  48. for result in results:
  49. if result[0] is not True:
  50. raise Exception("One or more threads did not run successfully")
  51. if __name__ == '__main__':
  52. test_can_network_example()