Skip to content

Instantly share code, notes, and snippets.

@ttsiodras
Last active January 1, 2024 06:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ttsiodras/fe32284ac204907249d479b4225eb83c to your computer and use it in GitHub Desktop.
Save ttsiodras/fe32284ac204907249d479b4225eb83c to your computer and use it in GitHub Desktop.
The cleanest way (I've ever seen) to implement state machines in Python
#!/usr/bin/env python3
# Based on a great article from Eli Bendersky:
#
# http://eli.thegreenplace.net/2009/08/29/co-routines-as-an-alternative-to-state-machines/
#
# I just "ported" to Python3 (still using plain old yield) and added the proper type annotations,
# since Python 3.5 can now support them (and mypy can check them):
#
# $ mypy --disallow-untyped-defs protocol_via_coroutines.py
# $
#
from typing import Callable, Generator, List
# A protocol decoder:
#
# - yields Nothing
# - expects ints to be `send` in his yield waits
# - and doesn't return anything.
ProtocolDecodingCoroutine = Generator[None, int, None]
# A frame consumer (passed as an argument to a protocol decoder):
#
# - yields Nothing
# - expects List[int] to be `send` in his waiting yields
# - and doesn't return anything.
FrameConsumerCoroutine = Generator[None, List[int], None]
def unwrap_protocol(header: int=0x61,
footer: int=0x62,
dle: int=0xAB,
after_dle_func: Callable[[int], int]=lambda x: x,
target: FrameConsumerCoroutine=None) -> ProtocolDecodingCoroutine:
"""
Simplified framing (protocol unwrapping) co-routine.
"""
# Outer loop looking for a frame header
#
while True:
byte = (yield)
frame = [] # type: List[int]
if byte == header:
# Capture the full frame
#
while True:
byte = (yield)
if byte == footer:
target.send(frame)
break
elif byte == dle:
byte = (yield)
frame.append(after_dle_func(byte))
else:
frame.append(byte)
def frame_receiver() -> FrameConsumerCoroutine:
"""
A simple co-routine "sink" for receiving full frames.
"""
while True:
frame = (yield)
print('Got frame:', ''.join('%02x' % x for x in frame))
bytestream = bytes(
bytearray((0x70, 0x24,
0x61, 0x99, 0xAF, 0xD1, 0x62,
0x56, 0x62,
0x61, 0xAB, 0xAB, 0x14, 0x62,
0x7)))
frame_consumer = frame_receiver()
next(frame_consumer) # Get to the yield
unwrapper = unwrap_protocol(target=frame_consumer)
next(unwrapper) # Get to the yield
for byte in bytestream:
unwrapper.send(byte)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment