| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- import _thread
- import time
- _is_debug = False
- class EventTask:
- """
- Task Item of EventLoop
- """
- _func = None
- _callback = None
- _args = None
- _period_ms = None
- _is_periodic = False
- _last_call_time = 0
- _delay_ms = None
- def __init__(self, func, callback, args, period_ms, delay_ms):
- """
- :param func: function to be called
- :param callback: callback function
- :param args: arguments of func
- :param period_ms: period of periodic task
- """
- if period_ms != None:
- self._is_periodic = True
- self._func = func
- self._callback = callback
- self._args = args
- self._period_ms = period_ms
- self._delay_ms = delay_ms
- if not delay_ms is None:
- if period_ms is None:
- period_ms = 0
- self._last_call_time = time.tick_ms() - period_ms + delay_ms
- _debug('last_call_time for delay:', self._last_call_time)
- _debug('func:', self._func)
- _debug('callback:', self._callback)
- _debug('args:', self._args)
- _debug('period_ms:', self._period_ms)
- _debug('delay_ms:', self._delay_ms)
- _debug('is_periodic:', self._is_periodic)
- class EventLoop:
- """
- Event Loop
- """
- _tasks: dict[str, EventTask] = {}
- _period_ms = 100
- _thread_stack = 0
- _need_stop = False
- _started = False
- _uuid = 0
- def __init__(self, period_ms=100, thread_stack=0):
- """
- :param period_ms: period of loop
- :param thread_stack: stack size of thread
- """
- self._period_ms = period_ms
- self._thread_stack = thread_stack
- def _add_task(self, task_name, func, callback, args, period_ms, delay_ms):
- if task_name == None:
- self._uuid += 1
- task_name = str(self._uuid)
- _debug('create task:', task_name)
- new_task = EventTask(func, callback, args, period_ms, delay_ms)
- self._tasks[task_name] = new_task
- def start_new_task(self,
- func, args,
- is_periodic=True,
- period_ms=1000,
- callback=None,
- task_name=None,
- delay_ms=None
- ):
- """
- Add a task to EventLoop
- :param task_name: name of task
- :param func: function to be called
- :param period_ms: period of task
- :param callback: callback function
- :param args: arguments of func
- """
- if is_periodic:
- self._add_task(task_name, func, callback,
- args, period_ms, delay_ms)
- else:
- self._add_task(task_name, func, callback, args, None, delay_ms)
- def start_new_task_once(self,
- func, args,
- callback=None,
- task_name=None,
- delay_ms=None):
- """
- Add a task to EventLoop, run once
- :param task_name: name of task
- :param func: function to be called
- :param callback: callback function
- :param args: arguments of func
- """
- self.start_new_task(func, args, False, None,
- callback, task_name, delay_ms)
- def start_new_task_periodic(self, func, args, period_ms=1000, callback=None, task_name=None, delay_ms=None):
- """
- Add a task to EventLoop, run periodically
- :param task_name: name of task
- :param func: function to be called
- :param period_ms: period of task
- :param callback: callback function
- :param args: arguments of func
- """
- self.start_new_task(func, args, True, period_ms,
- callback, task_name, delay_ms)
- def remove_task(self, task_name):
- """
- Remove a task from EventLoop
- :param task_name: name of task
- """
- _debug('remove_task', task_name)
- self._tasks.remove(task_name)
- def _run_task(self, task: EventTask):
- _res = task._func(*task._args)
- if task._callback != None:
- task._callback(_res)
- def _run_thread(self):
- task_last = None
- while not self._need_stop:
- tick = time.tick_ms()
- for task_name, task in self._tasks.items():
- if task_last == task:
- # already run this task last time, run other task first
- task_last = None
- continue
- if tick - task._last_call_time > task._period_ms:
- _debug('run_task', task_name)
- _debug('tick', tick)
- _debug('last_call_time', task._last_call_time)
- task_last = task
- self._run_task(task)
- task._last_call_time = tick
- _debug('is_periodic', task._is_periodic)
- if not task._is_periodic:
- self.remove_task(task_name)
- # only run one task per loop
- # if the task are removed, the for loop will be broken
- break
- if self._need_stop:
- break
- time.sleep_ms(self._period_ms)
- self._started = False
- def start(self):
- """
- Start EventLoop
- """
- if self._started:
- print('EventLoop already started')
- return
- self._need_stop = False
- last_stack_size = _thread.stack_size(self._thread_stack)
- _thread.start_new_thread(self._run_thread, ())
- last_stack_size = _thread.stack_size(last_stack_size)
- self._started = True
- def stop(self):
- """
- Stop EventLoop
- """
- if not self._started:
- print('EventLoop not started')
- return
- self._need_stop = True
- while self._started:
- time.sleep(0.1)
- time.sleep(1)
- def set_debug(enable: bool):
- global _is_debug
- _is_debug = enable
- def _debug(*args):
- if _is_debug:
- print('\x1b[33m[eventloop]', *args, "\x1b[0m")
- g_default_event_loop: EventLoop = None
- def _get_default_event_loop():
- global g_default_event_loop
- if g_default_event_loop == None:
- g_default_event_loop = EventLoop()
- g_default_event_loop.start()
- return g_default_event_loop
- def start_new_task(func, args,
- is_periodic=True,
- period_ms=1000,
- callback=None,
- task_name=None,
- delay_ms=None
- ):
- """
- Add a task to EventLoop
- :param task_name: name of task
- :param func: function to be called
- :param period_ms: period of task
- :param callback: callback function
- :param args: arguments of func
- """
- eventloop = _get_default_event_loop()
- eventloop.start_new_task(func, args, is_periodic,
- period_ms, callback, task_name, delay_ms)
- def start_new_task_once(func, args,
- callback=None,
- task_name=None,
- delay_ms=None
- ):
- """
- Add a task to EventLoop, run once
- :param task_name: name of task
- :param func: function to be called
- :param callback: callback function
- :param args: arguments of func
- """
- eventloop = _get_default_event_loop()
- eventloop.start_new_task_once(func, args, callback, task_name, delay_ms)
- def start_new_task_periodic(func, args,
- period_ms=1000,
- callback=None,
- task_name=None,
- delay_ms=None
- ):
- """
- Add a task to EventLoop, run periodically
- :param task_name: name of task
- :param func: function to be called
- :param period_ms: period of task
- :param callback: callback function
- :param args: arguments of func
- """
- eventloop = _get_default_event_loop()
- eventloop.start_new_task_periodic(
- func, args, period_ms, callback, task_name, delay_ms)
- def remove_task(task_name):
- """
- Remove a task from EventLoop
- :param task_name: name of task
- """
- eventloop = _get_default_event_loop()
- eventloop.remove_task(task_name)
- def stop():
- """
- Stop EventLoop
- """
- if g_default_event_loop == None:
- return
- _debug('stop default eventloop')
- eventloop = _get_default_event_loop()
- eventloop.stop()
- def start():
- """
- Start EventLoop
- """
- eventloop = _get_default_event_loop()
- eventloop.start()
- def __del__():
- stop()
|