Skip to content

Instantly share code, notes, and snippets.

@jxskiss
Last active May 23, 2022 07:55
Show Gist options
  • Save jxskiss/01816eec9a2b64bae341f4d07f58646e to your computer and use it in GitHub Desktop.
Save jxskiss/01816eec9a2b64bae341f4d07f58646e to your computer and use it in GitHub Desktop.
Use "fysom" state machine as global machine
from django.db import models
from fysom_singleton import *
class ItemStatus(object):
    """Status values of an item plus the state machine configuration.

    ``SM_STATES`` and ``SM_TRANSITIONS`` are passed to ``StateMachine`` as
    its ``states`` and ``transitions`` arguments.
    """

    NEW = 'new'
    NEED_INFO = 'need_info'
    REVIEWING = 'reviewing'
    REDOING = 'redoing'
    CONFLICT = 'conflict'
    VERIFIED = 'verified'
    DELETED = 'deleted'

    # Every status known to the machine.
    SM_STATES = [
        NEW,
        NEED_INFO,
        REVIEWING,
        REDOING,
        CONFLICT,
        VERIFIED,
        DELETED,
    ]

    # Each entry is either a ``[trigger, source, destination]`` list or a
    # dict with the keys 'trigger', 'source', 'dest' and, optionally,
    # 'conditions'.
    SM_TRANSITIONS = [
        ['sm_prepare_new', NEW, NEED_INFO],

        # Only allowed when ``check_review_ready`` passes.
        {
            'trigger': 'sm_commit_review',
            'source': NEED_INFO,
            'dest': REVIEWING,
            'conditions': [
                {'true': 'check_review_ready'},
            ],
        },

        # The first two conditions cancel the event when they fail; the
        # last one redirects the item to CONFLICT instead.
        {
            'trigger': 'sm_done_verified',
            'source': [REVIEWING, REDOING],
            'dest': VERIFIED,
            'conditions': [
                {'true': 'check_required_fields'},
                {'true': 'check_barcodes_valid'},
                {'true': 'check_no_conflict', 'else': CONFLICT},
            ],
        },

        ['sm_mark_conflict', [REVIEWING, REDOING], CONFLICT],
        ['sm_revert_verified', [VERIFIED, CONFLICT], REDOING],
        ['sm_require_info', [REVIEWING, REDOING], NEED_INFO],

        # Deleting is possible from every status except DELETED itself.
        {
            'trigger': 'sm_mark_deleted',
            'source': [
                NEW,
                NEED_INFO,
                REVIEWING,
                REDOING,
                CONFLICT,
                VERIFIED,
            ],
            'dest': DELETED,
        },

        ['sm_revert_deleted', DELETED, REDOING],

        # '=' keeps the item in whatever status it currently has.
        {
            'trigger': 'sm_update',
            'source': [NEW, NEED_INFO, REVIEWING, REDOING, VERIFIED],
            'dest': '=',
        },
    ]
class ItemMachineMixin(object):
    """Mixin that attaches the item state machine and its hook methods.

    The machine is created once, as a class attribute, and is therefore
    shared by every object using the mixin; each object's own state is
    kept in its ``status`` attribute.
    """

    sm = StateMachine(
        transitions=ItemStatus.SM_TRANSITIONS,
        states=ItemStatus.SM_STATES,
        state_field='status',
    )

    # Optional sugar, left disabled: forwards unknown ``sm_*`` attribute
    # lookups to the shared machine with this object pre-bound.
    #
    # def __getattribute__(self, item):
    #     try:
    #         return super(ItemMachineMixin, self).__getattribute__(item)
    #     except AttributeError:
    #         # proxy transition calling to state machine
    #         if item.startswith('sm_'):
    #             return partial(getattr(self.sm, item), self)
    #         raise

    def check_review_ready(self):
        """Condition named by the ``sm_commit_review`` transition.

        Placeholder that returns None.  The transition lists it under the
        ``'true'`` key, so a real implementation is expected to return
        True when the item may move on to review.
        """
        return None

    def on_enter_verified(self, event):
        """Hook named after the ``verified`` state; placeholder.

        :param event: the event object supplied by the state machine.
        """
        return None

    # other conditions and callbacks
    # ...
class Item(ItemMachineMixin, models.Model):
    """Django model whose ``status`` column holds the state machine state."""

    # Starts out as ItemStatus.NEW.
    status = models.CharField(default=ItemStatus.NEW, max_length=16)

    # many other fields
# Using the state machine somewhere in application code: the machine is
# shared, so the object being processed is passed in explicitly.
obj = Item()
obj.sm.sm_prepare_new(obj)
# The `obj.sm.sm_prepare_new(obj)` spelling looks ugly.  With the
# `__getattribute__` override of the ItemMachineMixin class (shown
# commented out in that class) enabled, the event can be fired like this:
obj.sm_prepare_new()
# -*- coding:utf-8 -*-
#
# fysom - pYthOn Finite State Machine - this is a port of Jake
# Gordon's javascript-state-machine to python
# https://github.com/jakesgordon/javascript-state-machine
#
# Modified from "fysom" by wsh <jxskiss@126.com>
#
"""
Modified from https://github.com/mriehl/fysom
Main difference with the original fysom:
- the StateMachine class in this module is targeted to be used as a
singleton, the object being processed is passed to related methods
as an argument
- customized for django model integration
- this module has less feature support than the original fysom:
* no initial state and final state support
* no callbacks support
- this module implements conditions and conditional transitions support
NOTE: this module is not heavily tested, especially for edge cases.
"""
class StateMachineError(Exception):
    """Base class of all state machine errors.

    Also raised directly for configuration problems and unknown events.
    """


class TransitionInvalidError(StateMachineError):
    """The event may not be fired from the object's current state."""


class TransitionCanceledError(StateMachineError):
    """A condition or an ``on_before_<event>`` handler stopped the event."""
class StateMachine(object):
    """
    Finite state machine designed to be shared by many objects.

    The machine stores no current state of its own.  Every call receives
    the object being processed and reads or writes the attribute named by
    ``state_field`` on it.  Conditions and callbacks are methods looked up
    by name on that same object.

    Each configured transition is exposed as a method of the machine, so
    an event called ``publish`` is fired with ``machine.publish(obj)`` or
    ``machine.trigger(obj, 'publish')``.

    Transitions are given either as ``[trigger, source, dest]`` sequences
    or as dicts with the keys ``'trigger'``, ``'source'``, ``'dest'`` and
    an optional ``'conditions'`` list.  ``source`` is a single state, an
    iterable of states, or ``WILDCARD`` for any state.  ``dest`` may be
    ``SAME_DEST`` to keep the current state.

    Each condition is a dict whose ``'true'`` or ``'false'`` key names a
    no-argument method of the object.  The condition passes only when the
    method returns exactly ``True`` (respectively ``False``).  When it
    fails, the optional ``'else'`` key gives the state to move to instead
    (remaining conditions are skipped); without ``'else'`` the event is
    cancelled with ``TransitionCanceledError``.

    Conditions and callbacks execution order:

        conditions
        on_before_<event>
        on_exit_<state>
        <<STATE CHANGE>>
        on_change_state
        on_enter_<state>
        on_after_<event>

    When dest state is same with source state:

        conditions
        on_before_<event>
        on_reenter_<state>
        on_after_<event>
    """

    WILDCARD = '*'
    SAME_DEST = '='

    def __init__(self, state_field, states, transitions):
        """
        Build the machine and register one handler method per event.

        :param state_field: name of the attribute that holds the state on
            the processed objects.
        :param states: the known states; kept as ``self.states`` only, it
            is not checked against the transitions.
        :param transitions: transition definitions, see the class docs.
        :raises StateMachineError: if an event is defined more than once.
        """
        self.state_field = state_field
        # events registry =>
        # {
        #     event_name: {
        #         "source": set(),
        #         "dest": dest,
        #         "conditions": [
        #             {"true/false": "condition_method"},
        #             {"true/false": "condition_method",
        #              "else": conditional_dest},
        #         ],
        #     },
        # }
        self._map = {}
        self.events = []
        self.states = states
        self._build_machine({'transitions': transitions})

    def is_state(self, obj, state):
        """Return True if ``obj`` is currently in ``state``."""
        return getattr(obj, self.state_field) == state

    def can(self, obj, event):
        """Return True if ``event`` may be fired from ``obj``'s state."""
        if event not in self._map:
            return False
        sources = self._map[event]['source']
        return (getattr(obj, self.state_field) in sources or
                self.WILDCARD in sources)

    def cannot(self, obj, event):
        """Return True if ``event`` may NOT be fired from ``obj``'s state."""
        return not self.can(obj, event)

    def _build_machine(self, cfg):
        """
        Fill the events registry from ``cfg['transitions']``.

        For every transition the event is stored in ``self._map``, its
        name appended to ``self.events`` and a handler attached to the
        machine under the event name.

        :raises StateMachineError: if an event is defined more than once.
        """
        for trans in cfg.get('transitions', []):
            # Short form: [trigger, source, dest]; tuples work as well.
            if isinstance(trans, (list, tuple)):
                trans = {'trigger': trans[0],
                         'source': trans[1], 'dest': trans[2]}
            event = trans['trigger']
            if event in self._map:
                raise StateMachineError(
                    'Improperly configured transitions, event %s '
                    'already registered' % event)
            src = trans['source']
            # A single state (including the wildcard) is given as a bare
            # string; wrap it so set() does not split it into characters.
            if self._is_base_string(src):
                src = [src]
            entry = {'source': set(src), 'dest': trans['dest']}
            conditions = trans.get('conditions')
            if conditions:
                entry['conditions'] = conditions
            self._map[event] = entry
            self.events.append(event)
            setattr(self, event, self._build_event(event))

    def _build_event(self, event):
        """
        For every event in the state machine, prepares the event handler.

        The returned function takes the object to process followed by any
        extra positional and keyword arguments, which are made available
        to the callbacks as ``args`` and ``kwargs`` of the event object.
        """

        def fn(obj, *args, **kwargs):
            current_state = getattr(obj, self.state_field)
            # Check if this event can be triggered in the current state.
            if not self.can(obj, event):
                raise TransitionInvalidError(
                    'event %s inappropriate in current state %s'
                    % (event, current_state))
            # On event occurrence, source will always be the current state.
            src = current_state
            # dest may change during checking conditions
            dst = self._map[event]['dest']
            if dst == self.SAME_DEST:
                dst = src
            # Check transition conditions first.
            for c in self._map[event].get('conditions', ()):
                target = 'true' if 'true' in c else 'false'
                if self._check_condition(obj, target, c[target]):
                    continue
                if 'else' not in c:
                    raise TransitionCanceledError(
                        'Cannot trigger event {0} because the {1} '
                        'condition returns False'.format(
                            event, c[target])
                    )
                # A failed condition with an alternative destination
                # redirects the transition and ends condition checking.
                dst = c['else']
                break
            # Prepares the object with all the meta data to be passed to
            # callbacks.
            e = self._event_obj()
            e.fsm, e.obj, e.event, e.src, e.dst = self, obj, event, src, dst
            e.args, e.kwargs = args, kwargs
            # used to share object saving status between callbacks
            e.obj_has_saved = False
            # Try to trigger the before event, unless it gets canceled.
            if self._before_event(obj, e) is False:
                raise TransitionCanceledError(
                    'Cannot trigger event {0} because the on_before_{0} '
                    'handler returns False'.format(event)
                )
            if src == dst:
                self._reenter_state(obj, e)
                self._after_event(obj, e)
                return

            # Wraps the activities that must constitute a single successful
            # transaction.
            def _trans():
                delattr(obj, '_sm_transition')
                setattr(obj, self.state_field, dst)
                self._change_state(obj, e)
                self._enter_state(obj, e)
                self._after_event(obj, e)

            obj._sm_transition = _trans
            # Hook to perform asynchronous transition: when on_exit_<state>
            # returns False the pending transition is left on the object as
            # ``_sm_transition`` instead of being run here.
            if self._exit_state(obj, e) is not False:
                obj._sm_transition()

        fn.__name__ = str(event)
        fn.__doc__ = (
            "Event handler for an {event} event. This event can be "
            "fired if the machine is in {states} states.".format(
                event=event,
                states=', '.join(
                    sorted(str(s) for s in self._map[event]['source']))))
        return fn

    @staticmethod
    def _check_condition(obj, target, func):
        """
        Call the condition method ``func`` of ``obj`` and test its result.

        :param target: ``'true'`` or ``'false'``, the expected outcome.
        :returns: True only if the method returned exactly ``True`` for a
            ``'true'`` target, or exactly ``False`` otherwise.
        :raises AttributeError: if ``obj`` has no attribute ``func``.
        """
        if not hasattr(obj, func):
            raise AttributeError("obj %s hasn't method %s" % (obj, func))
        r = getattr(obj, func)()
        if target == 'true':
            return r is True
        return r is False

    @staticmethod
    def _fire(obj, fn_name, ev):
        """Call ``obj.<fn_name>(ev)`` if defined and return its result."""
        handler = getattr(obj, fn_name, None)
        if handler is None:
            return None
        return handler(ev)

    @staticmethod
    def _before_event(obj, ev):
        """Run ``on_before_<event>``; a False result cancels the event."""
        return StateMachine._fire(obj, 'on_before_' + ev.event, ev)

    @staticmethod
    def _after_event(obj, ev):
        """Run ``on_after_<event>``."""
        return StateMachine._fire(obj, 'on_after_' + ev.event, ev)

    @staticmethod
    def _enter_state(obj, ev):
        """Run ``on_enter_<dst>``."""
        return StateMachine._fire(obj, 'on_enter_' + ev.dst, ev)

    @staticmethod
    def _exit_state(obj, ev):
        """Run ``on_exit_<src>``; a False result defers the transition."""
        return StateMachine._fire(obj, 'on_exit_' + ev.src, ev)

    @staticmethod
    def _reenter_state(obj, ev):
        """Run ``on_reenter_<dst>``."""
        return StateMachine._fire(obj, 'on_reenter_' + ev.dst, ev)

    @staticmethod
    def _change_state(obj, ev):
        """Run ``on_change_state``."""
        return StateMachine._fire(obj, 'on_change_state', ev)

    def trigger(self, obj, event, *args, **kwargs):
        """
        Triggers the given event.

        The event can be triggered by calling the event handler directly,
        for ex: machine.eat(obj), but this method will come in handy if the
        event is determined dynamically and you have the event name to
        trigger as a string.

        :raises StateMachineError: if ``event`` is not a registered event.
        """
        # Look the name up in the registry rather than with hasattr(), so
        # ordinary attributes such as ``can`` are not mistaken for events.
        if event not in self._map:
            raise StateMachineError(
                "There isn't any event registered as %s" % event)
        # The handler already knows its event; forward only the caller's
        # own arguments.
        return getattr(self, event)(obj, *args, **kwargs)

    @staticmethod
    def _is_base_string(obj):
        """
        Returns if the object is an instance of basestring.
        """
        return isinstance(obj, str)

    class _event_obj(object):
        """
        Event object.

        Attributes:
            fsm, obj, event, src, dst, args, kwargs, obj_has_saved
        """
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment