Skip to content

Instantly share code, notes, and snippets.

@sksitou
Last active June 28, 2019 10:42
Show Gist options
  • Save sksitou/d818023ef406704f9ae1c507f13fe694 to your computer and use it in GitHub Desktop.
Save sksitou/d818023ef406704f9ae1c507f13fe694 to your computer and use it in GitHub Desktop.
LRU cache with Redis
# https://github.com/python/cpython/blob/master/Lib/functools.py
from collections import namedtuple
from functools import partial
import pickle
from my_redis import Client
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__qualname__', '__doc__',
'__annotations__')
WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper,
wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Update a wrapper function to look like the wrapped function
wrapper is the function to be updated
wrapped is the original function
assigned is a tuple naming the attributes assigned directly
from the wrapped function to the wrapper function (defaults to
functools.WRAPPER_ASSIGNMENTS)
updated is a tuple naming the attributes of the wrapper that
are updated with the corresponding attribute from the wrapped
function (defaults to functools.WRAPPER_UPDATES)
"""
for attr in assigned:
try:
value = getattr(wrapped, attr)
except AttributeError:
pass
else:
setattr(wrapper, attr, value)
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
# Issue #17482: set __wrapped__ last so we don't inadvertently copy it
# from the wrapped function when updating __dict__
wrapper.__wrapped__ = wrapped
# Return the wrapper so this can be used as a decorator via partial()
return wrapper
def wraps(wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Decorator factory to apply update_wrapper() to a wrapper function
Returns a decorator that invokes update_wrapper() with the decorated
function as the wrapper argument and the arguments to wraps() as the
remaining arguments. Default arguments are as for update_wrapper().
This is a convenience function to simplify applying partial() to
update_wrapper().
"""
return partial(update_wrapper, wrapped=wrapped,
assigned=assigned, updated=updated)
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
class _HashedSeq(list):
""" This class guarantees that hash() will be called no more than once
per element. This is important because the lru_cache() will hash
the key multiple times on a cache miss.
"""
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark = (object(),),
fasttypes = {int, str},
tuple=tuple, type=type, len=len):
"""Make a cache key from optionally typed positional and keyword arguments
The key is constructed in a way that is flat as possible rather than
as a nested structure that would take more memory.
If there is only a single argument and its data type is known to cache
its hash value, then that argument is returned without a wrapper. This
saves space and improves lookup speed.
"""
# All of code below relies on kwds preserving the order input by the user.
# Formerly, we sorted() the kwds before looping. The new way is *much*
# faster; however, it means that f(x=1, y=2) will now be treated as a
# distinct call from f(y=2, x=1) which will be cached separately.
key = args
if kwds:
key += kwd_mark
for item in kwds.items():
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for v in kwds.values())
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return _HashedSeq(key)
def lru_cache_redis(maxsize=128, typed=False, redis_conn=None):
"""Least-recently-used cache decorator.
If *maxsize* is set to None, the LRU features are disabled and the cache
can grow without bound.
If *typed* is True, arguments of different types will be cached separately.
For example, f(3.0) and f(3) will be treated as distinct calls with
distinct results.
Arguments to the cached function must be hashable.
View the cache statistics named tuple (hits, misses, maxsize, currsize)
with f.cache_info(). Clear the cache and statistics with f.cache_clear().
Access the underlying function with f.__wrapped__.
See: http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
"""
# Users should only access the lru_cache through its public API:
# cache_info, cache_clear, and f.__wrapped__
# The internals of the lru_cache are encapsulated for thread safety and
# to allow the implementation to change (including a possible C version).
if isinstance(maxsize, int):
# Negative maxsize is treated as 0
if maxsize < 0:
maxsize = 0
elif callable(maxsize) and isinstance(typed, bool):
# The user_function was passed in directly via the maxsize argument
user_function, maxsize = maxsize, 128
wrapper = _lru_cache_wrapper2(user_function, maxsize, typed, _CacheInfo, redis_conn)
return update_wrapper(wrapper, user_function)
elif maxsize is not None:
raise TypeError(
'Expected first argument to be an integer, a callable, or None')
def decorating_function(user_function):
wrapper = _lru_cache_wrapper2(user_function, maxsize, typed, _CacheInfo, redis_conn)
return update_wrapper(wrapper, user_function)
return decorating_function
def _lru_cache_wrapper_redis(user_function, maxsize, typed, _CacheInfo, redis_conn):
make_key = _make_key # build a key from the function arguments
cache = redis_conn.conn
hash_key = "lru_hash:" + user_function.__name__
list_key = "lru_list:" + user_function.__name__
if maxsize == 0:
def wrapper(*args, **kwds):
# No caching -- just a statistics update
result = user_function(*args, **kwds)
return result
elif maxsize is None:
def wrapper(*args, **kwds):
# Simple caching without ordering or size limit
key = make_key(args, kwds, typed)
result = cache.hget(hash_key, key)
print('result', result)
if result:
return pickle.loads(result)
result = user_function(*args, **kwds)
tmp = pickle.dumps(result)
cache.hset(hash_key, key, tmp)
return result
else:
def wrapper(*args, **kwds):
key = make_key(args, kwds, typed)
def first_update():
link = cache.hget(hash_key, key)
if link:
cache.lrem(list_key, 1, link)
cache.lpush(list_key, link)
result = pickle.loads(link)
return result
redis_conn.dlm.exec_func(user_function.__name__, first_update)
result = user_function(*args, **kwds)
def second_update():
if key in cache:
pass
elif cache.llen(list_key) >= maxsize:
cache.hdel(hash_key, key)
cache.rpop(list_key)
else:
tmp = pickle.dumps(result)
cache.hset(hash_key, key, tmp)
cache.lpush(list_key, cache.hget(hash_key, key))
redis_conn.dlm.exec_func(user_function.__name__, second_update)
return result
def cache_info():
"""Report cache statistics"""
def sub():
return cache.llen(list_key)
return redis_conn.dlm.exec_func(user_function.__name__, sub)
def cache_clear():
"""Clear the cache and cache statistics"""
def sub():
cache.ltrim(list_key, 0 ,0)
cache.hdel(hash_key, "*")
redis_conn.dlm.exec_func(user_function.__name__, sub)
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return wrapper
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment