Skip to content

Instantly share code, notes, and snippets.

@mauroquinteros
Last active June 28, 2023 01:00
Show Gist options
  • Save mauroquinteros/aa852ff1a9e67b55f47f9e15ba0a01f4 to your computer and use it in GitHub Desktop.
Save mauroquinteros/aa852ff1a9e67b55f47f9e15ba0a01f4 to your computer and use it in GitHub Desktop.
Async
import asyncio
from typing import Any, Callable, Coroutine, List

import aiohttp
async def http_get(session: aiohttp.ClientSession, url: str) -> Any:
    """Send an asynchronous HTTP GET request and return the decoded JSON body.

    Args:
        session: Client session used to issue the request.
        url: Address to request.

    Returns:
        The value produced by ``response.json()`` for the response body.
    """
    # The return annotation of an ``async def`` describes the awaited result,
    # not the coroutine object itself, hence ``Any`` rather than ``Coroutine``.
    async with session.get(url) as response:
        return await response.json()
async def http_post(session: aiohttp.ClientSession, url: str) -> Any:
    """Send an asynchronous HTTP POST request and return the decoded JSON body.

    Args:
        session: Client session used to issue the request.
        url: Address to post to (no request body is sent).

    Returns:
        The value produced by ``response.json()`` for the response body.
    """
    # The return annotation of an ``async def`` describes the awaited result,
    # not the coroutine object itself, hence ``Any`` rather than ``Coroutine``.
    async with session.post(url) as response:
        return await response.json()
async def fetch_all(urls: List, inner: Callable):
    """Call ``inner(session, url)`` for every URL concurrently on one shared session.

    Args:
        urls: Addresses to request.
        inner: Coroutine function taking ``(session, url)``.

    Returns:
        The list returned by ``asyncio.gather``; because
        ``return_exceptions=True`` is passed, a failed call contributes its
        exception object instead of a result.
    """
    async with aiohttp.ClientSession() as session:
        pending = [inner(session, url) for url in urls]
        return await asyncio.gather(*pending, return_exceptions=True)
def run():
    """Fetch comments 1-499 from JSONPlaceholder concurrently and print the results."""
    comments = [
        f"https://jsonplaceholder.typicode.com/comments/{id_}" for id_ in range(1, 500)
    ]
    # asyncio.run() creates a fresh event loop, runs the coroutine to completion
    # and closes the loop; calling get_event_loop() with no running loop is
    # deprecated in current Python versions.
    responses = asyncio.run(fetch_all(comments, http_get))
    print(responses)
# Top-level call: the demo fetch starts as soon as this module is executed or imported.
run()
from dataclasses import dataclass
import threading
import queue
from typing import Dict, List
import requests
@dataclass
class ThreadRequests:
    """Fetch a list of URLs with a pool of worker threads.

    Every URL is placed on the ``urls`` work queue. ``run`` starts
    ``nb_threads`` daemon threads that drain the queue, and each outcome is
    stored on the ``infos`` queue, readable as a list via ``responses``.
    """

    # Declared without defaults: a class-level ``queue.Queue()`` default is a
    # single object shared by every instance, so two clients would mix their
    # URLs and results. The queues are created per instance in ``__init__``.
    urls: queue.Queue
    infos: queue.Queue

    def __init__(
        self,
        urls: List[str],
        http_method: str,
        nb_threads: int = 2,
    ) -> None:
        """Create the per-instance queues and enqueue every URL.

        Args:
            urls: Addresses to request.
            http_method: Key into ``workers``; ``"GET"`` or ``"POST"``.
            nb_threads: Number of worker threads started by ``run``.
        """
        self.urls = queue.Queue()
        self.infos = queue.Queue()
        self.nb_threads = nb_threads
        self.http_method = http_method
        self.workers = {"GET": self.worker_get, "POST": self.worker_post}
        for url in urls:
            self.urls.put(url)

    @property
    def responses(self) -> List:
        """Return a snapshot list of everything collected so far.

        Each entry is the decoded JSON body of a response, or the exception
        raised while requesting/decoding that URL.
        """
        return list(self.infos.queue)

    def run(self) -> None:
        """Start the worker threads and block until every URL is processed.

        Raises:
            ValueError: If URLs are queued but ``nb_threads`` is below 1;
                nothing would ever drain the queue and ``join`` would hang.
            KeyError: If ``http_method`` is not a key of ``workers``.
        """
        if self.nb_threads < 1 and not self.urls.empty():
            raise ValueError("nb_threads must be at least 1 to process queued URLs")
        for _ in range(self.nb_threads):
            threading.Thread(
                target=self.workers[self.http_method], daemon=True
            ).start()
        self.urls.join()

    def worker_get(self) -> None:
        """Drain the URL queue, issuing a GET request for each URL."""
        self._drain("GET")

    def worker_post(self) -> None:
        """Drain the URL queue, issuing a POST request for each URL."""
        self._drain("POST")

    def _drain(self, http_verb: str) -> None:
        """Request queued URLs with ``http_verb`` until the queue is empty."""
        while True:
            # get_nowait() instead of empty() + get(): another worker may take
            # the last URL between the two calls, leaving this thread blocked
            # forever in get().
            try:
                url = self.urls.get_nowait()
            except queue.Empty:
                return
            try:
                self.infos.put(requests.request(http_verb, url).json())
            except Exception as exc:
                # Worker boundary: record the failure as this URL's result
                # (like asyncio.gather(return_exceptions=True)) so one bad URL
                # does not kill the thread and strand the remaining work.
                self.infos.put(exc)
            finally:
                # Always acknowledge the item, otherwise urls.join() never returns.
                self.urls.task_done()
# Demo: queue the URLs of comments 1-499 and fetch them with five worker threads.
comments = [
    f"https://jsonplaceholder.typicode.com/comments/{id_}"
    for id_ in range(1, 500)
]
client = ThreadRequests(urls=comments, http_method="GET", nb_threads=5)
client.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment