Asyncio pipeline performing concurrent network requests
"""
Asyncio pipeline example program that retrieves the sink speed of the first 100 meetbouten.
It first fetches the ids using the list endpoint and then calls the detail endpoint for each id.

P.S. A meetbout is a physical screw on the outside of a building which is used to determine the "sink" rate of the
structure.
"""
import asyncio | |
import json | |
import random | |
import time | |
from dataclasses import dataclass | |
import httpx | |
# Single HTTP client shared by all pipeline stages. It is created at import time and holds
# network resources until something awaits client.aclose().
client = httpx.AsyncClient()
@dataclass
class DataAB:
    """
    Message passed from stage A to stage B: one meetbout whose details still have to be fetched.
    """
    # id of the meetbout, as returned by the list endpoint
    meetbout_id: int
@dataclass
class DataBC:
    """
    Message passed from stage B to stage C: a meetbout together with its fetched sink speed.
    """
    # id of the meetbout the sink speed belongs to
    meetbout_id: int
    # value of the "zakkingssnelheid" key of the detail response (note: the attribute is spelled with a
    # single "s"; kept as-is because other code refers to it by this name)
    zakkingsnelheid: float
# Module-level accumulator that stage C appends {'id': ..., 'sink_speed': ...} dicts to.
result = []
async def sleep_about(t: float) -> None:
    """
    Sleep for roughly t seconds: t plus a random extra of up to half a second.

    :param t: base sleep duration in seconds.
    """
    sleep_s = t + 0.5 * random.random()
    await asyncio.sleep(sleep_s)
async def do_stepA(queue_out, N):
    """
    Stage A: fetch meetbout ids from the list endpoint and enqueue one DataAB per id.

    :param queue_out: queue that receives the DataAB items for stage B.
    :param N: maximum number of ids to enqueue; at most 100 are requested from the endpoint.
    :raises httpx.HTTPStatusError: when the list endpoint answers with an error status.
    """
    page_size = min(100, N)
    r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/?page_size={page_size}')
    # Fail loudly on an error response instead of trying to interpret an error page as data.
    r.raise_for_status()
    data = json.loads(r.content)
    # A payload without 'results' is treated as an empty page rather than crashing on len(None).
    meetbouten = data.get('results') or []
    print(f'A, #meetbouten: {len(meetbouten)}')
    ids = [x.get('id') for x in meetbouten[:N]]
    print(f'A, meetbouten ids: {ids}')
    for meetbout_id in ids:
        await queue_out.put(DataAB(meetbout_id))
async def do_stepB(queue_in, queue_out, task_idx):
    """
    Stage B worker: endlessly take a DataAB from queue_in, fetch the detail record of that meetbout and
    put a DataBC with its sink speed on queue_out.

    The loop never returns by itself; the worker is expected to be cancelled once queue_in is drained.

    :param queue_in: queue delivering DataAB items.
    :param queue_out: queue receiving DataBC items.
    :param task_idx: index of this worker, only used in the printed messages.
    """
    while True:
        item: DataAB = await queue_in.get()
        meetbout_id = item.meetbout_id
        try:
            # perform actual step
            r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/{meetbout_id}/')
            r.raise_for_status()
            detail = json.loads(r.content)
            zakkingsnelheid = detail.get("zakkingssnelheid")
            print(f'B {task_idx}, meetbout {meetbout_id} sink speed {zakkingsnelheid}')
            await queue_out.put(DataBC(meetbout_id, zakkingsnelheid))
        except (httpx.HTTPError, ValueError) as exc:
            # A failed request or an unparsable body (json.JSONDecodeError is a ValueError) must not kill
            # the worker: report it, skip this meetbout and keep consuming.
            print(f'B {task_idx}, meetbout {meetbout_id} failed: {exc!r}')
        finally:
            # Always balance the get(); otherwise queue_in.join() would wait forever after a failure.
            queue_in.task_done()
async def do_stepC(queue_in):
    """
    Stage C worker: accumulates results in the module-level list, for demonstration purpose only.

    The loop never returns by itself; the worker is expected to be cancelled once queue_in is drained.

    :param queue_in: queue delivering DataBC items.
    """
    # No `global` statement needed: the list is only mutated, never rebound.
    while True:
        item: DataBC = await queue_in.get()
        try:
            # perform actual step
            result.append({
                'id': item.meetbout_id,
                'sink_speed': item.zakkingsnelheid,
            })
        finally:
            # Always balance the get() so queue_in.join() cannot hang on a failed item.
            queue_in.task_done()
async def main():
    """
    Run the three-stage pipeline (A: list ids, B: fetch details, C: accumulate) and print timing.

    Stage A runs once; the stage B and C tasks loop forever and are cancelled here once their input
    queues are drained. The shared HTTP client is closed before returning, so main() is meant to be
    run once per process.
    """
    t0 = time.perf_counter()
    N_MEETBOUTEN = 100
    N_STAGE_B_TASKS = 2
    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()
    stepA = asyncio.create_task(do_stepA(queue_AB, N_MEETBOUTEN))
    stepsB = [asyncio.create_task(do_stepB(queue_AB, queue_BC, task_idx)) for task_idx in range(N_STAGE_B_TASKS)]
    stepC = asyncio.create_task(do_stepC(queue_BC))
    consumers = [*stepsB, stepC]
    try:
        await stepA
        print('step A done')
        await queue_AB.join()
        print('queue A - B done')
        await queue_BC.join()
        print('queue B - C done')
    finally:
        # Either no more data is going to show up at B and C, or an earlier stage failed: in both cases
        # stop the endless consumers and wait until their cancellation has actually been processed.
        for task in consumers:
            task.cancel()
        outcomes = await asyncio.gather(*consumers, return_exceptions=True)
        for outcome in outcomes:
            # Cancellation is the expected outcome; anything else means a consumer crashed earlier.
            if isinstance(outcome, BaseException) and not isinstance(outcome, asyncio.CancelledError):
                print(f'pipeline task failed: {outcome!r}')
        # Release the connections held by the shared HTTP client.
        await client.aclose()
    print(f'main complete, result: {result[:3]}...')
    difference = time.perf_counter() - t0
    print(f'total script time: {round(difference, 3)}s')
# Only run the pipeline when executed as a script, so importing this module does not fire network requests.
if __name__ == '__main__':
    asyncio.run(main())
    print('program complete')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment