Skip to content

Instantly share code, notes, and snippets.

@dedsm
Last active October 12, 2021 03:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dedsm/a35c4c68160559e0c980c8dc6db25cfd to your computer and use it in GitHub Desktop.
Save dedsm/a35c4c68160559e0c980c8dc6db25cfd to your computer and use it in GitHub Desktop.
Fastapi cache setup
from .initialize import setup_cache
from .connections import connection
@app.on_event('startup')
async def setup_connections():
    """Run the cache setup coroutine when the application starts."""
    await setup_cache()
    return None
@app.middleware("http")
async def connections_middleware(request, call_next):
    """Attach the shared cache to ``request.state`` before handling the request.

    The value of ``connection.cache`` at the time of the request is exposed as
    ``request.state.cache``; the response produced by ``call_next`` is returned
    unchanged.
    """
    request.state.cache = connection.cache
    response = await call_next(request)
    return response
import itertools
import ujson
from .config import settings
class Cache:
    """Async wrapper around a Redis client that (de)serializes values with ujson.

    Write methods take a ``dump`` flag and read methods take a ``load`` flag
    (both default to ``True``). When the flag is set, values are passed through
    ``ujson.dumps`` before being written or ``ujson.loads`` after being read;
    when it is cleared, values are handed to / returned from the client as-is.
    """

    def __init__(self, redis):
        """Keep a reference to the Redis client every method delegates to."""
        self.redis = redis

    @staticmethod
    def _load_many(values):
        """Decode every non-``None`` entry of *values*, keeping ``None`` in place."""
        return [None if raw is None else ujson.loads(raw) for raw in values]

    async def get(self, key, load=True, *args, **kwargs):
        """Return the value stored at *key*, or ``None``.

        ``None`` is returned without querying Redis when
        ``settings.CACHE_ENABLED`` is falsy, and also when the stored value is
        missing or falsy. Extra positional/keyword arguments are accepted but
        not used.
        """
        if not settings.CACHE_ENABLED:
            return None
        val = await self.redis.get(key)
        if not val:
            return None
        return ujson.loads(val) if load else val

    async def set(self, key, val, dump=True, *args, **kwargs):
        """Store *val* at *key*; extra arguments are forwarded to ``redis.set``."""
        if dump:
            val = ujson.dumps(val)
        return await self.redis.set(key, val, *args, **kwargs)

    async def hmset_dict(self, field, dump=True, **kwargs):
        """Pass the keyword arguments as a dict to ``redis.hmset_dict``.

        *field* is forwarded as the first argument of the client call.
        """
        if dump:
            dct = {k: ujson.dumps(v) for k, v in kwargs.items()}
        else:
            dct = kwargs
        return await self.redis.hmset_dict(field, dct)

    async def hmget(self, *args, load=True, **kwargs):
        """Call ``redis.hmget``; with ``load`` decode each non-``None`` result."""
        values = await self.redis.hmget(*args, **kwargs)
        return self._load_many(values) if load else values

    async def mget(self, *args, load=True, **kwargs):
        """Call ``redis.mget``; with ``load`` decode each non-``None`` result."""
        values = await self.redis.mget(*args, **kwargs)
        return self._load_many(values) if load else values

    async def mset(self, pairs, dump=True, **kwargs):
        """Write every key/value of the mapping *pairs* with one ``redis.mset``.

        The mapping is flattened to ``key1, value1, key2, value2, ...`` and
        passed as positional arguments.
        """
        if dump:
            items = ((k, ujson.dumps(v)) for k, v in pairs.items())
        else:
            items = pairs.items()
        flat = list(itertools.chain.from_iterable(items))
        return await self.redis.mset(*flat, **kwargs)

    async def hget(self, key, field, load=True, **kwargs):
        """Return *field* of the hash at *key*, or ``None`` when it is absent."""
        val = await self.redis.hget(key, field, **kwargs)
        # A missing field comes back as None; decoding it would raise, so it is
        # passed through exactly as hmget/mget do for their None entries.
        if load and val is not None:
            val = ujson.loads(val)
        return val

    async def hset(self, key, field, value, dump=True):
        """Store *value* under *field* of the hash at *key*."""
        val = ujson.dumps(value) if dump else value
        return await self.redis.hset(key, field, val)

    async def hdel(self, *args, **kwargs):
        """Forward all arguments to ``redis.hdel`` and return its result."""
        return await self.redis.hdel(*args, **kwargs)

    async def delete(self, *args, **kwargs):
        """Forward all arguments to ``redis.delete`` and return its result."""
        return await self.redis.delete(*args, **kwargs)
from typing import Optional
from .cache import Cache
class Connections:
    """Holder for connection objects that are assigned after import time."""

    # Declared as None here; replaced with a Cache instance once one is built.
    cache: Optional[Cache] = None
# Module-level instance that other modules import; its `cache` attribute is
# None until something assigns a Cache to it.
connection = Connections()
import aioredis
from .config import settings
from .cache import Cache
async def setup_cache():
    """Connect to Redis through Sentinel and publish the resulting ``Cache``.

    A sentinel client is created from ``settings.REDIS_SENTINEL_URL``,
    ``settings.REDIS_SENTINEL_PORT`` and ``settings.REDIS_PASSWORD``. The
    master for ``settings.REDIS_SENTINEL_SET`` is then wrapped in a ``Cache``
    and stored on ``connection.cache``.
    """
    # This module's imports never bring `connection` into scope, so the
    # assignment below would raise NameError; import it locally.
    from .connections import connection

    sentinel_client = await aioredis.create_sentinel(
        [(settings.REDIS_SENTINEL_URL, settings.REDIS_SENTINEL_PORT)],
        password=settings.REDIS_PASSWORD,
    )
    redis = sentinel_client.master_for(settings.REDIS_SENTINEL_SET)
    connection.cache = Cache(redis)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment