fsm.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import _thread
  2. import time
  3. _defaultStateMachine: "StateMachine" = None
  4. CONFIG_DEBUG = False
  5. def debug(*info):
  6. print("debug: ", end='')
  7. print(*info)
  8. def error(*info):
  9. print("error: ", end='')
  10. print(*info)
  11. def info(*info):
  12. print("info: ", end='')
  13. print(*info)
  14. class State:
  15. _name: str = None
  16. _function: callable = None
  17. def __init__(self, name: str, function: callable):
  18. self._name = name
  19. self._function = function
  20. def getName(self):
  21. return self._name
  22. def getFunction(self):
  23. return self._function
  24. def __str__(self):
  25. return self._name
  26. class StateMachine:
  27. _statelist: list[State] = None
  28. _currentState: State = None
  29. _needStop: bool = False
  30. _stoped: bool = False
  31. def __init__(self):
  32. self._statelist = []
  33. def addState(self, state: State):
  34. self._statelist.append(state)
  35. def getStateByName(self, stateName: str):
  36. for state in self._statelist:
  37. if state.getName() == stateName:
  38. return state
  39. return None
  40. def getStateByFunction(self, stateFunction: callable):
  41. for state in self._statelist:
  42. if state.getFunction() == stateFunction:
  43. return state
  44. return None
  45. def getState(self, stateNameOrFn: str):
  46. if type(stateNameOrFn) == str:
  47. return self.getStateByName(stateNameOrFn)
  48. else:
  49. return self.getStateByFunction(stateNameOrFn)
  50. def setCurrentState(self, stateNameOrFn: str) -> int:
  51. debug("stateNameOrFn: " + str(stateNameOrFn))
  52. state = self.getState(stateNameOrFn)
  53. debug("nextState: " + str(state))
  54. if state is None:
  55. error("Error: state not found: " + stateNameOrFn)
  56. return -1
  57. self._currentState = state
  58. return 0
  59. def runCurrentState(self) -> str:
  60. if None == self._currentState:
  61. error("Error: current state is None")
  62. return None
  63. fn = self._currentState.getFunction()
  64. info("current state: " + str(self._currentState))
  65. debug("current state fn: " + str(fn))
  66. return fn()
  67. def mainLoop(self, initStateName: str):
  68. debug("mainloop: initStateName: " + initStateName)
  69. res = self.setCurrentState(initStateName)
  70. debug("mainloop: setCurrentState: " + str(res))
  71. while not self._needStop:
  72. nextStateNameOrFn = self.runCurrentState()
  73. if None == nextStateNameOrFn:
  74. error("Error: runCurrentState return None")
  75. break
  76. res = self.setCurrentState(nextStateNameOrFn)
  77. if 0 != res:
  78. error("State Machine Error: " + str(res))
  79. break
  80. time.sleep(0.1)
  81. self._stoped = True
  82. return
  83. def start(self, initStateName: str):
  84. debug("start state machine, initStateName: " + initStateName)
  85. _thread.start_new_thread(self.mainLoop, (initStateName,))
  86. def stop(self):
  87. self._needStop = True
  88. info("stop state machine")
  89. def wait(self):
  90. while not self._stoped:
  91. time.sleep(0.1)
  92. def _initDefaultStateMachine():
  93. global _defaultStateMachine
  94. if _defaultStateMachine is None:
  95. _defaultStateMachine = StateMachine()
  96. def addState(stateFunction: callable, stateName: str = 'default'):
  97. _initDefaultStateMachine()
  98. _defaultStateMachine.addState(State(stateName, stateFunction))
  99. def start(initStateName: str):
  100. _initDefaultStateMachine()
  101. _defaultStateMachine.start(initStateName)
  102. def stop():
  103. _initDefaultStateMachine()
  104. _defaultStateMachine.stop()
  105. def wait():
  106. _initDefaultStateMachine()
  107. _defaultStateMachine.wait()