Dead simple asyncio pipeline with second stage served by 3 tasks
""" | |
Example program for asyncio pipeline with concurrent stages. | |
Program takes string as input and converts it to upper case. | |
An overview of the stages: | |
step B-1 | |
step A -> step B-2 -> step C | |
step B-3 | |
""" | |
import asyncio | |
import random | |
from dataclasses import dataclass | |
@dataclass() | |
class DataAB: | |
letter: str | |
@dataclass() | |
class DataBC: | |
letter: str | |
upper: str | |
result = "" | |
async def sleep_about(t: float): | |
sleep_s = t + 0.5 * random.random() | |
await asyncio.sleep(sleep_s) | |
async def do_stepA(queue_out, input): | |
for letter in input: | |
print(f'A, sending {letter}') | |
await sleep_about(0.1) | |
await queue_out.put(DataAB(letter)) | |
async def do_stepB(queue_in, queue_out, id): | |
while True: | |
data: DataAB = await queue_in.get() | |
# perform actual step | |
letter = data.letter | |
upper = letter.upper() | |
await sleep_about(1) | |
print(f'B {id}, processed {upper}') | |
await queue_out.put(DataBC(letter, upper)) | |
queue_in.task_done() | |
async def do_stepC(queue_in): | |
global result | |
while True: | |
data: DataBC = await queue_in.get() | |
# perform actual step | |
letter = data.letter | |
upper = data.upper | |
print(f'C, {letter} changed to {upper}') | |
result += upper | |
queue_in.task_done() | |
async def main(): | |
pipeline_in = 'hello world' | |
print(f'converting to upper case: {pipeline_in}') | |
queue_AB = asyncio.Queue() | |
queue_BC = asyncio.Queue() | |
stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in)) | |
stepsB = [ | |
asyncio.create_task(do_stepB(queue_AB, queue_BC, 1)), | |
asyncio.create_task(do_stepB(queue_AB, queue_BC, 2)), | |
asyncio.create_task(do_stepB(queue_AB, queue_BC, 3)), | |
] | |
stepC = asyncio.create_task(do_stepC(queue_BC)) | |
await stepA | |
print('step A done') | |
await queue_AB.join() | |
print('queue A - B done') | |
for step in stepsB: | |
step.cancel() # no more date is going to show up at B | |
await queue_BC.join() | |
print('queue B - C done') | |
stepC.cancel() # no more date is going to show up at C | |
print(f'main complete, result: {result}') | |
asyncio.run(main()) | |
print('program complete') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment