from datetime import datetime, timedelta
import functools

def timed_cache(**timedelta_kwargs):
    def _wrapper(f):
        update_delta = timedelta(**timedelta_kwargs)
        next_update = datetime.utcnow() + update_delta
        # Apply @lru_cache to f with no cache size limit
        f = functools.lru_cache(None)(f)

        @functools.wraps(f)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = datetime.utcnow()
            if now >= next_update:
                f.cache_clear()
                next_update = now + update_delta
            return f(*args, **kwargs)
        return _wrapped
    return _wrapper
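For readers skimming the gist, here is a minimal usage sketch (the `add` function and the counter are illustrative, not part of the gist; the decorator is repeated so the snippet runs standalone):

```python
from datetime import datetime, timedelta
import functools
import time

def timed_cache(**timedelta_kwargs):
    def _wrapper(f):
        update_delta = timedelta(**timedelta_kwargs)
        next_update = datetime.utcnow() + update_delta
        # Apply @lru_cache to f with no cache size limit
        f = functools.lru_cache(None)(f)

        @functools.wraps(f)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = datetime.utcnow()
            if now >= next_update:
                f.cache_clear()
                next_update = now + update_delta
            return f(*args, **kwargs)
        return _wrapped
    return _wrapper

calls = 0

@timed_cache(seconds=1)
def add(a, b):
    global calls
    calls += 1
    return a + b

add(1, 2)      # computed: calls == 1
add(1, 2)      # served from cache: calls still 1
time.sleep(1.1)
add(1, 2)      # timeout elapsed, cache cleared: calls == 2
```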
adrianlzt commented Dec 22, 2017
Great piece of code. Thanks!!
Thanks for your feedback! And for mentioning the imports. :)
So simple yet so useful! Thanks @Morreski! 👍
Well done 👏
Thanks @Morreski! Take a look at this modification to support passing arguments to the underlying lru_cache
method: https://gist.github.com/jmdacruz/764bcaa092eefc369a8bfb90c5fe3227
good one
Add support for lru_cache's maxsize and typed parameters.
from datetime import datetime, timedelta
import functools

def timed_cache(**timed_cache_kwargs):
    def _wrapper(f):
        maxsize = timed_cache_kwargs.pop('maxsize', 128)
        typed = timed_cache_kwargs.pop('typed', False)
        update_delta = timedelta(**timed_cache_kwargs)
        next_update = datetime.utcnow() - update_delta
        f = functools.lru_cache(maxsize=maxsize, typed=False)(f)

        @functools.wraps(f)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = datetime.utcnow()
            if now >= next_update:
                f.cache_clear()
                next_update = now + update_delta
            return f(*args, **kwargs)
        return _wrapped
    return _wrapper
I think it should be next_update = datetime.utcnow() + update_delta, but in fact it does not change the correctness of the solution, since it will just force a flush on the first call. It's not needed, though, and if copy-pasted to another context it could be wrong.
from datetime import datetime, timedelta
import functools

def timed_cache(**timedelta_kwargs):
    def _wrapper(f):
        update_delta = timedelta(**timedelta_kwargs)
        next_update = datetime.utcnow() + update_delta
        # Apply @lru_cache to f with no cache size limit
        f = functools.lru_cache(None)(f)

        @functools.wraps(f)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = datetime.utcnow()
            if now >= next_update:
                f.cache_clear()
                next_update = now + update_delta
            return f(*args, **kwargs)
        return _wrapped
    return _wrapper
f = functools.lru_cache(maxsize=maxsize, typed=False)(f)

There should be typed=typed instead of typed=False.
In general, a nice piece of code, but what's the point of clearing the whole cache after the timeout? To me, the timeout should apply to individual results.
I agree, I was hoping for a recipe for per-element expiration; this example is far too heavy-handed, as it clears the ENTIRE cache if any individual element is outdated.
@Spaider @linclelinkpart5
Here is a version that supports per-element expiration.
Since the official lru_cache doesn't offer an API to remove a specific element from the cache, I had to re-implement it. Most of the code is taken from the original lru_cache, except for the parts handling expiration and the Node class that implements the linked list. (The official version implements the linked list with an array.)
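For readers who don't need the full linked-list re-implementation, here is a much simpler sketch of per-element expiration using a plain dict (my own illustration, not the code referenced above; it has no maxsize/LRU eviction, and keys must be hashable):

```python
import functools
import time

def per_item_ttl_cache(seconds: float):
    """Cache each argument tuple separately; entries expire individually."""
    def decorator(func):
        cache = {}  # key -> (expiry_timestamp, value)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            now = time.monotonic()
            hit = cache.get(key)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh for THIS key only
            value = func(*args, **kwargs)
            cache[key] = (now + seconds, value)
            return value
        return wrapper
    return decorator

calls = []

@per_item_ttl_cache(seconds=60)
def square(x):
    calls.append(x)
    return x * x

square(2)  # computed
square(2)  # cached
square(3)  # computed; does NOT evict the entry for 2
```

Expired entries linger in the dict until overwritten, so a production version would also want eviction, but the key point is that one key expiring never touches the others.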
Thank you for this! I used it in a project where we have 100% test coverage so I wrote this simple test for it.
Thought it could be useful for others as well.
import unittest

class Testing(unittest.TestCase):
    def test_timed_cache(self):
        """Test the timed_cache decorator."""
        from python_file import timed_cache
        import logging
        import time

        cache_logger = logging.getLogger("foo_log")

        @timed_cache(seconds=1)
        def cache_testing_function(num1, num2):
            cache_logger.info("Not cached yet.")
            return num1 + num2

        with self.assertLogs("foo_log", level="INFO") as cache_log:
            result1 = cache_testing_function(2, 3)
            self.assertEqual(cache_log.output[0], "INFO:foo_log:Not cached yet.")
            assert result1 == 5
            result2 = cache_testing_function(2, 3)
            assert len(cache_log.output) == 1
            assert result2 == 5
            time.sleep(1)
            result3 = cache_testing_function(2, 3)
            self.assertEqual(cache_log.output[1], "INFO:foo_log:Not cached yet.")
            assert result3 == 5
Hi! You're 100% right. I updated the gist with your fixed version. Thanks!
Thanks for this! Very helpful.
I used this function in one of my projects but modified it a little bit before using it.
def cache(seconds: int, maxsize: int = 128, typed: bool = False):
    def wrapper_cache(func):
        func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
        func.delta = timedelta(seconds=seconds)
        func.expiration = datetime.utcnow() + func.delta

        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if datetime.utcnow() >= func.expiration:
                func.cache_clear()
                func.expiration = datetime.utcnow() + func.delta
            return func(*args, **kwargs)
        return wrapped_func
    return wrapper_cache
Here are some notes about this version:

- The @cache decorator simply expects the number of seconds instead of the full list of arguments expected by timedelta. This avoids leaking timedelta's interface outside of the implementation of @cache. Having the number of seconds should be flexible enough to invalidate the cache at any interval.
- maxsize and typed can now be explicitly declared as part of the arguments expected by @cache.
- By adding the delta and expiration variables to the func we don't have to use the nonlocal variables, which makes for more readable and compact code.
Also, here is a pytest test case:
def test_cache():
    count = 0

    @cache(seconds=1)
    def test(arg1):
        nonlocal count
        count += 1
        return count

    assert test(1) == 1, "Function should be called the first time we invoke it"
    assert test(1) == 1, "Function should not be called because it is already cached"

    # Let's now wait for the cache to expire
    time.sleep(1)

    assert test(1) == 2, "Function should be called because the cache already expired"
Thanks for sharing, it's very good!
I added some tests and notes about test_cache to address some people's doubts.
def test_cache():
    count = 0
    count2 = 0

    @cache(seconds=1)
    def test(arg1):
        nonlocal count
        count += 1
        return count

    @cache(seconds=10)
    def test_another(arg2):
        nonlocal count2
        count2 += 1
        return count2

    assert test(1) == 1, "Function test with arg 1 should be called the first time we invoke it"
    assert test(1) == 1, "Function test with arg 1 should not be called because it is already cached"
    assert test(-1) == 2, "Function test with arg -1 should be called the first time we invoke it"
    assert test(-1) == 2, "Function test with arg -1 should not be called because it is already cached"
    assert test_another(1) == 1, "Function test_another with arg 1 should be called the first time we invoke it"
    assert test_another(1) == 1, "Function test_another with arg 1 should not be called because it is already cached"

    # Let's now wait for the cache to expire
    time.sleep(1)

    assert test(1) == 3, "Function test with arg 1 should be called because the cache already expired"
    assert test(-1) == 4, "Function test with arg -1 should be called because the cache already expired"

    # func.cache_clear clears func's cache, not all lru caches
    assert test_another(1) == 1, "Function test_another with arg 1 should not be called because the cache has NOT expired yet"
many thanks to everybody sharing here! to further pile on to this gist, here are my suggested changes to @svpino's version:
def lru_cache(timeout: int, maxsize: int = 128, typed: bool = False):
    def wrapper_cache(func):
        func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
        func.delta = timeout * 10 ** 9
        func.expiration = time.monotonic_ns() + func.delta

        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if time.monotonic_ns() >= func.expiration:
                func.cache_clear()
                func.expiration = time.monotonic_ns() + func.delta
            return func(*args, **kwargs)

        wrapped_func.cache_info = func.cache_info
        wrapped_func.cache_clear = func.cache_clear
        return wrapped_func
    return wrapper_cache
- renamed the decorator to lru_cache and the timeout parameter to timeout ;)
- using time.monotonic_ns avoids expensive conversion to and from datetime/timedelta and prevents possible issues with system clocks drifting or changing
- attaching the original lru_cache's cache_info and cache_clear methods to our wrapped_func
Solid update, @fdemmer.
Further tidying up of @fdemmer's version, a fully working snippet:
from functools import lru_cache, wraps
from time import monotonic_ns

def timed_lru_cache(
    _func=None, *, seconds: int = 600, maxsize: int = 128, typed: bool = False
):
    """Extension of functools lru_cache with a timeout

    Parameters:
        seconds (int): Timeout in seconds to clear the WHOLE cache, default = 10 minutes
        maxsize (int): Maximum size of the cache
        typed (bool): Same value of different type will be a different entry
    """
    def wrapper_cache(f):
        f = lru_cache(maxsize=maxsize, typed=typed)(f)
        f.delta = seconds * 10 ** 9
        f.expiration = monotonic_ns() + f.delta

        @wraps(f)
        def wrapped_f(*args, **kwargs):
            if monotonic_ns() >= f.expiration:
                f.cache_clear()
                f.expiration = monotonic_ns() + f.delta
            return f(*args, **kwargs)

        wrapped_f.cache_info = f.cache_info
        wrapped_f.cache_clear = f.cache_clear
        return wrapped_f

    # To allow decorator to be used without arguments
    if _func is None:
        return wrapper_cache
    else:
        return wrapper_cache(_func)
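As a quick sanity check of the no-parentheses support, both decorator forms work; a usage sketch (the snippet is repeated so it runs standalone, and the function names are illustrative):

```python
from functools import lru_cache, wraps
from time import monotonic_ns

def timed_lru_cache(
    _func=None, *, seconds: int = 600, maxsize: int = 128, typed: bool = False
):
    def wrapper_cache(f):
        f = lru_cache(maxsize=maxsize, typed=typed)(f)
        f.delta = seconds * 10 ** 9
        f.expiration = monotonic_ns() + f.delta

        @wraps(f)
        def wrapped_f(*args, **kwargs):
            if monotonic_ns() >= f.expiration:
                f.cache_clear()
                f.expiration = monotonic_ns() + f.delta
            return f(*args, **kwargs)

        wrapped_f.cache_info = f.cache_info
        wrapped_f.cache_clear = f.cache_clear
        return wrapped_f

    # Bare @timed_lru_cache passes the function itself as _func
    return wrapper_cache if _func is None else wrapper_cache(_func)

@timed_lru_cache              # bare form, defaults apply
def double(x):
    return 2 * x

@timed_lru_cache(seconds=5)   # parameterized form
def triple(x):
    return 3 * x

double(4)
double(4)
info = double.cache_info()    # works thanks to the re-exported cache_info
```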
With documentation, imports, and support for using the decorator without arguments and parentheses.
@jianshen92 👌💪
The implementation has a big problem: if you call a function with different values and you obviously want the result cached with a TTL per calling value, then when the TTL is reached for one calling value, the cache is cleared of ALL CACHED RESULTS, that is, FOR ALL CALLING VALUES.
Sample code:
import time
import random

@timed_cache(seconds=10)
def expensive_operation(a: int):
    return random.randint(1, 1 + a)

def ex_op_wrapper(a: int):
    return f'{time.time()}: {expensive_operation(a)}'
Calling in a REPL with a 6 secs pause between the first and second call:
ex_op_wrapper(1000)
'1657014039.3334417: 762'
ex_op_wrapper(100)
'1657014045.5532942: 4'
ex_op_wrapper(1000)
'1657014047.3158472: 762'
ex_op_wrapper(100)
'1657014048.6246898: 4'
ex_op_wrapper(1000)
'1657014049.7079725: 847'
ex_op_wrapper(100)
'1657014050.7649162: 70'
You can see that the first cached result for calling with 100 was 4 at '1657014045.5532942', then that was changed at '1657014050.7649162' to 70, so only 5 secs after the first caching of 4, instead of 10.
The problem in the above code is that f.cache_clear() clears the cache for all calling values, not just for the expired one.
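One common workaround for this (my own sketch, not code from this gist) is to fold a "time bucket" into the cache key: each key gets a fresh value once per window, and expiry of one key never clears the others. Stale entries simply age out of the LRU cache as new buckets displace them:

```python
import functools
import time

def ttl_hash(seconds: int = 10) -> int:
    """Return a value that changes once every `seconds` seconds."""
    return int(time.monotonic() // seconds)

@functools.lru_cache(maxsize=128)
def _expensive_operation(a: int, _ttl: int):
    # `_ttl` exists only to vary the cache key over time
    return a * a

def expensive_operation(a: int, seconds: int = 10):
    return _expensive_operation(a, ttl_hash(seconds))
```

The trade-off is that expired entries linger in the cache until the LRU eviction pushes them out, but the visible behavior is a per-key TTL.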
Thanks guys! Btw, it can lead to a TypeError: unhashable type: 'list' if you have list args.
A fix could be to cast those args to tuples (more info: https://stackoverflow.com/a/49210802).
This piece of code will fix it:
for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if type(value) == list else value
The behavior remains the same, but I would suggest using isinstance() instead of type():
for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if isinstance(value, list) else value
Thanks for the implementations! Really helpful!
Something I noticed is that none of these implementations work with pytest-antilru. This is likely because the monkeypatched lru_cache is not patched early enough: ipwnponies/pytest-antilru#28.
And for args too:

args = [tuple(v) if isinstance(v, list) else v for v in args]
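Putting the last few comments together, a hedged sketch of a decorator (my own naming) that converts list arguments, positional and keyword, to tuples before they reach lru_cache:

```python
import functools

def hashable_lru_cache(maxsize=128, typed=False):
    def decorator(func):
        cached = functools.lru_cache(maxsize=maxsize, typed=typed)(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Lists are unhashable; convert them to tuples for the cache key
            args = tuple(tuple(v) if isinstance(v, list) else v for v in args)
            kwargs = {k: tuple(v) if isinstance(v, list) else v
                      for k, v in kwargs.items()}
            return cached(*args, **kwargs)
        return wrapper
    return decorator

@hashable_lru_cache()
def total(values):
    return sum(values)

total([1, 2, 3])  # works; a bare lru_cache would raise TypeError here
```

Note the caveat: the wrapped function then receives tuples instead of lists, which is fine for read-only use but matters if the function mutates its arguments.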