Skip to content

Instantly share code, notes, and snippets.

@RRMoelker
Last active July 24, 2019 19:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RRMoelker/58146e8e0b9e456db6a267c9c4280a05 to your computer and use it in GitHub Desktop.
Save RRMoelker/58146e8e0b9e456db6a267c9c4280a05 to your computer and use it in GitHub Desktop.
Dead simple asyncio pipeline with random waiting
"""
Example program for an asyncio pipeline with varying stage times.
The program takes a string as input and converts it to upper case.
"""
import asyncio
import random
from dataclasses import dataclass
@dataclass
class DataAB:
    """Message put on the A -> B queue by ``do_stepA`` and read by ``do_stepB``."""

    # The single input character emitted by stage A.
    letter: str
@dataclass
class DataBC:
    """Message put on the B -> C queue by ``do_stepB`` and read by ``do_stepC``."""

    # The letter exactly as stage B received it.
    letter: str
    # The result of calling ``.upper()`` on ``letter`` in stage B.
    upper: str
# Module-level accumulator: do_stepC appends every upper-cased letter to it
# (via ``global result``) and main() prints its final value.
result = ""
async def sleep_about(t: float):
    """Suspend the calling coroutine for roughly ``t`` seconds.

    A random extra delay in the range [0, 0.5) seconds is added on top of
    ``t`` so that successive calls do not all take the same time.

    Args:
        t: base delay in seconds.
    """
    jitter = 0.5 * random.random()
    await asyncio.sleep(t + jitter)
async def do_stepA(queue_out, input):
    """Stage A: push each element of ``input`` onto ``queue_out`` as a ``DataAB``.

    The coroutine finishes once every element has been queued.

    Args:
        queue_out: queue whose awaitable ``put`` receives the ``DataAB`` messages.
        input: iterable of letters to send (the name shadows the builtin but
            is kept because it is part of the signature).
    """
    for letter in input:
        # Wait a short, slightly random time before emitting each letter.
        await sleep_about(0.1)
        print(f'A, sending {letter}')
        message = DataAB(letter)
        await queue_out.put(message)
async def do_stepB(queue_in, queue_out):
    """Stage B: upper-case every letter received and forward it to stage C.

    Loops forever; it only stops when the task running it is cancelled.

    Args:
        queue_in: queue delivering ``DataAB`` messages.
        queue_out: queue receiving the resulting ``DataBC`` messages.
    """
    while True:
        received: DataAB = await queue_in.get()

        # perform actual step
        original = received.letter
        upper = original.upper()
        await sleep_about(1)
        print(f'B, processed {upper}')

        outgoing = DataBC(letter=original, upper=upper)
        await queue_out.put(outgoing)
        # Mark the item done only after the result has been handed on, so a
        # join() on queue_in implies the output is already in queue_out.
        queue_in.task_done()
async def do_stepC(queue_in):
    """Stage C: collect the upper-cased letters into the module-level ``result``.

    Loops forever; it only stops when the task running it is cancelled.

    Args:
        queue_in: queue delivering ``DataBC`` messages.
    """
    global result
    while True:
        received: DataBC = await queue_in.get()

        # perform actual step
        letter, upper = received.letter, received.upper
        print(f'C, {letter} changed to {upper}')
        result = result + upper

        queue_in.task_done()
async def main():
    """Build the three-stage pipeline, run it on a fixed string and print the result.

    Stage A is awaited directly because it terminates on its own. Stages B
    and C loop forever, so each is cancelled once the queue feeding it has
    been fully processed, and both are then awaited so their cancellation
    has actually completed before the result is reported.
    """
    pipeline_in = 'hello world'
    print(f'converting to upper case: {pipeline_in}')

    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()

    stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in))
    stepB = asyncio.create_task(do_stepB(queue_AB, queue_BC))
    stepC = asyncio.create_task(do_stepC(queue_BC))

    await stepA
    print('step A done')

    # join() returns once every queued item has been matched by task_done().
    await queue_AB.join()
    print('queue A - B done')
    stepB.cancel()  # no more data is going to show up at B

    await queue_BC.join()
    print('queue B - C done')
    stepC.cancel()  # no more data is going to show up at C

    # cancel() only requests cancellation; wait until both workers have
    # really stopped. return_exceptions=True keeps their CancelledError
    # from propagating out of main().
    await asyncio.gather(stepB, stepC, return_exceptions=True)

    print(f'main complete, result: {result}')
# Script entry point: run the pipeline coroutine to completion, then report
# that the program has finished. There is no ``__main__`` guard, so this also
# executes whenever the module is imported.
asyncio.run(main())
print('program complete')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment