@naufalafif
Last active September 12, 2021 04:13
Asynchronous Pipe - Python
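import asyncio


async def async_pipe(init_value, *callables):
    """
    Pipe a value through a sequence of sync or async callables.
    Works similarly to toolz.pipe.

    :parameter init_value: a plain value, or a coroutine object
        (an already-called async function) that resolves to one
    :parameter callables: functions (sync or async), applied left to right
    :raises ValueError: if any item in callables is not callable
    """
    # Validate every step up front, before any of them runs.
    # With no callables this check passes and the loop below is skipped,
    # so the (resolved) initial value is returned unchanged.
    if not all(callable(cal) for cal in callables):
        raise ValueError("invalid value, must be callable")

    # Resolve the initial value if it was passed as a coroutine object.
    current_value = await init_value if asyncio.iscoroutine(init_value) else init_value

    for cal in callables:
        raw_value = cal(current_value)
        # Await the result only when the step returned a coroutine.
        current_value = await raw_value if asyncio.iscoroutine(raw_value) else raw_value
    return current_value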
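
Usage sketch, not part of the original gist. It mixes a sync step, an async step, and a coroutine as the initial value. The helper names `add_one`, `double`, and `fetch_seed` are hypothetical, and `async_pipe` is repeated in condensed form only so the example runs on its own.

```python
import asyncio


async def async_pipe(init_value, *callables):
    # Condensed copy of the function above so this example is self-contained.
    if not all(callable(cal) for cal in callables):
        raise ValueError("invalid value, must be callable")
    value = await init_value if asyncio.iscoroutine(init_value) else init_value
    for cal in callables:
        result = cal(value)
        value = await result if asyncio.iscoroutine(result) else result
    return value


def add_one(x):
    # Hypothetical synchronous step.
    return x + 1


async def double(x):
    # Hypothetical asynchronous step; sleep(0) stands in for real I/O.
    await asyncio.sleep(0)
    return x * 2


async def fetch_seed():
    # Hypothetical coroutine that produces the initial value.
    return 3


async def main():
    # fetch_seed() is awaited first, then (3 + 1) * 2 = 8, then str(8).
    return await async_pipe(fetch_seed(), add_one, double, str)


result = asyncio.run(main())
print(result)  # prints 8
```

Steps run strictly in order, one after another. Nothing here runs concurrently, so for independent coroutines something like asyncio.gather would be the tool to reach for instead.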