Skip to content

Instantly share code, notes, and snippets.

@bencharb
Last active April 17, 2016 05:22
Show Gist options
  • Save bencharb/0b77644bdd59d7b96f27d8200d6d29b4 to your computer and use it in GitHub Desktop.
Save bencharb/0b77644bdd59d7b96f27d8200d6d29b4 to your computer and use it in GitHub Desktop.
expirable disk cache
import os
import datetime
import time
import shove
# Time units, all expressed in seconds.
SEC = 1
MIN = 60 * SEC
HOUR = 60 * MIN
DAY = 24 * HOUR

# Default time-to-live applied to cache entries: three hours.
DEFAULT_EXPIRATION_THRESHOLD = HOUR * 3
def timestampnow():
    """Return the current Unix timestamp in whole seconds, as an int.

    ``time.time()`` is used directly. Feeding a naive UTC ``timetuple`` to
    ``time.mktime`` (which interprets its argument as *local* time) shifts the
    result by the local UTC offset and makes it ambiguous around DST
    transitions, which in turn distorts differences between two timestamps.
    """
    return int(time.time())
def create_file_shove(cache_folder):
    """Open a file-backed ``shove.Shove`` store located at *cache_folder*.

    If *cache_folder* already exists it must be a directory. The parent
    directory of *cache_folder* is created when it does not exist yet; the
    cache folder itself is left for the store to set up (presumably shove's
    file backend creates it -- TODO confirm).

    Returns the ``shove.Shove`` instance opened on ``'file://' + cache_folder``.
    """
    if os.path.exists(cache_folder):
        assert os.path.isdir(cache_folder), (
            'cache path exists but is not a directory: %r' % (cache_folder,))
    parent = os.path.dirname(cache_folder)
    # A bare relative name such as 'cache' has an empty dirname;
    # os.makedirs('') raises, and the current directory exists anyway.
    if parent and not os.path.exists(parent):
        os.makedirs(parent)
    return shove.Shove('file://' + cache_folder)
class MissingExpirationKey(Exception):
    """Raised when a regular cache key has no stored expiration value."""
class DiskCache(object):
    """Disk-backed key/value cache whose entries expire after a time-to-live.

    Each value stored under ``key`` gets a companion entry under
    ``expiration_prefix + key`` that holds the absolute timestamp (as returned
    by ``timestampnow()``) at which the value expires. Reading a key first
    drops the value and its companion entry when that moment has passed, or
    when the companion entry is missing.

    Keys that themselves start with ``expiration_prefix`` are treated as
    expiration keys, so regular keys must not use that prefix.
    """

    # Prefix of the companion key that stores an entry's expiration timestamp.
    expiration_prefix = '__exp__'

    def __init__(self, cache_folder, ttl=DEFAULT_EXPIRATION_THRESHOLD):
        """Open the cache stored at *cache_folder*.

        ``ttl`` is the default lifetime, in seconds, given to entries written
        without an explicit ttl; it must be an int.
        """
        super(DiskCache, self).__init__()
        self.ttl = ttl
        assert isinstance(self.ttl, int)
        self._store = create_file_shove(cache_folder)

    def get_or_create_expiration_key(self, key):
        '''Return the expiration key for *key* and whether it had to be derived.

        The result is a tuple such as ``('__exp__somekey', True)`` for a regular
        key, or ``('__exp__somekey', False)`` when *key* already is an
        expiration key. Nothing is written to the store.
        '''
        if key.startswith(self.expiration_prefix):
            return key, False
        return self.expiration_prefix + key, True

    def _should_expire(self, key, ttl=None):
        """Return True when the stored expiration time of *key* has passed.

        Raises ``MissingExpirationKey`` when *key* is a regular key without a
        stored expiration value. When *key* is itself an expiration key with no
        stored value, returns False.

        ``ttl`` is accepted for backward compatibility only: the lifetime is
        already folded into the stored timestamp when the entry is written, so
        applying a ttl again here would keep entries alive for twice as long.
        """
        expkey, derived = self.get_or_create_expiration_key(key)
        expires_at = self._store.get(expkey)
        if expires_at in (None, ''):
            if derived:
                # Regular key that never received an expiration value.
                raise MissingExpirationKey(key)
            return False
        return timestampnow() >= int(expires_at)

    def should_expire(self, key, ttl=None):
        """Like ``_should_expire`` but treat a missing expiration value as expired."""
        try:
            return self._should_expire(key, ttl=ttl)
        except MissingExpirationKey:
            return True

    def expire(self, key):
        """Remove *key* and its expiration entry; missing entries are ignored."""
        expkey, _ = self.get_or_create_expiration_key(key)
        self.delete(key)
        self.delete(expkey)

    def get(self, key):
        """Return the value stored under *key*, or None if absent or expired."""
        self.run_expiration_rule(key)
        return self._store.get(key)

    def run_expiration_rule(self, key):
        """Expire *key* now if its lifetime is over."""
        if self.should_expire(key):
            self.expire(key)

    def create_expiration_time(self, ttl=None):
        """Return the timestamp ``ttl`` seconds from now (default: ``self.ttl``)."""
        if ttl is None:
            ttl = self.ttl
        return timestampnow() + ttl

    def set(self, key, val, ttl=None):
        """Store *val* under *key*, expiring ``ttl`` seconds from now.

        When *key* is itself an expiration key, *val* is stored as the integer
        expiration timestamp and nothing else is written.
        """
        expkey, derived = self.get_or_create_expiration_key(key)
        if not derived:
            # expkey == key here; writing a fresh expiration time afterwards
            # would silently overwrite the timestamp the caller just supplied.
            self._store[key] = int(val)
            return
        self._store[key] = val
        self._store[expkey] = self.create_expiration_time(ttl=ttl)

    def keys(self):
        """Return every key in the underlying store, expiration keys included."""
        return self._store.keys()

    def delete(self, key):
        """Delete *key* from the store; a missing key is not an error."""
        try:
            del self._store[key]
        except KeyError:
            pass

    def __getitem__(self, key):
        # Mirrors get(): yields None instead of raising KeyError when absent.
        return self.get(key)

    def __setitem__(self, key, val):
        return self.set(key, val)

    def __delitem__(self, key):
        return self.delete(key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment