Skip to content

Instantly share code, notes, and snippets.

@Morreski
Last active September 3, 2024 00:29
Show Gist options
  • Save Morreski/c1d08a3afa4040815eafd3891e16b945 to your computer and use it in GitHub Desktop.
Save Morreski/c1d08a3afa4040815eafd3891e16b945 to your computer and use it in GitHub Desktop.
Python lru_cache with timeout
from datetime import datetime, timedelta
import functools
def timed_cache(**timedelta_kwargs):
def _wrapper(f):
update_delta = timedelta(**timedelta_kwargs)
next_update = datetime.utcnow() + update_delta
# Apply @lru_cache to f with no cache size limit
f = functools.lru_cache(None)(f)
@functools.wraps(f)
def _wrapped(*args, **kwargs):
nonlocal next_update
now = datetime.utcnow()
if now >= next_update:
f.cache_clear()
next_update = now + update_delta
return f(*args, **kwargs)
return _wrapped
return _wrapper
@adrianlzt
Copy link

from datetime import timedelta,datetime
import functools

@adrianlzt
Copy link

Great pìece of code. Thanks!!

@Morreski
Copy link
Author

Morreski commented Jan 8, 2018

Thanks for your feedback ! And for mentionning the imports. :)

@jdutriaux
Copy link

So simple yet so useful! Thanks @Morreski! 👍

@p1v2
Copy link

p1v2 commented Aug 15, 2018

Well done 👏

@jmdacruz
Copy link

Thanks @Morreski! Take a look at this modification to support passing arguments to the underlying lru_cache method: https://gist.github.com/jmdacruz/764bcaa092eefc369a8bfb90c5fe3227

@ToonoW
Copy link

ToonoW commented Jan 21, 2019

good one

@ToonoW
Copy link

ToonoW commented Jan 21, 2019

Add support lru_cache of maxsize and typed.

from datetime import datetime, timedelta
import functools


def timed_cache(**timed_cache_kwargs):                                              
                                                                                  
    def _wrapper(f):
        maxsize = timed_cache_kwargs.pop('maxsize', 128)
        typed = timed_cache_kwargs.pop('typed', False)
        update_delta = timedelta(**timed_cache_kwargs)                              
        next_update = datetime.utcnow() - update_delta                            
        f = functools.lru_cache(maxsize=maxsize, typed=False)(f)                                          
                                                                                                                      
        @functools.wraps(f)                                                       
        def _wrapped(*args, **kwargs): 
            nonlocal next_update       
            now = datetime.utcnow()                                               
            if now >= next_update:                                                
                f.cache_clear()                                                   
                next_update = now + update_delta                                
            return f(*args, **kwargs)                                             
        return _wrapped                                                           
    return _wrapper

@hponde
Copy link

hponde commented Jan 22, 2019

I think it should be next_update = datetime.utcnow() + update_delta but in fact it does not change the correctness of the solution since if will force a flush on the first call. It's just not needed and if copy pasted to another context it could be wrong.

from datetime import datetime, timedelta
import functools


def timed_cache(**timedelta_kwargs):                                              
                                                                                  
    def _wrapper(f):                                                              
        update_delta = timedelta(**timedelta_kwargs)                              
        next_update = datetime.utcnow() + update_delta                            
        # Apply @lru_cache to f with no cache size limit                          
        f = functools.lru_cache(None)(f)                                          
                                                                                                                      
        @functools.wraps(f)                                                       
        def _wrapped(*args, **kwargs):                                            
            nonlocal next_update                                                  
            now = datetime.utcnow()                                               
            if now >= next_update:                                                
                f.cache_clear()                                                   
                next_update = now + update_delta                                
            return f(*args, **kwargs)                                             
        return _wrapped                                                           
    return _wrapper

@Spaider
Copy link

Spaider commented Jan 24, 2019

@ToonoW

f = functools.lru_cache(maxsize=maxsize, typed=False)(f)

There should be typed=typed instead of typed=False

In general, nice piece of code but what's the point to clear whole cache after timeout? To me, timeout should be applied to individual results.

@linclelinkpart5
Copy link

@ToonoW

f = functools.lru_cache(maxsize=maxsize, typed=False)(f)

There should be typed=typed instead of typed=False

In general, nice piece of code but what's the point to clear whole cache after timeout? To me, timeout should be applied to individual results.

I agree, I was hoping for a recipe for a per-element expiration, this example is far too heavy-handed, as it clears the ENTIRE cache if any individual element is outdated.

@Orenoid
Copy link

Orenoid commented Jan 13, 2020

@Spaider @linclelinkpart5
Here is a version that supports per-element expiration.
Since the official "lru_cache" doesn't offer api to remove specific element from cache, I have to re-implement it. Most of the code are just from the original "lru_cache", except the parts for expiration and the class "Node" to implement linked list. (The official version implements
linked list with array)

@myrheimb
Copy link

Thank you for this! I used it in a project where we have 100% test coverage so I wrote this simple test for it.
Thought it could be useful for others as well.

import unittest

class Testing(unittest.TestCase):
    def test_timed_cache(self):
        """Test the timed_cache decorator."""

        from python_file import timed_cache

        import logging
        import time

        cache_logger = logging.getLogger("foo_log")

        @timed_cache(seconds=1)
        def cache_testing_function(num1, num2):
            cache_logger.info("Not cached yet.")
            return num1 + num2

        with self.assertLogs("foo_log", level="INFO") as cache_log:

            result1 = cache_testing_function(2, 3)
            self.assertEqual(cache_log.output[0], "INFO:foo_log:Not cached yet.")
            assert result1 == 5

            result2 = cache_testing_function(2, 3)
            assert len(cache_log.output) == 1
            assert result2 == 5

            time.sleep(1)

            result3 = cache_testing_function(2, 3)
            self.assertEqual(cache_log.output[1], "INFO:foo_log:Not cached yet.")
            assert result3 == 5

@Morreski
Copy link
Author

I think it should be next_update = datetime.utcnow() + update_delta but in fact it does not change the correctness of the solution since if will force a flush on the first call. It's just not needed and if copy pasted to another context it could be wrong.

from datetime import datetime, timedelta
import functools


def timed_cache(**timedelta_kwargs):                                              
                                                                                  
    def _wrapper(f):                                                              
        update_delta = timedelta(**timedelta_kwargs)                              
        next_update = datetime.utcnow() + update_delta                            
        # Apply @lru_cache to f with no cache size limit                          
        f = functools.lru_cache(None)(f)                                          
                                                                                                                      
        @functools.wraps(f)                                                       
        def _wrapped(*args, **kwargs):                                            
            nonlocal next_update                                                  
            now = datetime.utcnow()                                               
            if now >= next_update:                                                
                f.cache_clear()                                                   
                next_update = now + update_delta                                
            return f(*args, **kwargs)                                             
        return _wrapped                                                           
    return _wrapper

Hi ! You're 100% right. I updated the gist with your fixed version. Thanks !

@svpino
Copy link

svpino commented Jun 3, 2020

Thanks for this! Very helpful.

I used this function in one of my projects but modified it a little bit before using it.

def cache(seconds: int, maxsize: int = 128, typed: bool = False):
    def wrapper_cache(func):
        func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
        func.delta = timedelta(seconds=seconds)
        func.expiration = datetime.utcnow() + func.delta

        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if datetime.utcnow() >= func.expiration:
                func.cache_clear()
                func.expiration = datetime.utcnow() + func.delta

            return func(*args, **kwargs)

        return wrapped_func

    return wrapper_cache

Here are some notes about this version:

  • The @cache decorator simply expects the number of seconds instead of the full list of arguments expected by timedelta. This avoids leaking timedelta's interface outside of the implementation of @cache. Having the number of seconds should be flexible enough to invalidate the cache at any interval.

  • maxsize and typed can now be explicitly declared as part of the arguments expected by @cache.

  • By adding the delta and expiration variables to the func we don't have to use the nonlocal variables, which makes for more readable and compact code.

Also, here is a pytest test case:

def test_cache():
    count = 0

    @cache(seconds=1)
    def test(arg1):
        nonlocal count
        count += 1
        return count

    assert test(1) == 1, "Function should be called the first time we invoke it"
    assert test(1) == 1, "Function should not be called because it is already cached"

    # Let's now wait for the cache to expire
    time.sleep(1)

    assert test(1) == 2, "Function should be called because the cache already expired"

@MioYvo
Copy link

MioYvo commented Aug 5, 2020

Thanks for this! Very helpful.

I used this function in one of my projects but modified it a little bit before using it.

def cache(seconds: int, maxsize: int = 128, typed: bool = False):
    def wrapper_cache(func):
        func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
        func.delta = timedelta(seconds=seconds)
        func.expiration = datetime.utcnow() + func.delta

        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if datetime.utcnow() >= func.expiration:
                func.cache_clear()
                func.expiration = datetime.utcnow() + func.delta

            return func(*args, **kwargs)

        return wrapped_func

    return wrapper_cache

Here are some notes about this version:

  • The @cache decorator simply expects the number of seconds instead of the full list of arguments expected by timedelta. This avoids leaking timedelta's interface outside of the implementation of @cache. Having the number of seconds should be flexible enough to invalidate the cache at any interval.
  • maxsize and typed can now be explicitly declared as part of the arguments expected by @cache.
  • By adding the delta and expiration variables to the func we don't have to use the nonlocal variables, which makes for more readable and compact code.

Also, here is a pytest test case:

def test_cache():
    count = 0

    @cache(seconds=1)
    def test(arg1):
        nonlocal count
        count += 1
        return count

    assert test(1) == 1, "Function should be called the first time we invoke it"
    assert test(1) == 1, "Function should not be called because it is already cached"

    # Let's now wait for the cache to expire
    time.sleep(1)

    assert test(1) == 2, "Function should be called because the cache already expired"

Thanks your share, it's very good!

I add some test and info about test_cache for some people's doubts.

def test_cache():
    count = 0
    count2 = 0

    @cache(seconds=1)
    def test(arg1):
        nonlocal count
        count += 1
        return count

    @cache(seconds=10)
    def test_another(arg2):
        nonlocal count2
        count2 += 1
        return count2

    assert test(1) == 1, "Function test with arg 1 should be called the first time we invoke it"
    assert test(1) == 1, "Function test with arg 1 should not be called because it is already cached"

    assert test(-1) == 2, "Function test with arg -1 should be called the first time we invoke it"
    assert test(-1) == 2, "Function test with arg -1 should not be called because it is already cached"

    assert test_another(1) == 1, "Function test_another with arg 1 should be called the first time we invoke it"
    assert test_another(1) == 1, "Function test_another with arg 1 should not be called because it is already cached"

    # Let's now wait for the cache to expire
    time.sleep(1)

    assert test(1) == 3, "Function test with arg 1 should be called because the cache already expired"
    assert test(-1) == 4, "Function test with arg -1 should be called because the cache already expired"

    # func.cache_clear clear func's cache, not all lru cache 
    assert test_another(1) == 1, "Function test_another with arg 1 should not be called because the cache NOT expired yet"

@fdemmer
Copy link

fdemmer commented Aug 31, 2020

many thanks to everybody sharing here! to further pile on to this gist, here are my suggested changes to @svpino's version:

def lru_cache(timeout: int, maxsize: int = 128, typed: bool = False):
    def wrapper_cache(func):
        func = functools.lru_cache(maxsize=maxsize, typed=typed)(func)
        func.delta = timeout * 10 ** 9
        func.expiration = time.monotonic_ns() + func.delta

        @functools.wraps(func)
        def wrapped_func(*args, **kwargs):
            if time.monotonic_ns() >= func.expiration:
                func.cache_clear()
                func.expiration = time.monotonic_ns() + func.delta
            return func(*args, **kwargs)

        wrapped_func.cache_info = func.cache_info
        wrapped_func.cache_clear = func.cache_clear
        return wrapped_func
    return wrapper_cache
  • renamed the decorator to lru_cache and the timeout parameter to timeout ;)
  • using time.monotonic_ns avoids expensive conversion to and from datetime/timedelta and prevents possible issues with system clocks drifting or changing
  • attaching the original lru_cache's cache_info and cache_clear methods to our wrapped_func

@svpino
Copy link

svpino commented Aug 31, 2020

Solid update, @fdemmer.

@jianshen92
Copy link

jianshen92 commented Nov 9, 2020

Further tidying up from @fdemmer version, a fully working snippet

from functools import lru_cache, wraps
from time import monotonic_ns


def timed_lru_cache(
    _func=None, *, seconds: int = 600, maxsize: int = 128, typed: bool = False
):
    """Extension of functools lru_cache with a timeout

    Parameters:
    seconds (int): Timeout in seconds to clear the WHOLE cache, default = 10 minutes
    maxsize (int): Maximum Size of the Cache
    typed (bool): Same value of different type will be a different entry

    """

    def wrapper_cache(f):
        f = lru_cache(maxsize=maxsize, typed=typed)(f)
        f.delta = seconds * 10 ** 9
        f.expiration = monotonic_ns() + f.delta

        @wraps(f)
        def wrapped_f(*args, **kwargs):
            if monotonic_ns() >= f.expiration:
                f.cache_clear()
                f.expiration = monotonic_ns() + f.delta
            return f(*args, **kwargs)

        wrapped_f.cache_info = f.cache_info
        wrapped_f.cache_clear = f.cache_clear
        return wrapped_f

    # To allow decorator to be used without arguments
    if _func is None:
        return wrapper_cache
    else:
        return wrapper_cache(_func)

With documentations, imports, and allow decorators to be called without arguments and paratheses

@Alcheri-zz
Copy link

Alcheri-zz commented Sep 21, 2021

@jianshen92 👌💪

@valentinbrasov
Copy link

The implementation has a big problem: if you have a function that you can call with different values and you obviously want the result cached with TTL for each calling value, then when the TTL is reached for one calling value, the cache is cleared of ALL CACHED RESULTS, that is FOR ALL CALLING VALUES.

Sample code:

import time
import random

@timed_cache(seconds=10)
def expensive_operation(a: int):
    return random.randint(1, 1 + a)
    
def ex_op_wrapper(a: int):
    return f'{time.time()}: {expensive_operation(a)}'

Calling in reply with a 6 secs pause between the first and second call:

ex_op_wrapper(1000)
'1657014039.3334417: 762'
ex_op_wrapper(100)
'1657014045.5532942: 4'
ex_op_wrapper(1000)
'1657014047.3158472: 762'
ex_op_wrapper(100)
'1657014048.6246898: 4'
ex_op_wrapper(1000)
'1657014049.7079725: 847'
ex_op_wrapper(100)
'1657014050.7649162: 70'

You can see that the first cached result for calling with 100 was 4 at '1657014045.5532942', then that was changed at '1657014050.7649162' to 70, so only 5 secs after the first caching of 4, instead of 10.

The problem in the above code is that f.cache_clear() clears the cache for all calling values, not just for the expired one.

@Naelpuissant
Copy link

Naelpuissant commented Oct 13, 2022

Thanks guys ! Btw it can leads to a TypeError: unhashable type: 'list' if you have list args.
A fix could be to cast those args to tuple (more info : https://stackoverflow.com/a/49210802)
This piece of code will fix this :

for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if type(value) == list else value

@giordano91
Copy link

giordano91 commented Jan 13, 2023

Thanks guys ! Btw it can leads to a TypeError: unhashable type: 'list' if you have list args. A fix could be to cast those args to tuple (more info : https://stackoverflow.com/a/49210802) This piece of code will fix this :

for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if type(value) == list else value

The behavior remains the same but I would suggest to use isinstance() instead of type()

for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if isinstance(value, list) else value

@HansBambel
Copy link

Thanks for the implementations! Really helpful!

Something I noticed is that neither of these implementations work with pytest-antilru. This is likely due to the lru_cache which is monkeypatched is not patched early enough: ipwnponies/pytest-antilru#28.

@guschnwg
Copy link

args = [tuple(v) if isinstance(v, list) else v for v in args]

for args too

Thanks guys ! Btw it can leads to a TypeError: unhashable type: 'list' if you have list args. A fix could be to cast those args to tuple (more info : https://stackoverflow.com/a/49210802) This piece of code will fix this :

for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if type(value) == list else value

The behavior remains the same but I would suggest to use isinstance() instead of type()

for arg, value in kwargs.items():
    kwargs[arg] = tuple(value) if isinstance(value, list) else value

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment