Skip to content

Instantly share code, notes, and snippets.

@onjin
Created March 15, 2024 08:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onjin/fcfe93df71803a6d4741b44ace0b6252 to your computer and use it in GitHub Desktop.
Save onjin/fcfe93df71803a6d4741b44ace0b6252 to your computer and use it in GitHub Desktop.
example creator/producers/consumers async patter - reusable
import asyncio
import httpx
from typing import Callable, List, Any
from rich.console import Console
console = Console()
async def producer(
callback: Callable[[int, Any], Any],
input_queue: asyncio.Queue[str | None],
output_queue: asyncio.Queue[Any],
producer_id: int,
) -> None:
while True:
line = await input_queue.get()
if line is None:
for _ in range(num_consumers):
await output_queue.put(None)
break
await output_queue.put(await callback(producer_id, line))
input_queue.task_done()
async def consumer(
callback: Callable[[int, Any], Any],
output_queue: asyncio.Queue[Any],
consumer_id: int,
) -> None:
while True:
data = await output_queue.get()
if data is None:
output_queue.task_done()
break
await callback(consumer_id, data)
output_queue.task_done()
async def run(num_producers: int, num_consumers: int, endpoints_file: str) -> None:
input_queue: asyncio.Queue[str | None] = asyncio.Queue()
output_queue: asyncio.Queue[Any] = asyncio.Queue()
async def input_provider(
input_queue: asyncio.Queue[str | None], endpoints_file: str, num_producers: int
) -> None:
with open(endpoints_file, "r") as file:
for line in file:
console.print(f"[green]Providing: {line.strip()}")
await input_queue.put(line.strip())
for _ in range(num_producers):
await input_queue.put(None)
async def producer_callback(producer_id: int, data: str):
console.print(f"[blue]Producer: {producer_id} got line {data}")
async with httpx.AsyncClient() as client:
result = await client.get(data)
return result.headers["date"]
async def consumer_callback(consumer_id: int, data: str):
console.print(f"[white]Consumer: {consumer_id} got line {data}")
tasks: List[asyncio.Task[Any]] = [
asyncio.create_task(input_provider(input_queue, endpoints_file, num_producers))
]
tasks.extend(
[
asyncio.create_task(
producer(producer_callback, input_queue, output_queue, pid)
)
for pid in range(num_producers)
]
)
tasks.extend(
[
asyncio.create_task(consumer(consumer_callback, output_queue, cid))
for cid in range(num_consumers)
]
)
await asyncio.gather(*tasks)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--number_of_producers", type=int, default=5)
parser.add_argument("-c", "--number_of_consumers", type=int, default=10)
parser.add_argument("-f", "--endpoints_file", type=str, required=True)
args = parser.parse_args()
num_producers: int = args.number_of_producers
num_consumers: int = args.number_of_consumers
endpoints_file: str = args.endpoints_file
asyncio.run(run(num_producers, num_consumers, endpoints_file))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment