Skip to content

Instantly share code, notes, and snippets.

@mgxd
Last active May 21, 2019 19:38
Show Gist options
  • Save mgxd/9a00cba44ece7c92deccb8f88ae6cdea to your computer and use it in GitHub Desktop.
asyncio_pydra.py
import asyncio
import concurrent.futures as cf
# from multiprocessing.context import ForkServerContext
import time
import functools
# used to bypass Event Loop error in Jupyter Notebooks
import nest_asyncio
nest_asyncio.apply()
# Dependencies for :class:`Node`
################################
# stripped down example
# requires Node, Workflow classes
class Inputs:
    """Container for a node's single input value.

    Accepts ``val`` as an optional keyword so callers can construct it from a
    keyword dict (``Node.__init__`` does ``Inputs(**ikws)``, which crashed
    with the original zero-argument signature). The no-argument form still
    behaves exactly as before (``val`` defaults to None).
    """

    def __init__(self, val=None):
        self.val = val

    def __iter__(self):
        # Iterating an Inputs yields its single value.
        yield self.val
class Outputs(list):
    """List of output field names; always starts with the default 'out'."""

    def __init__(self, *args):
        super().__init__()
        # Seed with the mandatory 'out' field, then any extra names.
        self.append('out')
        self.extend(args)
class Interface:
    """Holds a node's compute: reads ``inputs.val``, produces an 'out' result."""

    def __init__(self, inputs, outputs):
        self.inputs = inputs
        # Map every declared output name to a not-yet-computed None.
        self.outputs = dict.fromkeys(outputs)
        self._results = {}

    @property
    def results(self):
        """Results dict, populated by :meth:`run`."""
        return self._results

    def run(self):
        """Simulate some computing time, then store ``inputs.val + 1`` as 'out'."""
        # NOTE(review): original indentation was lost in transit — assuming the
        # first sleep is conditional (extra delay when val == 3) and the second
        # always runs; confirm against the original gist.
        if self.inputs.val == 3:
            time.sleep(5)
        time.sleep(5)
        self._results['out'] = self.inputs.val + 1
        return self.results
class Node:
    """A unit of work wrapping an Interface, with upstream-dependency tracking."""

    def __init__(self, name, ikws=None, oargs=None):
        self.name = name
        # Build inputs/outputs, optionally from keyword / positional specs.
        self.inputs = Inputs(**ikws) if ikws else Inputs()
        self.outputs = Outputs(*oargs) if oargs else Outputs()
        self.interface = Interface(self.inputs, self.outputs)
        self._results = {}
        self._dependencies = []
        self.done = False

    def __repr__(self):
        return self.name

    def _is_runnable(self):
        """True when every upstream dependency has finished (or none exist)."""
        return not self._dependencies or all(
            dep.done for dep in self._dependencies
        )

    def run(self):
        """Execute the wrapped interface and record its results."""
        print("NODE %s: running" % self.name)
        self._results = self.interface.run()
        print("NODE %s: completed" % self.name)
        self.done = True
        print(self._results)
        return self._results

    @property
    def results(self):
        """Results dict from the last :meth:`run` (empty before any run)."""
        return self._results
class Workflow():
    """
    Basic DAG with asyncio/cf execution by default.
    Currently does not have a sense of topological sorting,
    so nodes should be added in order of execution.
    Example:
    ... wf = Workflow()
    ... a = Node('a')
    ... b = Node('b')
    ... wf.connect(a, b)
    ... wf.run() # or wf()
    """
    # Class-level default; every instance shadows it with an instance dict in
    # __init__ via clear(), so graph state is not shared between instances.
    graph = {}  # should be ordered (plain dicts preserve insertion order)

    @property
    def size(self):
        """Number of connected nodes in the graph."""
        return len(self.graph)

    def __init__(self):
        # Ensure an empty, instance-local graph on initialization.
        self.clear()
        # Kept for backward compatibility; coroutines now look up the running
        # loop at execution time instead of relying on this handle.
        try:
            self._loop = asyncio.get_event_loop()
        except RuntimeError:
            self._loop = asyncio.new_event_loop()

    def __call__(self):
        self.run()

    def add(self, node):
        """Add node to workflow with no outgoing edges."""
        self.graph[node] = set()

    def connect(self, n1, n2):
        """Add an edge to the graph between Nodes n1 to n2.

        Registers ``n1`` as a dependency of ``n2``.

        Raises:
            AttributeError: if the edge already exists. (Bug fix: the original
                only detected duplicates when ``n2`` already had outgoing
                edges, so repeated connects silently appended duplicate
                entries to ``n2._dependencies``.)
        """
        if n1 not in self.graph:
            self.add(n1)
        if n2 not in self.graph:
            self.add(n2)
        if n2 in self.graph[n1]:
            # Bug fix: format the message; the original passed n1/n2 as extra
            # exception args, leaving the '%s' placeholders unformatted.
            raise AttributeError('%s already connected to %s' % (n1, n2))
        self.graph[n1].add(n2)
        n2._dependencies.append(n1)

    def clear(self):
        """Reset graph"""
        self.graph = {}

    def topsort(self):
        # Placeholder: nodes must currently be connected in execution order.
        pass

    def _set_needed_outputs(self, node):
        """Propagate *node*'s 'out' result to each downstream node's input.

        TODO: replace val with node's results
        """
        for target in self.graph[node]:
            print("Assigning Node %s's input to %d" % (target, node._results['out']))
            # update downstream nodes
            target.inputs.val = node._results['out']

    # Wrapping coroutine which waits for return from process pool.
    async def get_results(self, executor, node):
        """Run *node* in *executor* and return ``(node, results)``."""
        # Bug fix: use the loop actually running this coroutine. asyncio.run()
        # in run() creates a fresh loop, so awaiting a future bound to the
        # loop captured at __init__ raised "attached to a different loop"
        # (previously masked by nest_asyncio).
        loop = asyncio.get_running_loop()
        res = await loop.run_in_executor(executor, node.run)
        return node, res

    # asyncio Task (coroutine in eventloop)
    async def _run_wf(self, executor):
        """
        Schedule run in event loop, queuing nodes as their dependencies
        complete, until no node remains.
        """
        print("Workflow contains %d nodes" % self.size, end="\n*******\n")
        remaining = set(self.graph)
        pending = set()
        tasks = set()
        with executor as wf_executor:
            while remaining or pending:
                # Queue every node whose dependencies are now satisfied.
                for node in [n for n in remaining if n._is_runnable()]:
                    tasks.add(asyncio.create_task(
                        self.get_results(wf_executor, node)
                    ))
                    # remove it once queued
                    remaining.discard(node)
                # Bug fix: the original called tasks.union(pending) and threw
                # the result away (set.union returns a new set); merge in place.
                tasks |= pending
                if not tasks:
                    # Nothing runnable and nothing in flight: an unsatisfiable
                    # dependency would otherwise make asyncio.wait raise a
                    # confusing ValueError on the empty set.
                    raise RuntimeError("workflow deadlocked: no runnable nodes")
                done, pending = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for fut in done:
                    tasks.discard(fut)
                    pending.discard(fut)
                    node, val = await fut
                    node.done = True
                    node._results = val
                    self._set_needed_outputs(node)

    def run(self, executor=None):
        """Run the future until completed"""
        # for now, just run concurrently on local CPUs
        if not executor:
            executor = cf.ProcessPoolExecutor()
        asyncio.run(self._run_wf(executor))
def test_linear():
    """Run a single linear chain: A -> B -> C."""
    node_a, node_b, node_c = Node('a'), Node('b'), Node('c')
    node_a.inputs.val = 1
    wf = Workflow()
    wf.connect(node_a, node_b)
    wf.connect(node_b, node_c)
    wf.run()
def test_concurrent():
    """
    Parallel workflow - [B, C] should execute concurrently
    A -> B --> D
    \\-> C
    """
    node_a, node_b, node_c, node_d = (Node(n) for n in 'abcd')
    node_a.inputs.val = 1
    wf = Workflow()
    wf.connect(node_a, node_b)
    wf.connect(node_a, node_c)
    wf.connect(node_b, node_d)
    # invoke via __call__ to exercise that entrypoint as well
    wf()
def test_combination():
    """
    Half concurrent workflow - [A/B] should execute concurrently,
    then join at C and linearly proceed.
    A
    \\
     C --> D
    /
    B
    """
    node_a, node_b, node_c, node_d = (Node(n) for n in 'abcd')
    node_a.inputs.val = 3
    node_b.inputs.val = 5
    wf = Workflow()
    wf.connect(node_a, node_c)
    wf.connect(node_b, node_c)
    wf.connect(node_c, node_d)
    wf.run()
import asyncio
import concurrent.futures as cf
import time
from functools import partial
class Submitter:
    """Runs Tasks in a process pool; Workflows recurse via nested Submitters."""

    def __init__(self):
        self.loop = None  # event loop, bound lazily in submit()
        self.pool = cf.ProcessPoolExecutor()
        self.remaining = []  # NOTE(review): never read; kept for compatibility
        self.futures = set()  # in-flight asyncio tasks

    async def submit_job(self, runnable):
        """Execute *runnable* and return ``(runnable, result)``."""
        if isinstance(runnable, Workflow):
            # Workflows are driven by a child Submitter sharing this loop.
            newsubmitter = Submitter()
            newsubmitter.loop = self.loop
            res = await newsubmitter.run(runnable)
        else:
            # Plain tasks run in the process pool.
            res = await self.loop.run_in_executor(
                self.pool, runnable.run
            )
        return runnable, res

    async def fetch_jobs(self, runnable=None):
        """Wait for at least one in-flight job and propagate finished results."""
        done = []
        try:
            done, pending = await asyncio.wait(
                self.futures, return_when=asyncio.FIRST_COMPLETED
            )
        except ValueError:
            # asyncio.wait raises ValueError on an empty set - nothing pending!
            pending = set()
        # preserve pending tasks
        for fut in done:
            task, res = await fut
            print(f"{task.name} completed at {time.time()}")
            self.futures.remove(fut)
            if not isinstance(task, Workflow):
                # A Workflow's outfield is computed from its last node, so
                # only leaf tasks record the result directly.
                task.outfield = res
            runnable.assign_next(task, res)
        return runnable

    async def run(self, runnable):
        """Drive *runnable* until done, guarding against stalled scheduling."""
        maxloops = 10
        count = 0
        while not runnable.done:
            jobs = [j for j in runnable.graph if j.ready]
            for job in jobs:
                print(f"submitting {job.name}")
                job_future = asyncio.create_task(self.submit_job(job))
                self.futures.add(job_future)
            await self.fetch_jobs(runnable)
            count += 1
            if count > maxloops:
                # Bug fix: was breakpoint(), a debugging leftover that would
                # drop a running program into pdb; fail loudly instead.
                raise RuntimeError(
                    "workflow made no progress after %d scheduling loops"
                    % maxloops
                )
        if runnable.outfield is None:
            print("is none!!")
            # NOTE(review): Workflow.outfield is a read-only property in this
            # script, so this assignment raises AttributeError if ever
            # reached - confirm whether this fallback is still wanted.
            runnable.outfield = 1
        return runnable.outfield

    ###############################################
    # Entrypoints
    ###############################################
    def submit(self, runnable):
        """
        Entrypoint for task/workflow submission
        Anything spawned after will be submitted as a task
        through ``submit_existing``
        """
        if not self.loop:
            # Robustness: newer Python raises RuntimeError when no current
            # event loop exists; create one in that case.
            try:
                self.loop = asyncio.get_event_loop()
            except RuntimeError:
                self.loop = asyncio.new_event_loop()
        print(f"Starting {runnable}")
        self.loop.run_until_complete(self.run(runnable))
        print(f"Completed {runnable} at {time.time()}")
class Task:
    """Leaf unit of work: run() produces ``infield + 1`` after a simulated delay."""

    def __init__(self, name, x=None):
        self.name = name
        self.infield = x
        # Plain attribute: the original wrapped this in a pass-through
        # property/setter pair that added no behavior (pure boilerplate);
        # external get/set access is unchanged.
        self.outfield = None

    def __repr__(self):
        return self.name

    @property
    def done(self):
        """True once an output has been produced."""
        return self.outfield is not None

    @property
    def ready(self):
        """Runnable: has an input value and has not yet produced output."""
        if self.done:
            return False
        return self.infield is not None

    def run(self, submitter=None):
        """Simulate one second of work, then return the incremented input."""
        time.sleep(1)
        self.outfield = self.infield + 1
        return self.outfield
class Workflow(Task):
    """A DAG of Tasks; ``graph`` maps each node to its downstream nodes.

    ``done``/``outfield``/``ready`` are all derived from the contained nodes,
    overriding the Task versions.
    """

    def __init__(self, graph, name, x=None):
        self.graph = graph
        self.name = name
        self.infield = x

    @property
    def done(self):
        """Complete only when every node in the graph has produced output."""
        return all(task.done for task in self.graph)

    @property
    def outfield(self):
        # The workflow's output is its final node's output (dict insertion
        # order makes the last key the terminal node).
        return [*self.graph][-1].outfield

    @property
    def ready(self):
        if self.done:
            return False
        # Only the first node in insertion order is inspected - presumably
        # the entry node of the graph; NOTE(review) confirm this is intended.
        for task in self.graph:
            return True if task.ready else False
        return False

    def assign_next(self, task, res):
        """Propagate *task*'s result to each downstream node's infield."""
        for targ in self.graph[task]:
            if isinstance(targ, Workflow):
                # assign first node of the sub-workflow
                targ = [*targ.graph][0]
            print(f"Assigning {res} to {targ.name} infield")
            targ.infield = res

    def run(self, submitter=None):
        """Execute the workflow, creating a Submitter on demand.

        Bug fix: the original assigned ``self.outfield = 'done'``, but
        ``outfield`` is a read-only property on this class, so the assignment
        raised AttributeError whenever run() was reached. The computed
        outfield (last node's result) is returned instead.
        """
        if not submitter:
            submitter = Submitter()
        submitter.submit(self)
        return self.outfield
def wf1():
    """Linear workflow with a set of concurrent nodes and a subworkflow."""
    head = Task('first', 1)
    second = Task('second', 2)
    third = Task('third')
    tail = Task('last')

    # sub-workflow embedded as a single node of the outer graph
    sub_a = Task('sub-fourth')
    sub_b = Task('sub-fifth')
    swf = Workflow({sub_a: [sub_b]}, 'subwf')

    outer = {
        head: [second],
        second: [third],
        third: [swf],
        swf: [tail],
        tail: [],
    }
    return Workflow(outer, 'workflow')
def wf2():
    """Concurrent workflows: a two-task chain beside the wf1() workflow."""
    a1 = Task("wf_a1", 1)
    a2 = Task("wf_a2")
    wf_a = Workflow({a1: [a2], a2: []}, 'wfa')
    wf_b = wf1()
    # Both sub-workflows are roots with no edges between them.
    return Workflow({wf_a: [], wf_b: []}, 'wf')
if __name__ == "__main__":
    # Build the demo workflow-of-workflows and execute it.
    submitter = Submitter()
    submitter.submit(wf2())
@satra
Copy link

satra commented May 21, 2019

import asyncio
import concurrent.futures as cf
import time
from functools import partial


class Submitter:
    """Drives Task/Workflow execution on an asyncio loop plus a process pool."""

    def __init__(self):
        self.loop = None  # event loop, bound lazily in submit()
        self.pool = cf.ProcessPoolExecutor()
        self.remaining = []  # NOTE(review): never read in this class
        self.futures = set()  # in-flight asyncio tasks

    async def submit_job(self, runnable):
        """Execute *runnable* and return ``(runnable, result)``.

        Workflows are driven recursively by a child Submitter sharing this
        loop; plain tasks run in the process pool.
        """
        if isinstance(runnable, Workflow):
            newsubmitter = Submitter()
            newsubmitter.loop = self.loop
            res = await newsubmitter.run(runnable)
        else:
            res = await self.loop.run_in_executor(
                self.pool, runnable.run
                )
        return runnable, res

    async def fetch_jobs(self, runnable=None):
        """Wait for at least one in-flight job; propagate finished results."""
        done = []
        try:
            done, pending = await asyncio.wait(
                self.futures, return_when=asyncio.FIRST_COMPLETED
            )
        except ValueError:
            # nothing pending! (asyncio.wait raises on an empty set)
            pending = set()
        # preserve pending tasks
        for fut in done:
            task, res = await fut
            print(f"{task.name} completed")
            self.futures.remove(fut)
            task.outfield = res
            # hand the result to the downstream node(s) in the graph
            runnable.assign_next(task, res)
        return runnable

    async def run(self, runnable):
        """Drive *runnable* until its graph reports done.

        NOTE(review): after ``maxloops`` scheduling passes this drops into
        pdb via breakpoint() - a debugging aid, not production behavior.
        """
        maxloops = 10
        count = 0
        while not runnable.done:
            jobs = [j for j in runnable.graph if j.ready]
            for job in jobs:
                print(f"submitting {job.name}")
                job_future = asyncio.create_task(self.submit_job(job))
                self.futures.add(job_future)
            await self.fetch_jobs(runnable)
            count += 1
            if count > maxloops:
                breakpoint()
        if runnable.outfield is None:
            # fallback sentinel so the awaiting parent receives a value
            runnable.outfield = 1
        return runnable.outfield

    ###############################################
    # Entrypoints
    ###############################################
    def submit(self, runnable):
        """
        Entrypoint for task/workflow submission
        Anything spawned after will be submitted as a task 
        through ``submit_existing``
        """
        if not self.loop:
            self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self.run(runnable))
        

class Task:
    """A unit of work whose run() turns ``infield`` into ``infield + 1``."""

    def __init__(self, name, x=None):
        self.name = name
        self.infield = x
        self.outfield = None

    @property
    def done(self):
        """A task is finished once it has produced an output."""
        return self.outfield is not None

    @property
    def ready(self):
        """Runnable when an input exists and no output has been produced."""
        return self.infield is not None and not self.done

    def run(self, submitter=None):
        """Simulate one second of work and return the incremented input."""
        time.sleep(1)
        self.outfield = self.infield + 1
        return self.outfield


class Workflow(Task):
    """A DAG of Tasks; ``graph`` maps each node to its downstream nodes."""

    def __init__(self, graph, name, x=None):
        super(Workflow, self).__init__(name, x)
        self.graph = graph

    @property
    def done(self):
        # complete only when every node has produced output
        for task in self.graph:
            if not task.done:
                return False
        return True

    @property
    def ready(self):
        if self.done:
            return False
        # NOTE(review): only the first node in insertion order is inspected -
        # presumably the entry node of the graph; confirm this is intended
        for task in self.graph:
            return True if task.ready else False
        return False

    def assign_next(self, task, res):
        # propagate *task*'s result to each downstream node's infield
        for targ in self.graph[task]:
            if isinstance(targ, Workflow):
                # assign first node
                targ = [*targ.graph][0]
            print(f"Assigning {res} to {targ.name} infield")
            targ.infield = res

    def run(self, submitter=None):
        """Execute the workflow, creating a Submitter on demand."""
        if not submitter:
            submitter = Submitter()

        submitter.submit(self)
        self.outfield = 'done'
        return self.outfield

if __name__ == "__main__":
    # outer linear chain: first -> second -> third -> subwf -> last
    t1 = Task('first', 1)
    t2 = Task('second')
    t3 = Task('third')
    t6 = Task('last')

    # sub-workflow nodes, embedded as a single node of the outer graph
    t4 = Task('sub-fourth')
    t5 = Task('sub-fifth')
    graph1 = {
        t4: [t5],
    }
    swf = Workflow(graph1, 'subwf')

    graph2 = {
        t1: [t2],
        t2: [t3],
        t3: [swf],
        swf: [t6],
        t6: [],
    }

    wf = Workflow(graph2, 'workflow')
    sub = Submitter()

    sub.submit(wf)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment