Created
October 11, 2023 09:07
-
-
Save santibreo/84637248aeaf2133ef1f5ec9498ceda1 to your computer and use it in GitHub Desktop.
Python pipelines
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This pipeline waits for each step to finish completely (all of the step's
# arguments have to be processed) before running the next step.
from functools import reduce
import time


def compose(*funcs):
    """Chain generator functions so that each one consumes the previous one's output.

    Calling ``compose(a, b, c)(*args)`` is equivalent to ``c(*b(*a(*args)))``:
    the output of every step is fully unpacked into the positional arguments
    of the next one, so a step only starts after the previous step has
    produced *all* of its values.

    With a single function, that function itself is returned; with none,
    ``functools.reduce`` raises ``TypeError``.
    """
    def chain_two(first, second):
        def chained(*args):
            # Star-unpacking drains ``first`` completely before ``second`` is
            # even called -- this is what makes the pipeline batch-wise.
            return second(*first(*args))
        return chained

    return reduce(chain_two, funcs)
def f(*us):
    """Yield every argument unchanged, sleeping ``i`` seconds before the i-th one.

    Positions count from 1, so the delays are 1 s, 2 s, 3 s, ... and grow with
    the position of the element among the arguments.
    """
    delay = 0
    for u in us:
        delay += 1
        time.sleep(delay)
        yield u
def g(*us):
    """For every argument ``u`` yield ``2*u``, wait one second, then yield ``6*u``."""
    def _expand(u):
        yield 2 * u
        time.sleep(1)  # simulated work between the two results
        yield 6 * u

    for u in us:
        yield from _expand(u)
# Run the demo, printing every value with the seconds elapsed since the start.
start = time.time()
pipeline = compose(f, g)
for value in pipeline(1, 2, 3, 4):
    elapsed = time.time() - start
    print(f"Time: {elapsed:.6f}s, Value: {value}")
# Expected output (f sleeps 1 + 2 + 3 + 4 = 10 s before g receives anything;
# g then sleeps 1 s between its two results for each of its four inputs):
#
# Time: 10.006447s, Value: 2
# Time: 11.007720s, Value: 6
# Time: 11.007829s, Value: 4
# Time: 12.009061s, Value: 12
# Time: 12.009185s, Value: 6
# Time: 13.010283s, Value: 18
# Time: 13.010399s, Value: 8
# Time: 14.011632s, Value: 24
# -----------------------------------------------------------------------------
# This pipeline pushes each element through the whole pipeline before handling
# the next one: as soon as a step yields an element, the next step starts
# processing it, and the whole pipeline is run once per input element.
from functools import reduce, wraps
import time


def compose(*funcs):
    """Chain generator functions into an element-by-element pipeline.

    ``compose(a, b)(x1, x2, ...)`` yields everything ``b(y)`` yields for every
    ``y`` produced by ``a(x1)``, then for every ``y`` produced by ``a(x2)``,
    and so on. Every step -- the first one included -- is therefore called
    once *per element* with a single positional argument, never with all the
    elements at once, so a step that depends on how many arguments it gets or
    on their position (e.g. one using ``enumerate``) starts afresh for each
    element.

    With a single function, that function itself is returned; with none,
    ``functools.reduce`` raises ``TypeError``.
    """
    def link(upstream, downstream):
        def linked(*args):
            for single in args:
                upstream_items = upstream(single)
                for produced in upstream_items:
                    # Fully drain the downstream step for this item before
                    # asking the upstream step for its next item.
                    yield from downstream(produced)
        return linked

    return reduce(link, funcs)
def f(*us):
    """Yield the arguments in order, pausing ``i`` seconds before the i-th one.

    Positions start at 1, so when ``f`` is called with a single argument it
    always pauses exactly one second.
    """
    for offset, u in enumerate(us):
        time.sleep(offset + 1)
        yield u
def g(*us):
    """Yield ``2*u`` and, one second later, ``6*u`` for every argument ``u``."""
    for u in us:
        for factor in (2, 6):
            if factor == 6:
                # Simulate one second of work between the two results.
                time.sleep(1)
            yield factor * u
# Run the demo, printing every value with the seconds elapsed since the start.
start = time.time()
pipeline = compose(f, g)
for value in pipeline(1, 2, 3, 4):
    print(f"Time: {time.time() - start:.8f}s, Value: {value}")
# Expected output:
#
# Time: 1.00128961s, Value: 2
# Time: 2.00258803s, Value: 6
# Time: 3.00395775s, Value: 4
# Time: 4.00522828s, Value: 12
# Time: 5.00629807s, Value: 6
# Time: 6.00751209s, Value: 18
# Time: 7.00876856s, Value: 8
# Time: 8.00997591s, Value: 24
# Time improvement: -43% (about 8 s instead of 14 s)
#
# NOTE: compose calls f once per input element, so f's enumerate restarts at
# 1 every time and each f step sleeps just 1 s (4 s in total, instead of the
# 1 + 2 + 3 + 4 = 10 s f sleeps when it receives all four arguments at once);
# g sleeps 4 s either way. The generators are synchronous and time.sleep
# blocks, so the total runtime is the sum of all sleeps: the saving comes from
# f sleeping less, while the element-by-element order itself mainly lowers the
# time to the first value (about 1 s instead of 10 s).
# -----------------------------------------------------------------------------
# In this pipeline each step starts processing a new argument as soon as it
# has finished processing the previous one (and the new one is available).
from functools import reduce
import time
import asyncio


class AsyncIterator:
    """Asynchronously iterate over a regular Python sequence.

    Items are returned by index, from ``sequence[0]`` up to the last one.
    ``__anext__`` never suspends: it either returns the next item right away
    or raises ``StopAsyncIteration``. The iterator is single-pass.
    """

    def __init__(self, sequence):
        # Index of the item most recently returned; -1 means "not started".
        self._index = -1
        self.sequence = sequence

    def __aiter__(self):
        # The object is its own async iterator.
        return self

    async def __anext__(self):
        self._index += 1
        if self._index < len(self.sequence):
            return self.sequence[self._index]
        raise StopAsyncIteration
async def aenumerate(asequence, start=0):
    """Async counterpart of the built-in ``enumerate``.

    Iterates the async iterable ``asequence`` and yields ``(index, item)``
    tuples, with ``index`` starting at ``start`` and increasing by one per item.
    """
    index = start
    async for item in asequence:
        yield index, item
        index += 1
async def compose(*funcs):
    """Async pipeline factory: chain async-generator functions into one.

    Must be awaited; it returns an async-generator function. Calling that
    function with ``*args`` calls the first step once per positional argument
    and, for every item a step yields, immediately runs the next step on that
    single item, yielding whatever the last step produces, in order.

    Nothing asynchronous happens while building the pipeline; ``compose``
    stays a coroutine only so existing ``await compose(...)`` calls keep
    working. With a single function, that function itself is returned.

    Raises:
        TypeError: if no function is given (raised by ``functools.reduce``).
    """
    def reduce_function(func_ini, func_end):
        # Plain (synchronous) factory: chaining two steps needs no awaiting,
        # which lets ``functools.reduce`` fold the steps together.
        async def resulting_function(*args):
            for arg in args:
                async for item_ini in func_ini(arg):
                    async for item_end in func_end(item_ini):
                        yield item_end
        return resulting_function

    return reduce(reduce_function, funcs)
async def f(us):
    """Turn the async iterable ``us`` into a stream of delayed coroutines.

    For the i-th item ``u`` (counting from 1) it yields a coroutine that sleeps
    ``i`` seconds and then returns ``u``. The coroutines are only created here:
    nothing sleeps until they are awaited or scheduled as tasks.
    """
    async def sleep_then_return(delay, value):
        await asyncio.sleep(delay)
        return value

    async for position, item in aenumerate(us, start=1):
        delayed = sleep_then_return(position, item)
        yield delayed
async def g(*us):
    """Fan every incoming coroutine out into two coroutines.

    Each argument ``u`` is scheduled right away with ``asyncio.create_task``,
    so it runs concurrently as soon as the event loop gets control. For each
    one, two coroutines sharing that task are yielded: the first returns twice
    the task's result, the second sleeps one second and then returns six times
    the result. Iterate it only while an event loop is running
    (``create_task`` needs one).
    """
    async def twice(shared):
        return 2 * await shared

    async def six_times_after_pause(shared):
        await asyncio.sleep(1)
        return 6 * await shared

    async for u in AsyncIterator(us):
        shared_task = asyncio.create_task(u)
        yield twice(shared_task)
        yield six_times_after_pause(shared_task)
start = time.time() | |
tasks = [] | |
pipeline = await compose(f, g) | |
# First collect all coroutines (NO execution) | |
async for task in pipeline(AsyncIterator((1,2,3,4))): | |
tasks.append(task) | |
# Execute collected coroutines ordered | |
async for task in AsyncIterator(tasks): | |
value = await(task) | |
print(f"Time: {(time.time() - start):.8f}s, Value: {value}") | |
# Expected output: | |
# | |
# Time: 1.00156617s, Value: 2 | |
# Time: 2.00219393s, Value: 6 | |
# Time: 2.00231194s, Value: 4 | |
# Time: 3.00313711s, Value: 12 | |
# Time: 3.00326014s, Value: 6 | |
# Time: 4.00427079s, Value: 18 | |
# Time: 4.00439000s, Value: 8 | |
# Time: 5.00583577s, Value: 24 | |
# Time improvement: -64% |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment