Skip to content

Instantly share code, notes, and snippets.

@rednafi
Last active April 18, 2021 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rednafi/ee1622dcb1db8cafe58530d52383c49d to your computer and use it in GitHub Desktop.
Save rednafi/ee1622dcb1db8cafe58530d52383c49d to your computer and use it in GitHub Desktop.
Python Asyncio MRE scripts.
import asyncio
import httpx
# Number of request tasks that main() creates (used as the range() bound there).
MAX_CONSUMERS = 50
async def make_request(url):
    """Send a GET request to *url* and return the response status code.

    A fresh ``httpx.AsyncClient`` with HTTP/2 enabled is opened for the
    call and closed when the request is done. The status code is also
    printed to stdout.
    """
    client = httpx.AsyncClient(http2=True)
    async with client:
        reply = await client.get(url)
        status = reply.status_code
        print(status)
    return status
async def safe_make_request(url, semaphore=None):
    """Call ``make_request(url)``, optionally bounded by *semaphore*.

    Args:
        url: Address passed straight through to ``make_request``.
        semaphore: Optional async context manager (e.g. an
            ``asyncio.Semaphore``) held for the duration of the request.
            When ``None``, the request runs without any concurrency limit.

    Returns:
        Whatever ``make_request`` returns (the HTTP status code), so that
        callers and ``asyncio.gather`` receive the result instead of ``None``.
    """
    # Compare against None explicitly: a limiter object should never be
    # skipped just because it happens to evaluate as falsy.
    if semaphore is None:
        return await make_request(url)

    async with semaphore:
        return await make_request(url)
async def main():
    """Fire ``MAX_CONSUMERS`` requests at the forex-rates URL, five at a time.

    Every request is scheduled as its own task; a shared
    ``asyncio.Semaphore(5)`` caps how many are in flight at once. Failures
    are collected by ``gather`` and reported on stdout rather than being
    dropped silently.
    """
    # NOTE(review): this API token is hard-coded in source; consider loading
    # it from the environment or a secrets store instead.
    token = "c1u0d7qad3ifani3q2rg"
    url = f"https://finnhub.io/api/v1/forex/rates?base=USD&token={token}"
    semaphore = asyncio.Semaphore(5)

    tasks = [
        asyncio.create_task(safe_make_request(url, semaphore))
        for _ in range(MAX_CONSUMERS)
    ]

    # return_exceptions=True keeps one failing request from aborting the
    # rest; the exceptions come back as result values, so inspect them.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, BaseException):
            print(f"Request failed: {result!r}")
# Only start the event loop when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
"""
Producer-consumer pattern using Python's asyncio—
where both of the entities are long running.
Producer
--------
Here, the producer generates a sequence of numbers 0,1,0,2,0,0,3,0,0,4 indefinitely.
However, it only puts non-zero values into the queue.
Consumer
--------
The consumer only runs when a non-zero value appears on the queue and lets the
producer run again. This juggling between producer and consumer goes on indefinitely.
"""
import asyncio
import itertools
async def producer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Endlessly cycle through a fixed pattern, queueing the non-zero values.

    For each value in the repeating pattern a status line is printed. A
    non-zero value is put on *queue* and *event* is set; zeros are skipped.
    The coroutine then sleeps for one second before the next value.
    """
    pattern = [0, 1, 0, 2, 0, 0, 3, 0, 0, 4]
    endless_values = itertools.cycle(pattern)

    for value in endless_values:
        print("Running producer...")
        if value:
            await queue.put(value)
            # Signal waiting consumers that data has been queued.
            event.set()
        await asyncio.sleep(1)
async def consumer(queue: asyncio.Queue, event: asyncio.Event) -> None:
    """Wait for *event*, then print every item taken from *queue* forever.

    Nothing is read from the queue until *event* has been set. After that,
    each loop iteration prints a status line, awaits the next item, prints
    it, and marks it as done on the queue.
    """
    # Block until the event is set before touching the queue at all.
    await event.wait()

    while True:
        print("Running consumer...")
        item = await queue.get()
        print(item)
        # Acknowledge the item so that queue.join() can make progress.
        queue.task_done()
async def main() -> None:
    """Run one producer and three consumers on a shared queue until stopped.

    The producer and every consumer are scheduled as real ``asyncio.Task``
    objects so they can be cancelled. Because the consumers loop forever,
    the ``gather`` below normally only ends when ``main`` itself is
    cancelled (for example on Ctrl+C) or when all consumers have failed;
    in either case every task is cancelled and awaited before returning.
    """
    queue = asyncio.Queue()
    event = asyncio.Event()

    # Hold a named reference so the producer task stays alive for the run.
    producer_task = asyncio.create_task(producer(queue, event))
    # Wrap the coroutines in tasks: bare coroutine objects have no .cancel().
    consumer_tasks = [
        asyncio.create_task(consumer(queue, event)) for _ in range(3)
    ]
    workers = [producer_task, *consumer_tasks]

    try:
        # Awaiting here yields to the event loop, which is what lets the
        # already-scheduled producer task run alongside the consumers.
        await asyncio.gather(*consumer_tasks, return_exceptions=True)
    finally:
        # Cleanup lives in `finally` so it also runs when main is cancelled.
        for task in workers:
            task.cancel()
        # Wait for the cancellations to be processed before returning.
        await asyncio.gather(*workers, return_exceptions=True)
# Script entry point: start the event loop and run main() to completion.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment