Skip to content

Instantly share code, notes, and snippets.

@CodyKochmann
Created August 29, 2019 11:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CodyKochmann/91291d1864144bf17278607ac2209de1 to your computer and use it in GitHub Desktop.
Save CodyKochmann/91291d1864144bf17278607ac2209de1 to your computer and use it in GitHub Desktop.
Using python generator coroutines as actors
#!/usr/bin/env python3
# by: Cody Kochmann
from itertools import cycle
''' this demonstrates how a python generator can be
sent actor style messages via exceptions to delegate
what logic gets applied to each message by taking
advantage of the easy to use mapping try/except trees
already provide '''
class Message(BaseException):
''' base class for actor messages '''
class NewRRActor(Message):
''' Specifies actors that will be added to a round robin cycler '''
def __init__(self, *, name, pipe):
Message.__init__(self, name, pipe)
class ExpiredRRActor(Message):
''' Specifies actors that have expired and should be removed from a round robin schedule '''
class FinishedRRActor(Message):
''' raised when an actor is done iterating and needs to be dropped '''
def rr_actor(name, pipe):
for i in pipe:
yield name, i
def round_robin(*actors):
''' this acts as a round robin scheduler for multiple iterators '''
# verifies that all actors are iterable
actors = [a for a in actors if iter(a)]
while True:
try:
''' This "try" block is essentially this
actor's main processing "tick". If an
exception is thrown, the main tick pauses
for a moment and handles the exception.
Normal processing resumes on the next
"tick". '''
for next_actor in cycle(actors):
try:
for next_message in next_actor:
yield next_message
break
except StopIteration:
raise FinishedRRActor(next_actor)
except NewRRActor as message:
# logic to add new actors
new_actor = rr_actor(*message.args)
actors.append(new_actor)
yield new_actor
except ExpiredRRActor as message:
# logic to remove expired actors
for expired_actor in message.args:
while expired_actor in actors:
actors.remove(expired_actor)
yield # end of tick
except FinishedRRActor as message:
for finished_actor in message.args:
while finished_actor in actors:
actors.remove(finished_actor)
# no end of tick
def test_actor(*, name, counts_to):
''' this is just a test iterator that infinitely
yields the name and the next number in the
sequence to demonstrate individual processing and
private state within each individual actor '''
for i in cycle(range(counts_to)):
yield name, i
if __name__ == '__main__':
rr = round_robin(
rr_actor(
name='marcin',
pipe=cycle(range(10))
),
rr_actor(
name='daniel',
pipe=cycle(range(4))
)
)
''' the loop below iterates over the first 30
messages out of the round robin pipeline while
demonstrating how you can use typed message
passing to modify the pipeline without needing to
restart the iteration process '''
new_guy = None
for i,message in zip(range(30), rr):
print(message)
if i is 10:
print('adding cody')
new_guy = rr.throw(
NewRRActor(
name='cody', pipe=range(2)
)
)
elif i is 20:
print('expiring cody')
rr.throw(ExpiredRRActor(new_guy))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment