Skip to content

Instantly share code, notes, and snippets.

@Proteusiq
Created July 10, 2021 06:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Proteusiq/cf8d0fd9a0b3dfb16d9d4ed77d69ecbe to your computer and use it in GitHub Desktop.
Save Proteusiq/cf8d0fd9a0b3dfb16d9d4ed77d69ecbe to your computer and use it in GitHub Desktop.
Share objects
import json
import zlib
from typing import Any, Optional, Dict
import diskcache as dc
class DataCache:
    """Store JSON-serialisable objects, zlib-compressed, in a ``diskcache`` cache.

    Each public method opens the cache directory at ``cache_path`` for the
    duration of the call and closes it again on exit, so no handle is kept
    open between calls.

    Values are serialised with ``json.dumps``, encoded as UTF-8 and compressed
    with ``zlib`` before being written. ``set`` and ``frozenset`` instances are
    written as JSON lists, so they come back from :meth:`get` as lists.
    """

    def __init__(
        self,
        cache_path: str,
        # NOTE(review): 24 * 360 is 8640 seconds (2.4 hours). If "one day" was
        # intended this should be 24 * 3600; the value is kept unchanged so
        # existing callers see the same expiry.
        expire: Optional[int] = 24 * 360,
        tag: Optional[str] = "data",
        compress_level: Optional[int] = 1,
    ) -> None:
        """Remember the cache location and the defaults applied by :meth:`set`.

        Args:
            cache_path: Directory handed to ``diskcache.Cache``.
            expire: Seconds until a stored entry expires.
            tag: Tag attached to every stored entry.
            compress_level: Compression level handed to ``zlib.compress``.
        """
        self.cache_path = cache_path
        self.expire = expire  # seconds until data expires
        self.tag = tag
        self.compress_level = compress_level

    @staticmethod
    def _json_default(obj: Any) -> Any:
        """Fallback for ``json.dumps``: turn sets into lists."""
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        # Any other type is handed back unchanged, as the original fallback
        # did, so json.dumps still rejects it.
        # NOTE(review): json reports this as "Circular reference detected"
        # (ValueError); raising TypeError here would be clearer but would
        # change the exception type callers see.
        return obj

    def _serialize(self, contents: Any) -> bytes:
        """Return ``contents`` as zlib-compressed UTF-8 JSON bytes."""
        text = json.dumps(contents, default=self._json_default)
        return zlib.compress(text.encode("utf-8"), self.compress_level)

    @staticmethod
    def _deserialize(payload: bytes) -> Any:
        """Invert :meth:`_serialize`: decompress, decode and parse ``payload``."""
        return json.loads(zlib.decompress(payload).decode("utf-8"))

    def set(self, key: str, contents: Any) -> bool:
        """Serialise ``contents`` and store it under ``key``.

        The entry is written with the instance's ``expire`` and ``tag``.

        Args:
            key: Cache key.
            contents: JSON-serialisable object (sets are stored as lists).

        Returns:
            The status reported by ``diskcache.Cache.set``.
        """
        payload = self._serialize(contents)
        with dc.Cache(self.cache_path) as cache:
            # The payload is already ``bytes``, so it is stored as-is.
            # ``read=True`` is meant for file-like values and is not passed.
            return cache.set(
                key,
                value=payload,
                expire=self.expire,
                tag=self.tag,
            )

    def get(self, key: str) -> Dict[str, Any]:
        """Fetch and decode the entry stored under ``key``.

        Args:
            key: Cache key.

        Returns:
            A dict with the keys ``"expired"``, ``"contents"``, ``"time"`` and
            ``"tag"``. When a value is found, ``"expired"`` is ``False`` and
            ``"contents"`` holds the decoded object. When nothing usable is
            found, ``"expired"`` is ``True`` and ``"contents"`` is ``None``.
            ``"time"`` and ``"tag"`` are the expire time and tag reported by
            diskcache for the lookup.
        """
        with dc.Cache(self.cache_path) as cache:
            # Deliberately fetched without ``read=True``. With that flag
            # diskcache returns an open file handle for values it keeps in
            # separate files (large payloads), which zlib.decompress cannot
            # consume and which would never be closed.
            payload, timestamp, tag = cache.get(key, expire_time=True, tag=True)

        if not payload:
            return {"expired": True, "contents": None, "time": timestamp, "tag": tag}

        return {
            "expired": False,
            "contents": self._deserialize(payload),
            "time": timestamp,
            "tag": tag,
        }

    def delete(self, key: str) -> bool:
        """Remove ``key`` from the cache.

        Args:
            key: Cache key.

        Returns:
            ``True`` if an entry was deleted, ``False`` if the key was absent.
        """
        with dc.Cache(self.cache_path) as cache:
            # A single delete call replaces "check, then del": the entry can
            # expire or be removed by another process between the two steps,
            # which would raise KeyError. ``retry=True`` keeps the retry
            # behaviour of ``del cache[key]``.
            return bool(cache.delete(key, retry=True))

    def __contains__(self, key: str) -> bool:
        """Return whether ``key`` is currently present in the cache."""
        with dc.Cache(self.cache_path) as cache:
            return key in cache
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment