stoppable_thread.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Apache-2.0
  3. import threading
  4. from typing import Optional
  5. class StoppableThread(object):
  6. """
  7. Provide a Thread-like class which can be 'cancelled' via a subclass-provided
  8. cancellation method.
  9. Can be started and stopped multiple times.
  10. Isn't an instance of type Thread because Python Thread objects can only be run once
  11. """
  12. def __init__(self):
  13. # type: () -> None
  14. self._thread = None # type: Optional[threading.Thread]
  15. @property
  16. def alive(self):
  17. # type: () -> bool
  18. """
  19. Is 'alive' whenever the internal thread object exists
  20. """
  21. return self._thread is not None
  22. def start(self):
  23. # type: () -> None
  24. if self._thread is None:
  25. self._thread = threading.Thread(target=self._run_outer)
  26. self._thread.start()
  27. def _cancel(self):
  28. # type: () -> None
  29. pass # override to provide cancellation functionality
  30. def run(self):
  31. # type: () -> None
  32. pass # override for the main thread behaviour
  33. def _run_outer(self):
  34. # type: () -> None
  35. try:
  36. self.run()
  37. finally:
  38. self._thread = None
  39. def stop(self):
  40. # type: () -> None
  41. if self._thread is not None:
  42. old_thread = self._thread
  43. self._thread = None
  44. self._cancel()
  45. old_thread.join()