Skip to content

Instantly share code, notes, and snippets.

@horaceg
Last active June 16, 2023 19:47
Show Gist options
  • Save horaceg/2afe33aff9ba28c36757f52f13edd298 to your computer and use it in GitHub Desktop.
Save horaceg/2afe33aff9ba28c36757f52f13edd298 to your computer and use it in GitHub Desktop.
import asyncio
import functools
import random
import time
from concurrent.futures import ThreadPoolExecutor as Pool
import aiohttp
import requests
def timed(N, url, fn):
@functools.wraps(fn)
def wrapper(*args, **kwargs):
start = time.time()
res = fn(*args, **kwargs)
stop = time.time()
duration = stop - start
print(f"{N / duration:.2f} reqs / sec | {N} reqs | {url} | {fn.__name__}")
return res
return wrapper
def get(url):
resp = requests.get(url)
assert resp.status_code == 200
return resp.json()
async def aget(session, url):
async with session.get(url) as response:
assert response.status == 200
json = await response.json()
return json
async def gather_limit(n_workers, *tasks):
semaphore = asyncio.Semaphore(n_workers)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
async def aget_all(url, n, n_workers=None):
limit = n_workers or n
async with aiohttp.ClientSession() as session:
result = await gather_limit(limit, *[aget(session, url) for _ in range(n)])
return result
def sync_get_all(url, n):
l = [get(url) for _ in range(n)]
return l
def thread_pool(url, n, limit=None):
limit_ = limit or n
with Pool(max_workers=limit_) as pool:
result = pool.map(get, [url] * n)
return result
def async_main(url, n):
return asyncio.run(aget_all(url, n))
def run_bench(n, funcs, urls):
for url in urls:
for func in funcs:
timed(n, url, func)(url, n)
if __name__ == "__main__":
urls = ["http://127.0.0.1:8000/wait", "http://127.0.0.1:8000/asyncwait"]
funcs = [sync_get_all, thread_pool, async_main]
run_bench(100, funcs, urls)
run_bench(1000, [thread_pool, async_main], urls)
import asyncio
import random
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
return response
@app.get("/wait")
def wait():
duration = 0.05
time.sleep(duration)
return {"duration": duration}
@app.get("/asyncwait")
async def asyncwait():
duration = 0.05
await asyncio.sleep(duration)
return {"duration": duration}
def fibo(n):
if n < 2:
return 1
else:
return fibo(n - 1) + fibo(n - 2)
async def afibo(n):
if n < 2:
return 1
else:
fib1 = await afibo(n - 1)
fib2 = await afibo(n - 2)
return fib1 + fib2
@app.get("/fib/{n}")
def fib(n: int):
return {"fib": fibo(n)}
@app.get("/asyncfib/{n}")
async def asyncfib(n: int):
res = await afibo(n)
return {"fib": res}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment