Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Dead simple asyncio pipeline with random waiting
"""
Example program for asyncio pipeline with varying stage times.
Program takes string as input and converts it to upper case.
"""
import asyncio
import random
from dataclasses import dataclass
@dataclass()
class DataAB:
letter: str
@dataclass()
class DataBC:
letter: str
upper: str
result = ""
async def sleep_about(t: float):
sleep_s = t + 0.5 * random.random()
await asyncio.sleep(sleep_s)
async def do_stepA(queue_out, input):
for letter in input:
await sleep_about(0.1)
print(f'A, sending {letter}')
await queue_out.put(DataAB(letter))
async def do_stepB(queue_in, queue_out):
while True:
data: DataAB = await queue_in.get()
# perform actual step
letter = data.letter
upper = letter.upper()
await sleep_about(1)
print(f'B, processed {upper}')
await queue_out.put(DataBC(letter, upper))
queue_in.task_done()
async def do_stepC(queue_in):
global result
while True:
data: DataBC = await queue_in.get()
# perform actual step
letter = data.letter
upper = data.upper
print(f'C, {letter} changed to {upper}')
result += upper
queue_in.task_done()
async def main():
pipeline_in = 'hello world'
print(f'converting to upper case: {pipeline_in}')
queue_AB = asyncio.Queue()
queue_BC = asyncio.Queue()
stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in))
stepB = asyncio.create_task(do_stepB(queue_AB, queue_BC))
stepC = asyncio.create_task(do_stepC(queue_BC))
await stepA
print('step A done')
await queue_AB.join()
print('queue A - B done')
stepB.cancel() # no more date is going to show up at B
await queue_BC.join()
print('queue B - C done')
stepC.cancel() # no more date is going to show up at C
print(f'main complete, result: {result}')
asyncio.run(main())
print('program complete')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.