example_test.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. # Need Python 3 string formatting functions
  2. from __future__ import print_function
  3. from threading import Thread
  4. import ttfw_idf
  5. # Define tuple of strings to expect for each DUT.
  6. master_expect = ("CAN Master: Driver installed", "CAN Master: Driver uninstalled")
  7. slave_expect = ("CAN Slave: Driver installed", "CAN Slave: Driver uninstalled")
  8. listen_only_expect = ("CAN Listen Only: Driver installed", "CAN Listen Only: Driver uninstalled")
  9. def dut_thread_callback(**kwargs):
  10. # Parse keyword arguments
  11. dut = kwargs['dut'] # Get DUT from kwargs
  12. expected = kwargs['expected']
  13. result = kwargs['result'] # Get result[out] from kwargs. MUST be of mutable type e.g. list
  14. # Must reset again as flashing during start_app will reset multiple times, causing unexpected results
  15. dut.reset()
  16. for string in expected:
  17. dut.expect(string, 20)
  18. # Mark thread has run to completion without any exceptions
  19. result[0] = True
  20. @ttfw_idf.idf_example_test(env_tag='Example_CAN2', ignore=True)
  21. def test_can_network_example(env, extra_data):
  22. # Get device under test. "dut1", "dut2", and "dut3" must be properly defined in EnvConfig
  23. dut_master = env.get_dut("dut1", "examples/peripherals/can/can_network/can_network_master",
  24. dut_class=ttfw_idf.ESP32DUT)
  25. dut_slave = env.get_dut("dut2", "examples/peripherals/can/can_network/can_network_slave",
  26. dut_class=ttfw_idf.ESP32DUT)
  27. dut_listen_only = env.get_dut("dut3", "examples/peripherals/can/can_network/can_network_listen_only",
  28. dut_class=ttfw_idf.ESP32DUT)
  29. # Flash app onto each DUT, each DUT is reset again at the start of each thread
  30. dut_master.start_app()
  31. dut_slave.start_app()
  32. dut_listen_only.start_app()
  33. # Create dict of keyword arguments for each dut
  34. results = [[False], [False], [False]]
  35. master_kwargs = {"dut": dut_master, "result": results[0], "expected": master_expect}
  36. slave_kwargs = {"dut": dut_slave, "result": results[1], "expected": slave_expect}
  37. listen_only_kwargs = {"dut": dut_listen_only, "result": results[2], "expected": listen_only_expect}
  38. # Create thread for each dut
  39. dut_master_thread = Thread(target=dut_thread_callback, name="Master Thread", kwargs=master_kwargs)
  40. dut_slave_thread = Thread(target=dut_thread_callback, name="Slave Thread", kwargs=slave_kwargs)
  41. dut_listen_only_thread = Thread(target=dut_thread_callback, name="Listen Only Thread", kwargs=listen_only_kwargs)
  42. # Start each thread
  43. dut_listen_only_thread.start()
  44. dut_master_thread.start()
  45. dut_slave_thread.start()
  46. # Wait for threads to complete
  47. dut_listen_only_thread.join()
  48. dut_master_thread.join()
  49. dut_slave_thread.join()
  50. # check each thread ran to completion
  51. for result in results:
  52. if result[0] is not True:
  53. raise Exception("One or more threads did not run successfully")
  54. if __name__ == '__main__':
  55. test_can_network_example()