Skip to content

Instantly share code, notes, and snippets.

@jvkersch
Created March 21, 2021 14:05
Show Gist options
  • Save jvkersch/5265b15a8d9cc85afe16b0e7c51d76af to your computer and use it in GitHub Desktop.
Save jvkersch/5265b15a8d9cc85afe16b0e7c51d76af to your computer and use it in GitHub Desktop.
Implementation of the actor model in Python
import abc
import asyncio
class MessageBox:
def __init__(self):
self._queue = asyncio.Queue()
async def send(self, message_type, *content):
await self._queue.put((message_type, content))
async def receive(self):
return await self._queue.get()
class Actor(abc.ABC):
""" Abstract base class for actors with custom logic.
"""
# Messages
ERROR = "error"
@abc.abstractmethod
async def run(self):
pass
def __init__(self, message_box=None, parent_message_box=None):
self.message_box = message_box or MessageBox()
self._parent_message_box = parent_message_box
self._actor_tasks = []
async def _run(self):
result = None
try:
result = await self.run()
except asyncio.CancelledError:
pass
except Exception as e:
await self._parent_message_box.send(Actor.ERROR, e)
await self._cleanup_children()
return result
def spawn(self, actor_class, *args, **kwds):
actor = self._actor_factory(actor_class)
task = asyncio.create_task(actor._run(*args, **kwds))
self._actor_tasks.append(task)
return actor.message_box
def _actor_factory(self, actor_class):
message_box = MessageBox()
actor = actor_class(
message_box=message_box, parent_message_box=self.message_box
)
return actor
async def _cleanup_children(self):
for task in self._actor_tasks:
task.cancel()
try:
await asyncio.gather(*self._actor_tasks)
except asyncio.CancelledError:
pass
class RootActor(Actor):
""" An actor that waits for its children and results the result.
"""
async def run(self):
return await asyncio.gather(*self._actor_tasks)
import asyncio
from actors import Actor, RootActor
class Counter(Actor):
# Messages
INCREMENT = "increment"
DISPLAY = "display"
SHUTDOWN = "shutdown"
async def run(self):
print("starting")
count = 0
while True:
print(f"{id(self)} waiting for message")
message_type, *content = await self.message_box.receive()
print(f"{id(self)} got message", message_type)
await asyncio.sleep(0.5)
if message_type == Counter.INCREMENT:
count += 1
elif message_type == Counter.DISPLAY:
print(f"Counter {id(self)}: {count}")
elif message_type == Counter.SHUTDOWN:
break
async def main():
root = RootActor()
counter_pid1 = root.spawn(Counter)
counter_pid2 = root.spawn(Counter)
counter_pid3 = root.spawn(Counter)
print("spawned")
for _ in range(3):
await counter_pid1.send(Counter.INCREMENT)
await counter_pid2.send(Counter.INCREMENT)
await counter_pid3.send(Counter.INCREMENT)
print("done sending")
await counter_pid1.send(Counter.DISPLAY)
await counter_pid2.send(Counter.DISPLAY)
await counter_pid3.send(Counter.DISPLAY)
await counter_pid1.send(Counter.SHUTDOWN)
await counter_pid2.send(Counter.SHUTDOWN)
await counter_pid3.send(Counter.SHUTDOWN)
await root.run()
print("done running")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from actors import Actor, RootActor
class Processor(Actor):
# Messages
PROCESS = "process"
async def run(self):
message_type, content = await self.message_box.receive()
if message_type == Processor.PROCESS:
fun, elem, mapper = content
if elem % 3 == 0:
raise ValueError()
result = await fun(elem)
await mapper.send(Mapper.RECEIVE, result)
class Mapper(Actor):
# Messages
MAP = "map"
RECEIVE = "receive"
async def run(self):
results = []
n_to_receive = -1
while True:
message_type, content = await self.message_box.receive()
if message_type == Mapper.MAP:
# Spawn a new actor for each element in the iterable
fun, elems = content
n_to_receive = len(elems)
for elem in elems:
proc = self.spawn(Processor)
await proc.send(Processor.PROCESS, fun, elem, self.message_box)
elif message_type == Mapper.RECEIVE:
results.append(content[0])
print("received data", content[0])
if n_to_receive == len(results):
break
elif message_type == Mapper.ERROR:
n_to_receive -= 1
print("got error", content)
print("done")
return results
async def main():
root = RootActor()
async def f(x):
await asyncio.sleep(0.5)
return x + 1
mapper_pid = root.spawn(Mapper)
await mapper_pid.send(Mapper.MAP, f, range(10))
retval = await root.run()
print(retval)
if __name__ == "__main__":
asyncio.run(main())
@Corwinpro
Copy link

Many thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment