Execution of topologically-sorted functions and methods
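The driver script below declares each task with @task.requires(...), naming its prerequisites; the TaskGraph class at the bottom of the gist topologically sorts those declarations before running them. Working through the dependencies declared here, the parallel schedule comes out as three stages, (foo, bar) -> (baz, qux, xyzzy) -> (quz), with the order of tasks inside each stage left unspecified.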
#!/usr/bin/env python
import json
import time
from collections import ChainMap
from functools import partial

import click

from taskgraph import TaskGraph

# Shared registry of tasks and their dependencies.
task = TaskGraph()


class Job:
    def __init__(self):
        self.start_time = time.time()

    def do_work(self, delay):
        # Simulate work, then report seconds elapsed since the job started.
        time.sleep(delay)
        return round(time.time() - self.start_time, 1)

    @task.requires()
    def foo(self, delay):
        return { "foo": self.do_work(delay) }

    @task.requires()
    def bar(self, delay):
        return { "bar": self.do_work(delay) }

    @task.requires(bar)
    def baz(self, delay):
        return { "baz": self.do_work(delay) }

    @task.requires(foo, bar)
    def qux(self, delay):
        return { "qux": self.do_work(delay) }

    @task.requires(qux, baz)
    def quz(self, delay):
        return { "quz": self.do_work(delay) }

    @task.requires(foo, bar)
    def xyzzy(self, delay):
        return { "xyzzy": self.do_work(delay) }


@click.command()
@click.option('--parallel', '-p', is_flag=True, help="Run tasks in parallel.")
@click.option('--pool-size', '-z', type=click.IntRange(1, 8), default=4, help="Size of pool for parallel processing.")
@click.option('--delay', '-d', type=click.IntRange(0, 10), default=0, help="Seconds to sleep in functions.")
@click.option('--graph', '-g', is_flag=True, help="Dump graph to stdout and exit (affected by -p flag).")
def main(parallel, pool_size, delay, graph):
    if graph:
        print(task.graph(parallel))
        raise SystemExit

    job = Job()
    run = partial(task.run_parallel, pool_size=pool_size) if parallel else task.run
    # Each task returns a single-key dict; ChainMap merges them into one record.
    record = dict(ChainMap(*run(job, delay=delay)))
    print(json.dumps(record, indent=2))


if __name__ == '__main__':
    main()
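A quick sketch of how the script above might be invoked (the file name demo.py is an assumption, not part of the gist; task order inside each parenthesized group may vary from run to run):

    $ python demo.py --graph --parallel
    (foo, bar) -> (baz, qux, xyzzy) -> (quz)

    $ python demo.py --parallel --delay 1

The second command prints a JSON object mapping each task name to the seconds elapsed when it finished: roughly 1.0 for foo and bar, 2.0 for the middle stage, and 3.0 for quz, give or take pool start-up overhead. The two pinned packages below, click and toposort, are the only third-party dependencies.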
click==7.1.2
toposort==1.5
#!/usr/bin/env python
from multiprocessing import Pool

from toposort import toposort, toposort_flatten


class TaskGraph:
    '''Task dependency tree.

    Tasks may be run serially or in parallel.
    Declare tasks using @TaskGraph.requires().
    '''
    def __init__(self):
        self._graph = {}
        self._tasks = {}

    def requires(self, *deps):
        '''Decorator for declaring a function as a task as well as listing
        other tasks as dependencies.
        '''
        def wrapper(fn):
            self._graph[fn.__name__] = set(f.__name__ for f in deps or [])
            self._tasks[fn.__name__] = fn
            return fn
        return wrapper

    def graph(self, parallel=False):
        '''Return formatted graph representation as a string.
        '''
        if parallel:
            return "(" + ") -> (".join(', '.join(s) for s in toposort(self._graph)) + ")"
        return ' -> '.join(toposort_flatten(self._graph))

    def run(self, *args, **kwargs):
        '''Run tasks serially.

        Useful when:
          - tasks share mutable state
          - tasks are too small to be worth running in parallel
        '''
        for task in toposort_flatten(self._graph):
            yield self._tasks[task](*args, **kwargs)

    def run_parallel(self, *args, pool_size=4, **kwargs):
        '''Run independent tasks in parallel.

        For example, given dependencies a -> b and c -> d, (a, c) would be run
        in parallel, followed by (b, d) being run in parallel.
        '''
        for tasks in toposort(self._graph):
            with Pool(processes=pool_size) as pool:
                results = []
                for task in tasks:
                    results.append(pool.apply_async(self._tasks[task], args, kwargs))
                for result in results:
                    yield result.get()