Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Dead simple asyncio pipeline with second stage served by 3 tasks
"""
Example program for an asyncio pipeline with concurrent stages.
The program takes a string as input and converts it to upper case.
An overview of the stages:

            step B-1
step A  ->  step B-2  ->  step C
            step B-3
"""
import asyncio
import random
from dataclasses import dataclass
@dataclass
class DataAB:
    """Message put on the A -> B queue: one letter of the input string."""

    letter: str
@dataclass
class DataBC:
    """Message put on the B -> C queue: a letter and its upper-case form."""

    letter: str
    upper: str
# Upper-cased output accumulated by do_stepC; printed by main at the end.
result = ""
async def sleep_about(t: float):
    """Sleep for ``t`` seconds plus a random extra of less than 0.5 s."""
    jitter = 0.5 * random.random()
    await asyncio.sleep(t + jitter)
async def do_stepA(queue_out, input):
    """Step A: feed every letter of ``input`` into ``queue_out``.

    For each letter a message is printed, a short randomized pause is
    taken, and then the letter is put on the queue wrapped in a DataAB.
    """
    for letter in input:
        print(f'A, sending {letter}')
        await sleep_about(0.1)
        message = DataAB(letter)
        await queue_out.put(message)
async def do_stepB(queue_in, queue_out, id):
    """Step B worker: upper-case letters from ``queue_in`` into ``queue_out``.

    Loops forever; the caller is expected to cancel the task once
    ``queue_in`` has been fully processed.  ``id`` is only used to label
    this worker's log output.
    """
    while True:
        item: DataAB = await queue_in.get()

        # perform actual step
        upper = item.letter.upper()
        await sleep_about(1)
        print(f'B {id}, processed {upper}')

        # Hand the result downstream before marking the input item done.
        await queue_out.put(DataBC(item.letter, upper))
        queue_in.task_done()
async def do_stepC(queue_in):
    """Step C: collect upper-cased letters from ``queue_in``.

    Loops forever; the caller is expected to cancel the task once
    ``queue_in`` has been fully processed.  Each received upper-case
    letter is appended to the module-level ``result`` string.
    """
    global result
    while True:
        item: DataBC = await queue_in.get()

        # perform actual step
        letter, upper = item.letter, item.upper
        print(f'C, {letter} changed to {upper}')
        result = result + upper

        queue_in.task_done()
async def main():
    """Run the three-stage pipeline on a fixed input and print the result.

    Step A feeds letters into ``queue_AB``; three step-B workers consume
    them concurrently and forward their output to ``queue_BC``; a single
    step-C task consumes that queue.  The B and C tasks loop forever, so
    each is cancelled once the queue feeding it has been fully processed.
    """
    pipeline_in = 'hello world'
    print(f'converting to upper case: {pipeline_in}')

    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()

    stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in))
    stepsB = [
        asyncio.create_task(do_stepB(queue_AB, queue_BC, worker_id))
        for worker_id in (1, 2, 3)
    ]
    stepC = asyncio.create_task(do_stepC(queue_BC))

    await stepA
    print('step A done')

    await queue_AB.join()
    print('queue A - B done')
    for step in stepsB:
        step.cancel()  # no more data is going to show up at B
    # cancel() only requests cancellation; wait until the workers have
    # actually finished so none of them is left pending.
    await asyncio.gather(*stepsB, return_exceptions=True)

    await queue_BC.join()
    print('queue B - C done')
    stepC.cancel()  # no more data is going to show up at C
    await asyncio.gather(stepC, return_exceptions=True)

    print(f'main complete, result: {result}')
if __name__ == '__main__':
    # Run the pipeline only when executed as a script, so that importing
    # this module does not start the event loop as a side effect.
    asyncio.run(main())
    print('program complete')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment