test2.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import time
  2. from eventloop import EventLoop
  3. run_time = 0
  4. eventloop.set_debug(True)
  5. def test_func(arg1, arg2):
  6. global run_time
  7. run_time += 1
  8. print("Running test function with arguments:", arg1, arg2)
  9. return arg1 + arg2
  10. def test_func2(arg1, arg2):
  11. print("test function 2 with arguments:", arg1, arg2)
  12. return arg1 + arg2
  13. def test_callback(res):
  14. print("Running test callback function", res)
  15. assert res == "Hello World"
  16. # Test case 2: Add and run a periodic task
  17. event_loop = EventLoop(period_ms=10)
  18. event_loop.start_new_task_periodic(
  19. test_func, ("Hello", " World"),
  20. period_ms=100,
  21. callback=test_callback
  22. )
  23. event_loop.start_new_task(
  24. test_func2, ("Hello", " World"),
  25. is_periodic=True,
  26. period_ms=20,
  27. callback=test_callback
  28. )
  29. event_loop.start()
  30. # Sleep for enough time to allow the periodic task to run multiple times
  31. while run_time < 3:
  32. time.sleep(0.1)
  33. event_loop.stop()
  34. # Test case 3: Test removing a task
  35. event_loop.start_new_task_periodic(
  36. test_func, ("Hello", " World"), callback=test_callback, task_name="test_task_remove")
  37. event_loop.remove_task("test_task_remove")
  38. print(event_loop._tasks)
  39. assert "test_task_remove" not in event_loop._tasks, "Failed to remove the task"