Browse Source

add fsm module and test

lyon 2 years ago
parent
commit
04f59ff042

+ 27 - 0
examples/fsm/test1.py

@@ -0,0 +1,27 @@
+import fsm
+
+
+def state1():
+    print("state1")
+    return "state2"
+
+
+def state2():
+    print("state2")
+    return state3
+
+
+def state3():
+    print("state3")
+    fsm.stop()
+    return "state1"
+
+
+def test_fsm():
+    fsm.addState(state1, "state1")
+    fsm.addState(state2, "state2")
+    fsm.addState(state3, "state3")
+    fsm.start("state1")
+    fsm.wait()
+
+test_fsm()

+ 144 - 0
package/fsm/fsm.py

@@ -0,0 +1,144 @@
+import _thread
+import time
+
+_defaultStateMachine: "StateMachine" = None
+
+CONFIG_DEBUG = False
+
+
+def debug(*info):
+    print("debug: ", end='')
+    print(*info)
+
+
+def error(*info):
+    print("error: ", end='')
+    print(*info)
+
+
+def info(*info):
+    print("info: ", end='')
+    print(*info)
+
+
+class State:
+    _name: str = None
+    _function: callable = None
+
+    def __init__(self, name: str, function: callable):
+        self._name = name
+        self._function = function
+
+    def getName(self):
+        return self._name
+
+    def getFunction(self):
+        return self._function
+
+    def __str__(self):
+        return self._name
+
+
+class StateMachine:
+    _statelist: list[State] = None
+    _currentState: State = None
+    _needStop: bool = False
+    _stoped: bool = False
+
+    def __init__(self):
+        self._statelist = []
+
+    def addState(self, state: State):
+        self._statelist.append(state)
+
+    def getStateByName(self, stateName: str):
+        for state in self._statelist:
+            if state.getName() == stateName:
+                return state
+        return None
+
+    def getStateByFunction(self, stateFunction: callable):
+        for state in self._statelist:
+            if state.getFunction() == stateFunction:
+                return state
+        return None
+
+    def getState(self, stateNameOrFn: str):
+        if type(stateNameOrFn) == str:
+            return self.getStateByName(stateNameOrFn)
+        else:
+            return self.getStateByFunction(stateNameOrFn)
+
+    def setCurrentState(self, stateNameOrFn: str) -> int:
+        debug("stateNameOrFn: " + str(stateNameOrFn))
+        state = self.getState(stateNameOrFn)
+        debug("nextState: " + str(state))
+        if state is None:
+            error("Error: state not found: " + stateNameOrFn)
+            return -1
+        self._currentState = state
+        return 0
+
+    def runCurrentState(self) -> str:
+        if None == self._currentState:
+            error("Error: current state is None")
+            return None
+        fn = self._currentState.getFunction()
+        info("current state: " + str(self._currentState))
+        debug("current state fn: " + str(fn))
+        return fn()
+
+    def mainLoop(self, initStateName: str):
+        debug("mainloop: initStateName: " + initStateName)
+        res = self.setCurrentState(initStateName)
+        debug("mainloop: setCurrentState: " + str(res))
+        while not self._needStop:
+            nextStateNameOrFn = self.runCurrentState()
+            if None == nextStateNameOrFn:
+                error("Error: runCurrentState return None")
+                break
+            res = self.setCurrentState(nextStateNameOrFn)
+            if 0 != res:
+                error("State Machine Error: " + str(res))
+                break
+            time.sleep(0.1)
+        self._stoped = True
+        return
+
+    def start(self, initStateName: str):
+        debug("start state machine, initStateName: " + initStateName)
+        _thread.start_new_thread(self.mainLoop, (initStateName,))
+
+    def stop(self):
+        self._needStop = True
+        info("stop state machine")
+
+    def wait(self):
+        while not self._stoped:
+            time.sleep(0.1)
+
+
+def _initDefaultStateMachine():
+    global _defaultStateMachine
+    if _defaultStateMachine is None:
+        _defaultStateMachine = StateMachine()
+
+
+def addState(stateFunction: callable, stateName: str = 'default'):
+    _initDefaultStateMachine()
+    _defaultStateMachine.addState(State(stateName, stateFunction))
+
+
+def start(initStateName: str):
+    _initDefaultStateMachine()
+    _defaultStateMachine.start(initStateName)
+
+
+def stop():
+    _initDefaultStateMachine()
+    _defaultStateMachine.stop()
+
+
+def wait():
+    _initDefaultStateMachine()
+    _defaultStateMachine.wait()

+ 144 - 0
port/linux/package/pikascript/fsm.py

@@ -0,0 +1,144 @@
+import _thread
+import time
+
+_defaultStateMachine: "StateMachine" = None
+
+CONFIG_DEBUG = False
+
+
+def debug(*info):
+    print("debug: ", end='')
+    print(*info)
+
+
+def error(*info):
+    print("error: ", end='')
+    print(*info)
+
+
+def info(*info):
+    print("info: ", end='')
+    print(*info)
+
+
+class State:
+    _name: str = None
+    _function: callable = None
+
+    def __init__(self, name: str, function: callable):
+        self._name = name
+        self._function = function
+
+    def getName(self):
+        return self._name
+
+    def getFunction(self):
+        return self._function
+
+    def __str__(self):
+        return self._name
+
+
+class StateMachine:
+    _statelist: list[State] = None
+    _currentState: State = None
+    _needStop: bool = False
+    _stoped: bool = False
+
+    def __init__(self):
+        self._statelist = []
+
+    def addState(self, state: State):
+        self._statelist.append(state)
+
+    def getStateByName(self, stateName: str):
+        for state in self._statelist:
+            if state.getName() == stateName:
+                return state
+        return None
+
+    def getStateByFunction(self, stateFunction: callable):
+        for state in self._statelist:
+            if state.getFunction() == stateFunction:
+                return state
+        return None
+
+    def getState(self, stateNameOrFn: str):
+        if type(stateNameOrFn) == str:
+            return self.getStateByName(stateNameOrFn)
+        else:
+            return self.getStateByFunction(stateNameOrFn)
+
+    def setCurrentState(self, stateNameOrFn: str) -> int:
+        debug("stateNameOrFn: " + str(stateNameOrFn))
+        state = self.getState(stateNameOrFn)
+        debug("nextState: " + str(state))
+        if state is None:
+            error("Error: state not found: " + stateNameOrFn)
+            return -1
+        self._currentState = state
+        return 0
+
+    def runCurrentState(self) -> str:
+        if None == self._currentState:
+            error("Error: current state is None")
+            return None
+        fn = self._currentState.getFunction()
+        info("current state: " + str(self._currentState))
+        debug("current state fn: " + str(fn))
+        return fn()
+
+    def mainLoop(self, initStateName: str):
+        debug("mainloop: initStateName: " + initStateName)
+        res = self.setCurrentState(initStateName)
+        debug("mainloop: setCurrentState: " + str(res))
+        while not self._needStop:
+            nextStateNameOrFn = self.runCurrentState()
+            if None == nextStateNameOrFn:
+                error("Error: runCurrentState return None")
+                break
+            res = self.setCurrentState(nextStateNameOrFn)
+            if 0 != res:
+                error("State Machine Error: " + str(res))
+                break
+            time.sleep(0.1)
+        self._stoped = True
+        return
+
+    def start(self, initStateName: str):
+        debug("start state machine, initStateName: " + initStateName)
+        _thread.start_new_thread(self.mainLoop, (initStateName,))
+
+    def stop(self):
+        self._needStop = True
+        info("stop state machine")
+
+    def wait(self):
+        while not self._stoped:
+            time.sleep(0.1)
+
+
+def _initDefaultStateMachine():
+    global _defaultStateMachine
+    if _defaultStateMachine is None:
+        _defaultStateMachine = StateMachine()
+
+
+def addState(stateFunction: callable, stateName: str = 'default'):
+    _initDefaultStateMachine()
+    _defaultStateMachine.addState(State(stateName, stateFunction))
+
+
+def start(initStateName: str):
+    _initDefaultStateMachine()
+    _defaultStateMachine.start(initStateName)
+
+
+def stop():
+    _initDefaultStateMachine()
+    _defaultStateMachine.stop()
+
+
+def wait():
+    _initDefaultStateMachine()
+    _defaultStateMachine.wait()

+ 1 - 0
port/linux/package/pikascript/main.py

@@ -7,6 +7,7 @@ import configparser, network
 import test_module1, test_cmodule, test_module4, import_test
 import hashlib, hmac, aes, base64, time, os
 import _thread, weakref, eventloop
+import fsm
 
 mem = PikaStdLib.MemChecker()
 print('hello pikascript!')

+ 27 - 0
test/python/fsm/test1.py

@@ -0,0 +1,27 @@
+import fsm
+
+
+def state1():
+    print("state1")
+    return "state2"
+
+
+def state2():
+    print("state2")
+    return state3
+
+
+def state3():
+    print("state3")
+    fsm.stop()
+    return "state1"
+
+
+def test_fsm():
+    fsm.addState(state1, "state1")
+    fsm.addState(state2, "state2")
+    fsm.addState(state3, "state3")
+    fsm.start("state1")
+    fsm.wait()
+
+test_fsm()