# example_test.py
from __future__ import print_function

import ttfw_idf
  3. # Timer events
  4. TIMER_EVENT_LIMIT = 3
  5. TIMER_EXPIRY_HANDLING = "TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_expiry_handler, executed {} out of " + str(TIMER_EVENT_LIMIT) + " times"
  6. # Task events
  7. TASK_ITERATION_LIMIT = 5
  8. TASK_UNREGISTRATION_LIMIT = 3
  9. TASK_ITERATION_POST = "TASK_EVENTS:TASK_ITERATION_EVENT: posting to default loop, {} out of " + str(TASK_ITERATION_LIMIT)
  10. TASK_ITERATION_HANDLING = "TASK_EVENTS:TASK_ITERATION_EVENT: task_iteration_handler, executed {} times"
  11. def _test_timer_events(dut):
  12. dut.start_app()
  13. print("Checking timer events posting and handling")
  14. dut.expect("setting up")
  15. dut.expect("starting event sources")
  16. print("Finished setup")
  17. dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: posting to default loop")
  18. print("Posted timer started event")
  19. dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: timer_started_handler")
  20. dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: timer_any_handler")
  21. dut.expect("TIMER_EVENTS:TIMER_EVENT_STARTED: all_event_handler")
  22. print("Handled timer started event")
  23. for expiries in range(1, TIMER_EVENT_LIMIT + 1):
  24. dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: posting to default loop")
  25. print("Posted timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))
  26. if expiries >= TIMER_EVENT_LIMIT:
  27. dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: posting to default loop")
  28. print("Posted timer stopped event")
  29. dut.expect(TIMER_EXPIRY_HANDLING.format(expiries))
  30. dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: timer_any_handler")
  31. dut.expect("TIMER_EVENTS:TIMER_EVENT_EXPIRY: all_event_handler")
  32. print("Handled timer expiry event {} out of {}".format(expiries, TIMER_EVENT_LIMIT))
  33. dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: timer_stopped_handler")
  34. dut.expect("TIMER_EVENTS:TIMER_EVENT_STOPPED: deleted timer event source")
  35. print("Handled timer stopped event")
  36. def _test_iteration_events(dut):
  37. dut.start_app()
  38. print("Checking iteration events posting and handling")
  39. dut.expect("setting up")
  40. dut.expect("starting event sources")
  41. print("Finished setup")
  42. for iteration in range(1, TASK_ITERATION_LIMIT + 1):
  43. dut.expect(TASK_ITERATION_POST.format(iteration))
  44. print("Posted iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))
  45. if iteration < TASK_UNREGISTRATION_LIMIT:
  46. dut.expect(TASK_ITERATION_HANDLING.format(iteration))
  47. dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")
  48. elif iteration == TASK_UNREGISTRATION_LIMIT:
  49. dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: unregistering task_iteration_handler")
  50. dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")
  51. print("Unregistered handler at iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))
  52. else:
  53. dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: all_event_handler")
  54. print("Handled iteration {} out of {}".format(iteration, TASK_ITERATION_LIMIT))
  55. dut.expect("TASK_EVENTS:TASK_ITERATION_EVENT: deleting task event source")
  56. print("Deleted task event source")
  57. @ttfw_idf.idf_example_test(env_tag='Example_WIFI')
  58. def test_default_event_loop_example(env, extra_data):
  59. dut = env.get_dut('default_event_loop', 'examples/system/esp_event/default_event_loop')
  60. _test_iteration_events(dut)
  61. _test_timer_events(dut)
  62. if __name__ == '__main__':
  63. test_default_event_loop_example()