Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Asyncio pipeline performing concurrent network request
"""
Asyncio pipeline example program that retrieves sink speed for first 100 meetbouten.
First fetches ids using list endpoint and calls detail endpoint for each id.
p.s. A meetbout is a physical screw on the outside of a building which is used to determine the "sink" rate of the
structure.
"""
import asyncio
import json
import random
import time
from dataclasses import dataclass
import httpx
client = httpx.AsyncClient()
@dataclass()
class DataAB:
meetbout_id: int
@dataclass()
class DataBC:
meetbout_id: int
zakkingsnelheid: float
result = []
async def sleep_about(t: float):
sleep_s = t + 0.5 * random.random()
await asyncio.sleep(sleep_s)
async def do_stepA(queue_out, N):
r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/?page_size={min(100, N)}')
data = json.loads(r.content)
meetbouten = data.get('results')
print(f'A, #meetbouten: {len(meetbouten)}')
ids = [x.get('id') for x in meetbouten[:N]]
print(f'A, meetbouten ids: {ids}')
for id in ids:
await queue_out.put(DataAB(id))
async def do_stepB(queue_in, queue_out, task_idx):
while True:
data: DataAB = await queue_in.get()
# perform actual step
id = data.meetbout_id
r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/{id}/')
data = json.loads(r.content)
zakkingsnelheid = data.get("zakkingssnelheid")
print(f'B {task_idx}, meetbout {id} sink speed {zakkingsnelheid}')
await queue_out.put(DataBC(id, zakkingsnelheid))
queue_in.task_done()
async def do_stepC(queue_in):
"""
Accumulates results, for demonstration purpose only
"""
global result
while True:
data: DataBC = await queue_in.get()
# perform actual step
result.append({
'id': data.meetbout_id,
'sink_speed': data.zakkingsnelheid
})
queue_in.task_done()
async def main():
t0 = time.time()
N_MEETBOUTEN = 100
N_STAGE_B_TASKS = 2
queue_AB = asyncio.Queue()
queue_BC = asyncio.Queue()
stepA = asyncio.create_task(do_stepA(queue_AB, N_MEETBOUTEN))
stepsB = [asyncio.create_task(do_stepB(queue_AB, queue_BC, task_idx)) for task_idx in range(N_STAGE_B_TASKS)]
stepC = asyncio.create_task(do_stepC(queue_BC))
await stepA
print('step A done')
await queue_AB.join()
print('queue A - B done')
for step in stepsB:
step.cancel() # no more date is going to show up at B
await queue_BC.join()
print('queue B - C done')
stepC.cancel() # no more date is going to show up at C
print(f'main complete, result: {result[:3]}...')
difference = time.time() - t0
print(f'total script time: {round(difference, 3)}s')
asyncio.run(main())
print('program complete')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.