@santibreo
Created October 11, 2023 09:07
Python pipelines
# This pipeline waits for each step to finish completely (all
# the step arguments have to be processed) before running the next
# step
from functools import reduce
import time
# Compose left-to-right: compose(f, g)(*x) evaluates g(*f(*x)), and the
# star-unpacking exhausts each generator completely before the next step runs
def compose(*funcs):
    return reduce(lambda f, g: lambda *x: g(*f(*x)), funcs)

def f(*us):
    for i, u in enumerate(us, start=1):
        time.sleep(i)
        yield u

def g(*us):
    for u in us:
        yield 2*u
        time.sleep(1)
        yield 6*u

start = time.time()
for value in compose(f, g)(1, 2, 3, 4):
    print(f"Time: {(time.time() - start):.6f}s, Value: {value}")
# Expected output:
#
# Time: 10.006447s, Value: 2
# Time: 11.007720s, Value: 6
# Time: 11.007829s, Value: 4
# Time: 12.009061s, Value: 12
# Time: 12.009185s, Value: 6
# Time: 13.010283s, Value: 18
# Time: 13.010399s, Value: 8
# Time: 14.011632s, Value: 24
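# Aside (illustrative sketch, not part of the original gist): the ~10 s wait before the
# first value above comes from the star-unpacking in g(*f(*x)). Unpacking a generator
# consumes it completely before g is even called, as this minimal check shows
# (slow_source is just a made-up helper for the demonstration)
def slow_source():
    for n in (1, 2, 3):
        time.sleep(1)
        yield n

_t0 = time.time()
_collected = [*slow_source()]  # unpacking runs the whole generator first (~3 s)
print(f"Unpacking took {time.time() - _t0:.1f}s and produced {_collected}")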
# -----------------------------------------------------------------------------
# This pipeline sends each element through the whole pipeline before pulling
# the next one: as soon as an element is available, the next step starts
# processing it
from functools import reduce
import time

# Each step is a generator and `yield from` streams its results downstream,
# so elements flow one at a time instead of in complete batches
def compose(*funcs):
    def reduce_function(func_ini, func_end):
        def resulting_function(*args):
            for arg in args:
                for func_ini_result in func_ini(arg):
                    yield from func_end(func_ini_result)
        return resulting_function
    return reduce(reduce_function, funcs)

def f(*us):
    for i, u in enumerate(us, start=1):
        time.sleep(i)
        yield u

def g(*us):
    for u in us:
        yield 2*u
        time.sleep(1)
        yield 6*u

start = time.time()
for value in compose(f, g)(1, 2, 3, 4):
    print(f"Time: {(time.time() - start):.8f}s, Value: {value}")
# Expected output:
#
# Time: 1.00128961s, Value: 2
# Time: 2.00258803s, Value: 6
# Time: 3.00395775s, Value: 4
# Time: 4.00522828s, Value: 12
# Time: 5.00629807s, Value: 6
# Time: 6.00751209s, Value: 18
# Time: 7.00876856s, Value: 8
# Time: 8.00997591s, Value: 24
# Time improvement: -43%
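# Aside (illustrative sketch, not part of the original gist): with the generator-based
# compose above, every element flows through all the steps before the next element is
# pulled in. The interleaved prints of these two made-up steps make that order visible
def step_a(x):
    print(f"  step_a({x})")
    yield x + 1

def step_b(x):
    print(f"  step_b({x})")
    yield x * 10

for out in compose(step_a, step_b)(1, 2):
    print(f"  out={out}")
# Prints: step_a(1), step_b(2), out=20, step_a(2), step_b(3), out=30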
# -----------------------------------------------------------------------------
# In this pipeline each step starts processing a new argument as soon as it
# has finished processing the previous one (and a new one is available)
import time
import asyncio

class AsyncIterator:
    """Async iteration through regular Python sequences"""

    # constructor, define some state
    def __init__(self, sequence):
        self._index = -1
        self.sequence = sequence

    # an async iterator returns itself from __aiter__
    def __aiter__(self):
        return self

    # return the next awaitable
    async def __anext__(self):
        self._index += 1
        # check for no further items
        if self._index >= len(self.sequence):
            raise StopAsyncIteration
        # return the current item
        return self.sequence[self._index]

async def aenumerate(asequence, start=0):
    """Asynchronously enumerate an async iterator from a given start value"""
    n = start
    async for elem in asequence:
        yield n, elem
        n += 1

async def compose(*funcs):
    """Async pipeline factory version, chains asynchronous functions"""
    async def reduce_function(func_ini, func_end):
        async def resulting_function(*args):
            for arg in args:
                async for coro_ini in func_ini(arg):
                    async for coro_end in func_end(coro_ini):
                        yield coro_end
        return resulting_function
    func_ini = funcs[0]
    for func_end in funcs[1:]:
        func_ini = await reduce_function(func_ini, func_end)
    return func_ini

async def f(us):
    """Gets an async iterable and yields coroutines"""
    async def process_element(i, u):
        await asyncio.sleep(i)
        return u
    async for i, u in aenumerate(us, start=1):
        yield process_element(i, u)

async def g(*us):
    """Gets coroutines and yields coroutines"""
    async def process_element_a(u):
        return 2 * await u
    async def process_element_b(u):
        await asyncio.sleep(1)
        return 6 * await u
    async for u in AsyncIterator(us):
        # wrap the upstream coroutine in a Task so both downstream
        # coroutines can await the same (single) execution
        task = asyncio.create_task(u)
        yield process_element_a(task)
        yield process_element_b(task)
# Drive the pipeline inside an event loop
async def main():
    start = time.time()
    coros = []
    pipeline = await compose(f, g)
    # First collect all the coroutines (no execution yet)
    async for coro in pipeline(AsyncIterator((1, 2, 3, 4))):
        coros.append(coro)
    # Then await the collected coroutines in order
    async for coro in AsyncIterator(coros):
        value = await coro
        print(f"Time: {(time.time() - start):.8f}s, Value: {value}")

asyncio.run(main())
# Expected output:
#
# Time: 1.00156617s, Value: 2
# Time: 2.00219393s, Value: 6
# Time: 2.00231194s, Value: 4
# Time: 3.00313711s, Value: 12
# Time: 3.00326014s, Value: 6
# Time: 4.00427079s, Value: 18
# Time: 4.00439000s, Value: 8
# Time: 5.00583577s, Value: 24
# Time improvement: -64%
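# Aside (illustrative sketch, not part of the original gist): the pipeline above relies
# on the fact that an asyncio.Task can be awaited more than once; once it has completed,
# every extra await just returns the stored result, which is why process_element_a and
# process_element_b can share a single task without re-running it
async def _demo_shared_task():
    shared = asyncio.create_task(asyncio.sleep(0.1, result=5))
    first = await shared
    second = await shared  # no second sleep, the stored result is returned
    print(first, second)   # -> 5 5

asyncio.run(_demo_shared_task())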