Skip to content

Instantly share code, notes, and snippets.

@nwillems
Created November 24, 2020 13:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nwillems/bb94355d8e0070e1594c4a90a5272e6e to your computer and use it in GitHub Desktop.
Save nwillems/bb94355d8e0070e1594c4a90a5272e6e to your computer and use it in GitHub Desktop.
Python Async Pipelined functions
import functools
import inspect
def asynchronize(fn):
if inspect.iscoroutinefunction(fn):
return fn
@functools.wraps(fn)
async def _wrapper(*args, **kwargs):
return fn(*args, **kwargs)
return _wrapper
async def async_reduce(accumulator, xs, init):
acc = init
for x in xs:
acc = await accumulator(acc, x)
return acc
async def pipeline(fns: List[Callable], start: Any):
async def exec(acc, fn):
w_fn = asynchronize(fn)
r = await w_fn(acc)
return r
r = await async_reduce(exec, fns, start)
return r
import asyncio
import pipeline
def foo(x: int) -> int:
return x*2
async def bar(x2: int) -> int:
await asyncio.sleep(x2)
return x2
async def main():
result = pipeline.pipeline([foo, bar], 10)
println(result)
# expected results:
# After a wait of 20secs
# console: 20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment