有限状态机(FSM)的简单理解和Python实现

最近在项目中,涉及到对行为和状态进行建模的需求,尝试用有限状态机(Finite-state machine, FSM)来实现。python

1. 概念介绍

1.1 运行机制

基于对有限状态机的粗浅理解,大致的运行机制为:ide

  • 系统所处的状态是明确而且有限的,一定属于状态全集中的某一种;
  • 系统接受输入,根据断定条件,决定是维持当前状态,仍是切换到某一个新的状态;
  • 在维持或切换的过程当中,执行一些预设的操做。

能够认为有限状态机是一个离散系统,每接受一次输入,进行一次判断和切换。code

1.2 所含要素

一个有限状态机包含以下几个要素:orm

  • 状态:系统所处的状态,在运行过程当中又能够分为当前状态下一阶段状态blog

  • 事件:也能够理解为每一次运行的输入;继承

  • 条件:根据输入事件执行的断定条件,条件是基于状态的,当前所处的每一种状态,均可以有本身对应的一套断定条件,来决定下一步进入哪种状态;事件

  • 动做:肯定切换路径后,执行的附加操做。ci

以一个共3种状态的FSM为例,共有3套断定条件,根据当前所处的状态来肯定使用哪种断定条件,共有3*3=9种动做,决定每一种状态切换过程当中须要执行的动做。input

1.3 分析方法

一般能够用一个表格来对所处理的FSM进行分析,防止状况的遗漏。it

在表格中分析清楚每一种状态切换的断定条件和执行动做,再用代码实现,能够最大程度地减轻思考的难度,减小错误的几率。

2. 代码实现

以OOP的方式,作了一个基础的Python实现。

FSM基类

class StateMachine:
    def __init__(self, cfg, states, events_handler, actions_handler):
        # config information for an instance
        self.cfg = cfg
        # define the states and the initial state
        self.states = [s.lower() for s in states]
        self.state = self.states[0]
        # process the inputs according to current state
        self.events = dict()
        # actions according to current transfer 
        self.actions = {state: dict() for state in self.states}
        # cached data for temporary use
        self.records = dict()
        # add events and actions
        for i, state in enumerate(self.states):
            self._add_event(state, events_handler[i])
            for j, n_state in enumerate(self.states):
                self._add_action(state, n_state, actions_handler[i][j])

    def _add_event(self, state, handler):
        self.events[state] = handler

    def _add_action(self, cur_state, next_state, handler):
        self.actions[cur_state][next_state] = handler

    def run(self, inputs):
        # decide the state-transfer according to the inputs
        new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
        # do the actions related with the transfer 
        self.actions[self.state][new_state](outputs, self.records, self.cfg)
        # do the state transfer
        self.state = new_state
        return new_state

    def reset(self):
        self.state = self.states[0]
        self.records = dict()
        return

# handlers for events and actions, event_X and action_XX are all specific functions
events_handlers = [event_A, event_B]
actions_handlers = [[action_AA, action_AB],
                    [action_BA, action_BB]]

# define an instance of StateMachine
state_machine = StateMachine(cfg, states, events_handlers, actions_handlers)

若是对于状态机有具体的要求,能够继承这个基类进行派生。

好比,有对状态机分层嵌套的需求。

class StateGeneral(StateMachine):
    def __init__(self, cfg, states):
        super(StateGeneral, self).__init__(cfg, states, events_handler, actions_handler)
        self.sub_state_machines = dict()

    def add_sub_fsm(self, name, fsm):
        self.sub_state_machines[name] = fsm

    def run(self, inputs):
        new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
        # operate the sub_state_machines in actions
        self.actions[self.state][new_state](outputs, self.records, self.cfg, \
        									self.sub_state_machines)
        self.state = new_state
        return new_state

    def reset(self):
        self.state = self.states[0]
        self.records = dict()
        for _, sub_fsm in self.sub_state_machines.items():
            sub_fsm.reset()
        return
相关文章
相关标签/搜索