Skip to content

Instantly share code, notes, and snippets.

@ccutch
Created March 22, 2021 10:16
Show Gist options
  • Save ccutch/7e69265a9a8f81908d37cda61b0536f6 to your computer and use it in GitHub Desktop.
Save ccutch/7e69265a9a8f81908d37cda61b0536f6 to your computer and use it in GitHub Desktop.
Just incase you forgot i was insane
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"id": "civil-hunger",
"metadata": {},
"source": [
"# Connor's Amazing actor system\n",
"\n",
"I wanted to write a simple actor runtime in python that just builds ontop of asyncio. To give myself a challange i used only data model methods"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "thousand-shepherd",
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"\n",
"from copy import deepcopy\n",
"from queue import Queue, Empty\n",
"from functools import partial\n",
"from threading import Lock"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "sustainable-bench",
"metadata": {},
"outputs": [],
"source": [
"class mail:\n",
" \n",
" def __init__(self, *args, **kwargs):\n",
" self.args = args\n",
" self.kwargs = kwargs\n",
" \n",
" def __getattr__(self, attr):\n",
" return self.kwargs[attr]\n",
" \n",
" def __getitem__(self, item):\n",
" return self.args[item]\n",
"\n",
" \n",
"class mailbox(asyncio.Queue):\n",
" \n",
" def __call__(self, *args, **kwargs):\n",
" return self.put(mail(*args, **kwargs))\n",
" \n",
" def __next__(self):\n",
" return self.get() \n",
" \n",
" async def __anext__(self):\n",
" return await next(self)\n",
"\n",
" def __iter__(self):\n",
" return self\n",
"\n",
" def __aiter__(self):\n",
" return self\n",
" \n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "crucial-commodity",
"metadata": {},
"outputs": [],
"source": [
"class runtime(dict):\n",
" \n",
" def __call__(self, name, func=None):\n",
" if name == Ellipsis: name = f'proc-{len(self)}'\n",
" if func is None: return partial(self, name)\n",
" \n",
" mail = mailbox()\n",
" \n",
" if type(func) != type:\n",
" self[name] = asyncio.create_task(func(mail))\n",
" return mail\n",
"\n",
" elif issubclass(func, actor):\n",
" instance = func(name, self, mail)\n",
" self[name] = asyncio.create_task(instance())\n",
" return instance\n",
" \n",
" raise RuntimeError(f'{func} is not supported by this runtime')\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "separated-emphasis",
"metadata": {},
"outputs": [],
"source": [
"class description:\n",
" def __init__(self, new, *args, **kwargs):\n",
" self.new = new\n",
" self.args = args\n",
" self.kwargs = kwargs\n",
" \n",
" def __call__(self, func):\n",
" if not callable(func):\n",
" raise RuntimeError(f\"{func} is not callable\")\n",
" \n",
" return new(func, *args, **kwargs)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "constant-pursuit",
"metadata": {},
"outputs": [],
"source": [
"class actor:\n",
" initial_state = {}\n",
"\n",
" def __init__(self, name, runtime, mailbox):\n",
" self.lock = asyncio.Lock()\n",
" self.state = self.__class__.initial_state.copy()\n",
" self.name = name\n",
" self.runtime = runtime\n",
" self.mailbox = mailbox\n",
" \n",
" async def __call__(self):\n",
" while mail := await next(self.mailbox):\n",
" await mail[0](*mail.args[1:], **mail.kwargs) \n",
"\n",
" def __getattr__(self, attr):\n",
" action = getattr(self, f'gen_{attr}')\n",
" return lambda *args, **kwargs: self.mailbox(action, *args, **kwargs)\n",
" \n",
" def __getitem__(self, item):\n",
" return self.state.copy()[item]\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "forty-expansion",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[ping] begin\n",
"[pong] ping\n",
"count @ 3\n",
"Counter passed\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n",
"[ping] pong\n",
"[pong] ping\n"
]
}
],
"source": [
"run = runtime()\n",
"num = 0\n",
"\n",
"@run(\"main\")\n",
"async def main(mail):\n",
" await asyncio.sleep(1)\n",
"\n",
" await ping(sender=pong, message='begin')\n",
" await counter.increment()\n",
" await counter.increment(4)\n",
" await counter.decrement(2)\n",
"\n",
" await asyncio.sleep(.1) # small sleep because we parallel\n",
" print(f'count @ {counter[\"num\"]}')\n",
" print('Counter passed')\n",
" \n",
"\n",
"@run(\"ping\")\n",
"async def ping(mailbox):\n",
" while True:\n",
" global num; num += 1\n",
" if num > 10: return\n",
"\n",
" await asyncio.sleep(.4)\n",
" mail = await next(mailbox)\n",
"\n",
" print(f'[ping] {mail.message}')\n",
" await mail.sender(sender=ping, message='ping')\n",
"\n",
"\n",
"@run('pong')\n",
"async def pong(mailbox):\n",
" while True:\n",
" mail = await next(mailbox)\n",
"\n",
" print(f'[pong] {mail.message}')\n",
" await mail.sender(sender=pong, message='pong')\n",
"\n",
"\n",
"\n",
"@run('counter')\n",
"class counter(actor):\n",
" initial_state = { 'num': 0 }\n",
"\n",
" async def gen_increment(self, amount=1):\n",
" self.state['num'] = self.state['num'] + amount\n",
"\n",
" async def gen_decrement(self, amount=1):\n",
" self.state['num'] = self.state['num'] - amount\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rough-wales",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
@navicore
Copy link

beautiful

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment