| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- # Copyright 2015-2021 Espressif Systems (Shanghai) CO LTD
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import threading
- from typing import Optional
- class StoppableThread(object):
- """
- Provide a Thread-like class which can be 'cancelled' via a subclass-provided
- cancellation method.
- Can be started and stopped multiple times.
- Isn't an instance of type Thread because Python Thread objects can only be run once
- """
- def __init__(self):
- # type: () -> None
- self._thread = None # type: Optional[threading.Thread]
- @property
- def alive(self):
- # type: () -> bool
- """
- Is 'alive' whenever the internal thread object exists
- """
- return self._thread is not None
- def start(self):
- # type: () -> None
- if self._thread is None:
- self._thread = threading.Thread(target=self._run_outer)
- self._thread.start()
- def _cancel(self):
- # type: () -> None
- pass # override to provide cancellation functionality
- def run(self):
- # type: () -> None
- pass # override for the main thread behaviour
- def _run_outer(self):
- # type: () -> None
- try:
- self.run()
- finally:
- self._thread = None
- def stop(self):
- # type: () -> None
- if self._thread is not None:
- old_thread = self._thread
- self._thread = None
- self._cancel()
- old_thread.join()
|