Asyncio pipeline performing concurrent network requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Asyncio pipeline example program that retrieves the sink speed for the first 100 meetbouten.
It first fetches ids using the list endpoint and then calls the detail endpoint for each id.
P.S. A meetbout is a physical screw on the outside of a building which is used to determine the "sink" rate of the
structure.
""" | |
import asyncio | |
import json | |
import random | |
import time | |
from dataclasses import dataclass | |
import httpx | |
# Single AsyncClient instance created at module level; do_stepA and do_stepB issue their requests through it.
client = httpx.AsyncClient()
@dataclass
class DataAB:
    """Message handed from stage A to stage B: the id of one meetbout to look up."""

    meetbout_id: int
@dataclass
class DataBC:
    """Message handed from stage B to stage C: a meetbout id together with its sink speed."""

    meetbout_id: int
    # Sink speed; stage B fills this from the detail response's "zakkingssnelheid" key.
    zakkingsnelheid: float
# Module-level accumulator: do_stepC appends one {'id', 'sink_speed'} dict per item it receives.
result = []
async def sleep_about(t: float):
    """Sleep for ``t`` seconds plus a random extra of less than half a second."""
    jitter = random.random() * 0.5
    await asyncio.sleep(t + jitter)
async def do_stepA(queue_out, N):
    """
    Stage A: fetch meetbout ids from the list endpoint and enqueue them.

    Requests a single page of at most 100 meetbouten (``page_size=min(100, N)``)
    and puts one ``DataAB`` on ``queue_out`` for each of the first ``N`` entries.

    :param queue_out: queue that receives a ``DataAB`` for every id found
    :param N: maximum number of ids to enqueue
    :raises httpx.HTTPStatusError: if the list endpoint answers with a 4xx/5xx status
    """
    r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/?page_size={min(100, N)}')
    # Fail here with a clear HTTP error rather than with a confusing parse error further down.
    r.raise_for_status()
    payload = json.loads(r.content)
    # A payload without 'results' is treated as an empty page instead of crashing on len(None).
    meetbouten = payload.get('results') or []
    print(f'A, #meetbouten: {len(meetbouten)}')
    ids = [x.get('id') for x in meetbouten[:N]]
    print(f'A, meetbouten ids: {ids}')
    # `meetbout_id` rather than `id`, so the builtin is not shadowed.
    for meetbout_id in ids:
        await queue_out.put(DataAB(meetbout_id))
async def do_stepB(queue_in, queue_out, task_idx):
    """
    Stage B worker: look up the sink speed for every meetbout id it receives.

    Loops forever (until the task is cancelled): takes a ``DataAB`` from
    ``queue_in``, calls the detail endpoint for its id and puts a ``DataBC``
    with the "zakkingssnelheid" value on ``queue_out``.

    A failed request or an unparsable response is reported and the item is
    skipped; the worker keeps running.

    :param queue_in: queue delivering ``DataAB`` items
    :param queue_out: queue receiving ``DataBC`` items
    :param task_idx: index of this worker, only used in the printed messages
    """
    while True:
        item: DataAB = await queue_in.get()
        meetbout_id = item.meetbout_id
        try:
            # perform actual step
            r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/{meetbout_id}/')
            r.raise_for_status()
            detail = json.loads(r.content)
            zakkingsnelheid = detail.get("zakkingssnelheid")
            print(f'B {task_idx}, meetbout {meetbout_id} sink speed {zakkingsnelheid}')
            await queue_out.put(DataBC(meetbout_id, zakkingsnelheid))
        except (httpx.HTTPError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError. Skip this item but keep the worker alive.
            print(f'B {task_idx}, meetbout {meetbout_id} failed: {exc!r}')
        finally:
            # Always mark the item as handled; otherwise queue_in.join() would wait forever
            # for an item whose processing raised.
            queue_in.task_done()
async def do_stepC(queue_in):
    """
    Stage C: collect every incoming ``DataBC`` into the module-level ``result`` list.

    Loops until the task is cancelled. For demonstration purposes only.
    """
    while True:
        item: DataBC = await queue_in.get()
        # perform actual step
        record = {'id': item.meetbout_id, 'sink_speed': item.zakkingsnelheid}
        # `result` is only mutated, never rebound, so no `global` declaration is needed.
        result.append(record)
        queue_in.task_done()
async def main(n_meetbouten: int = 100, n_stage_b_tasks: int = 2):
    """
    Wire up and run the three-stage pipeline A -> B -> C.

    Stage A produces ids, ``n_stage_b_tasks`` concurrent stage B workers fetch
    the details, and a single stage C task accumulates them in ``result``.
    On exit (success or failure) all stage tasks are cancelled and awaited and
    the shared HTTP ``client`` is closed, so this coroutine is meant to be run
    once per process.

    :param n_meetbouten: number of meetbouten to request from stage A
    :param n_stage_b_tasks: number of concurrent stage B workers
    """
    # perf_counter is monotonic, so the measured duration is not affected by clock adjustments.
    t0 = time.perf_counter()

    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()

    stepA = asyncio.create_task(do_stepA(queue_AB, n_meetbouten))
    stepsB = [
        asyncio.create_task(do_stepB(queue_AB, queue_BC, task_idx))
        for task_idx in range(n_stage_b_tasks)
    ]
    stepC = asyncio.create_task(do_stepC(queue_BC))
    tasks = [stepA, *stepsB, stepC]

    try:
        await stepA
        print('step A done')

        await queue_AB.join()
        print('queue A - B done')
        for step in stepsB:
            step.cancel()  # no more data is going to show up at B

        await queue_BC.join()
        print('queue B - C done')
        stepC.cancel()  # no more data is going to show up at C

        print(f'main complete, result: {result[:3]}...')
        difference = time.perf_counter() - t0
        print(f'total script time: {round(difference, 3)}s')
    finally:
        # Cancelling a finished or already-cancelled task is a no-op, so this is safe on every path.
        for task in tasks:
            task.cancel()
        # Await the tasks so their cancellation is actually processed; return_exceptions keeps
        # the CancelledErrors from masking an exception that is already propagating.
        await asyncio.gather(*tasks, return_exceptions=True)
        # Release the connections held by the shared HTTP client.
        await client.aclose()
# Run the pipeline to completion on a new event loop, then report that the script finished.
# NOTE(review): this also executes when the module is imported; consider an
# `if __name__ == "__main__":` guard if the module is ever imported elsewhere.
asyncio.run(main())
print('program complete')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment