Skip to content

Instantly share code, notes, and snippets.

@antonagestam
Created July 14, 2019 20:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save antonagestam/f0182d9264d2f736f528bf0161729d2e to your computer and use it in GitHub Desktop.
Save antonagestam/f0182d9264d2f736f528bf0161729d2e to your computer and use it in GitHub Desktop.
from asyncio import Lock
from collections import defaultdict
from functools import wraps
from typing import Any
from typing import Awaitable
from typing import DefaultDict
from typing import Dict
from typing import TypeVar
from typing_extensions import Protocol
T = TypeVar("T", covariant=True, bound=Any)
class Resolver(Protocol[T]):
def __call__(self) -> Awaitable[T]:
...
_locks: DefaultDict[Resolver, Lock] = defaultdict(Lock)
_resolutions: Dict[Resolver, Any] = {}
async def _resolve(fn: Resolver[T]) -> T:
global _resolutions
if fn not in _resolutions:
async with _locks[fn]:
# It's important that this check happens with the lock acquired. The
# outside check is an optimization for calls that happen after the
# function is resolved, to allow non-synchronized access then.
if fn not in _resolutions:
_resolutions[fn] = await fn()
return _resolutions[fn]
def resolve_once(fn: Resolver[T]) -> Resolver[T]:
@wraps(fn)
async def wrapper() -> T:
return await _resolve(fn)
return wrapper
if __name__ == "__main__":
import datetime
import requests
import asyncio
@resolve_once
async def fetch_current_time() -> datetime.datetime:
print("hittin dat API")
response = requests.get("http://worldtimeapi.org/api/ip")
assert response.status_code == 200
payload = response.json()
return datetime.datetime.fromisoformat(payload["utc_datetime"])
async def main():
print(
*await asyncio.gather(
fetch_current_time(),
fetch_current_time(),
fetch_current_time(),
fetch_current_time(),
fetch_current_time(),
)
)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment