Skip to content

Instantly share code, notes, and snippets.

@Contextualist
Created June 17, 2020 15:46
Show Gist options
  • Save Contextualist/b12d696fec857d7b82429b7bf55963cd to your computer and use it in GitHub Desktop.
Save Contextualist/b12d696fec857d7b82429b7bf55963cd to your computer and use it in GitHub Desktop.
Grain head alpha (trio, nng)
import trio
from pynng import Pair0
import dill as pickle
from copy import deepcopy
from .util import timeblock
class GrainRemote(object):
    """Handle on a remote worker reached through a pynng ``Pair0`` socket.

    Attributes:
        res: Resource object passed in by the caller; this class only stores it.
        name: The worker address, used in failure messages.
    """

    def __init__(self, addr, res):
        """Dial the worker at ``tcp://<addr>:4242`` and remember its resources.

        Args:
            addr: Host name or IP of the worker (the port is fixed to 4242).
            res: Resource object associated with this worker.
        """
        self._c = Pair0(dial=f"tcp://{addr}:4242")
        self.res = res
        self.name = addr

    async def execf(self, tid, fn, *args, **kwargs):
        """Send one task to the worker and return the next response received.

        The request is the pickled tuple ``(tid, fn, args, kwargs)``. The
        response read back is *not necessarily* the one belonging to ``tid``:
        callers must use the returned task id, not the one they passed in.

        Args:
            tid: Task id to attach to the request.
            fn: Callable to run remotely.
            *args: Positional arguments for ``fn``.
            **kwargs: Keyword arguments for ``fn``.

        Returns:
            ``(tid2, r2)``: the task id carried by the response and its result.

        Raises:
            RuntimeError: If the response carries the task id ``-1``, which
                marks a remote failure; details are printed beforehand.
        """
        await self._c.asend(pickle.dumps((tid, fn, args, kwargs)))
        # NOTE(security): unpickling can execute arbitrary code, so this is
        # only safe when the workers (and the network path) are trusted.
        tid2, r2 = pickle.loads(await self._c.arecv())  # Not necessarily the matching response
        if tid2 == -1:
            # On failure r2 is indexed as (failed task, exception text).
            print(f"Remote {self.name}'s task {r2[0]!r} failed with exception:\n{r2[1]}")
            raise RuntimeError("Remote exception")
        return tid2, r2

    async def done(self):
        """Send the ``FIN`` message to the worker and close the socket.

        The socket is closed even when sending ``FIN`` fails or is cancelled,
        so the connection is never leaked; the original error still propagates.
        """
        try:
            await self._c.asend(b"FIN")
        finally:
            self._c.close()
class GrainPseudoRemote:
    """In-process stand-in for a remote worker.

    It exposes the same ``res`` / ``name`` attributes and the same
    ``execf`` / ``done`` coroutines as a remote worker, but awaits the task
    function directly instead of shipping it anywhere.
    """

    def __init__(self, res):
        """Store the resource object and label this worker ``"local"``."""
        self.name = "local"
        self.res = res

    async def execf(self, tid, fn, *args, **kwargs):
        """Await ``fn(*args, **kwargs)`` and return ``(tid, result)``."""
        outcome = await fn(*args, **kwargs)
        return tid, outcome

    async def done(self):
        """Nothing to shut down for the local worker."""
        return None
class GrainExecutor(object):
    """Queue jobs inside a ``with`` block and run them on a worker pool at exit.

    Jobs are recorded with :meth:`submit`. When the ``with`` block ends
    without an exception, ``__exit__`` runs every job under trio and stores
    the per-job results, in submission order, in ``results``.

    Resource objects (``rpw`` and the ``res`` passed to :meth:`submit`) are
    assumed to support ``>=``, ``alloc`` and ``dealloc`` as used below; their
    type is defined outside this file section.
    """

    def __init__(self, waddrs, rpw):
        """Create one remote worker per address plus one local pseudo-worker.

        Args:
            waddrs: Iterable of worker addresses.
            rpw: Resource object describing what each worker owns; every
                worker gets its own deep copy.
        """
        self.jobq = []
        self.retq = []
        self.results = None
        self.pool = [GrainRemote(a, deepcopy(rpw)) for a in waddrs]
        self.pool.append(GrainPseudoRemote(deepcopy(rpw)))
        # Task id -> resources currently allocated for that task.
        self.hold = {}
        # Wake-up channel: a finished task pushes, the scheduler pulls.
        self.push_res, self.pull_res = trio.open_memory_channel(128)

    def submit(self, res, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` to run with resource demand ``res``."""
        self.jobq.append((res, fn, args, kwargs))

    async def __schedule(self, res):  # naive scheduling alg: greedy
        """Wait until some worker can satisfy ``res``, then allocate on it.

        Returns:
            ``(allocated, worker)``: the value returned by the worker's
            ``res.alloc(res)`` and the worker chosen (first fit in pool order).
        """
        while True:
            for w in self.pool:
                if w.res >= res:
                    return w.res.alloc(res), w
            # Nothing fits right now: sleep until a task reports completion.
            await self.pull_res.receive()

    async def __task_with_res(self, tid, res, w, fn, args, kwargs):
        """Run one job on worker ``w`` and record whichever response comes back.

        The allocated resources are passed to the job as the ``grain_res``
        keyword argument. The response may belong to a different task that
        was sent to the same worker, so all bookkeeping uses the returned id.
        """
        self.hold[tid] = res
        tid2, r2 = await w.execf(tid, fn, *args, grain_res=res, **kwargs)  # Not necessarily the matching response
        # Drop the bookkeeping entry while releasing, so `hold` does not grow
        # by one entry per finished job.
        w.res.dealloc(self.hold.pop(tid2))
        try:
            self.push_res.send_nowait(None)
        except trio.WouldBlock:
            # The buffer is already full of pending wake-ups, so the scheduler
            # (if still running) will rescan the pool anyway. Without this,
            # completions beyond the buffer size crashed the whole run once
            # nobody was draining the channel any more.
            pass
        self.retq[tid2] = r2

    async def run(self):
        """Dispatch every queued job and wait for all of them to finish.

        Results are written into ``retq`` at the index of the job in
        ``jobq``. Every worker's ``done()`` is awaited afterwards, whether
        or not the jobs succeeded.
        """
        await trio.sleep(5)  # wait for workers TODO
        try:
            with timeblock("all jobs"):
                self.retq = [None] * len(self.jobq)
                async with trio.open_nursery() as _n:
                    for i, (res, *fa) in enumerate(self.jobq):
                        # Blocks until resources for this job are available,
                        # so jobs are started strictly in submission order.
                        res, w = await self.__schedule(res)
                        _n.start_soon(self.__task_with_res, i, res, w, *fa)
        finally:
            for w in self.pool:
                await w.done()

    def __enter__(self):
        """Return the executor itself for use as ``with ... as exer``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Run the queued jobs unless the ``with`` body raised.

        If the body raised, nothing is run and the exception propagates
        (``False`` is returned). Otherwise the jobs are executed with
        ``trio.run`` and ``results`` is set to the collected results.
        """
        if exc_type:
            return False
        trio.run(self.run)
        self.results = self.retq
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment