| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import _thread
- import time
- _defaultStateMachine: "StateMachine" = None
- CONFIG_DEBUG = False
- def debug(*info):
- print("debug: ", end='')
- print(*info)
- def error(*info):
- print("error: ", end='')
- print(*info)
- def info(*info):
- print("info: ", end='')
- print(*info)
- class State:
- _name: str = None
- _function: callable = None
- def __init__(self, name: str, function: callable):
- self._name = name
- self._function = function
- def getName(self):
- return self._name
- def getFunction(self):
- return self._function
- def __str__(self):
- return self._name
- class StateMachine:
- _statelist: list[State] = None
- _currentState: State = None
- _needStop: bool = False
- _stoped: bool = False
- def __init__(self):
- self._statelist = []
- def addState(self, state: State):
- self._statelist.append(state)
- def getStateByName(self, stateName: str):
- for state in self._statelist:
- if state.getName() == stateName:
- return state
- return None
- def getStateByFunction(self, stateFunction: callable):
- for state in self._statelist:
- if state.getFunction() == stateFunction:
- return state
- return None
- def getState(self, stateNameOrFn: str):
- if type(stateNameOrFn) == str:
- return self.getStateByName(stateNameOrFn)
- else:
- return self.getStateByFunction(stateNameOrFn)
- def setCurrentState(self, stateNameOrFn: str) -> int:
- debug("stateNameOrFn: " + str(stateNameOrFn))
- state = self.getState(stateNameOrFn)
- debug("nextState: " + str(state))
- if state is None:
- error("Error: state not found: " + stateNameOrFn)
- return -1
- self._currentState = state
- return 0
- def runCurrentState(self) -> str:
- if None == self._currentState:
- error("Error: current state is None")
- return None
- fn = self._currentState.getFunction()
- info("current state: " + str(self._currentState))
- debug("current state fn: " + str(fn))
- return fn()
- def mainLoop(self, initStateName: str):
- debug("mainloop: initStateName: " + initStateName)
- res = self.setCurrentState(initStateName)
- debug("mainloop: setCurrentState: " + str(res))
- while not self._needStop:
- nextStateNameOrFn = self.runCurrentState()
- if None == nextStateNameOrFn:
- error("Error: runCurrentState return None")
- break
- res = self.setCurrentState(nextStateNameOrFn)
- if 0 != res:
- error("State Machine Error: " + str(res))
- break
- time.sleep(0.1)
- self._stoped = True
- return
- def start(self, initStateName: str):
- debug("start state machine, initStateName: " + initStateName)
- _thread.start_new_thread(self.mainLoop, (initStateName,))
- def stop(self):
- self._needStop = True
- info("stop state machine")
- def wait(self):
- while not self._stoped:
- time.sleep(0.1)
- def _initDefaultStateMachine():
- global _defaultStateMachine
- if _defaultStateMachine is None:
- _defaultStateMachine = StateMachine()
- def addState(stateFunction: callable, stateName: str = 'default'):
- _initDefaultStateMachine()
- _defaultStateMachine.addState(State(stateName, stateFunction))
- def start(initStateName: str):
- _initDefaultStateMachine()
- _defaultStateMachine.start(initStateName)
- def stop():
- _initDefaultStateMachine()
- _defaultStateMachine.stop()
- def wait():
- _initDefaultStateMachine()
- _defaultStateMachine.wait()
|