Skip to content

Instantly share code, notes, and snippets.

@Sinkler
Created August 20, 2021 14:10
Show Gist options
  • Save Sinkler/0b6389ff8ced9e1c0d82e2c478862a08 to your computer and use it in GitHub Desktop.
Save Sinkler/0b6389ff8ced9e1c0d82e2c478862a08 to your computer and use it in GitHub Desktop.
The emcache backend for the aiocache library
import asyncio
import typing
import emcache
from aiocache.base import BaseCache
from aiocache.serializers import BaseSerializer
import settings
class EmcacheCache(BaseCache):
    """aiocache backend that talks to memcached through the ``emcache`` client.

    The underlying client is created lazily by :meth:`_load_client` the first
    time a command is issued, and is re-created if it has been closed since.
    Keys produced by :meth:`_build_key` are ``bytes``; string values are
    encoded to ``bytes`` before being sent to the server.
    """

    NAME = "memcached"

    def __init__(self, serializer=None, endpoint="127.0.0.1", port=11211, pool_size=2, loop=None, **kwargs):
        """Store connection settings; no connection is opened here.

        :param serializer: serializer to use; when ``None`` the serializer
            configured by the base class is left in place.
        :param endpoint: memcached host name or address.
        :param port: memcached port.
        :param pool_size: passed to emcache as ``max_connections``.
        :param loop: stored on the instance; not used by this class itself.
        :param kwargs: forwarded to ``BaseCache.__init__``.
        """
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.port = port
        self.pool_size = int(pool_size)
        self._loop = loop
        self.client: typing.Optional[typing.Union[emcache.Client, emcache.client._Client]] = None
        self.hosts = [emcache.MemcachedHostAddress(self.endpoint, self.port)]
        # Only override when a serializer was actually supplied; assigning the
        # ``None`` default unconditionally would wipe out the serializer the
        # base class set up in ``super().__init__``.
        if serializer is not None:
            self.serializer = serializer

    def __repr__(self):  # pragma: no cover
        return "EmcacheCache ({}:{})".format(self.endpoint, self.port)

    @staticmethod
    def _to_bytes(data):
        """Return ``data`` encoded to bytes if it is a ``str``, else unchanged."""
        return data.encode() if isinstance(data, str) else data

    def _build_key(self, key, namespace=None):
        """Build the namespaced key, replace spaces with ``_`` and encode it."""
        ns_key = super()._build_key(key, namespace=namespace).replace(" ", "_")
        return str.encode(ns_key)

    async def _load_client(self):
        """Create the emcache client if there is none or it has been closed."""
        if self.client is None or self.client._closed:
            self.client = await emcache.create_client(
                self.hosts,
                timeout=settings.CACHE_TIMEOUT,
                max_connections=self.pool_size,
                autobatching=True,
            )

    async def _get(self, key, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the value stored under ``key`` or ``None`` if it is missing.

        The raw bytes are returned when ``encoding`` is ``None``; otherwise
        they are decoded with ``encoding``.
        """
        await self._load_client()
        # Accept str keys as well, like the sibling read methods do.
        item = await self.client.get(self._to_bytes(key))
        if item is None:
            return None
        if encoding is None:
            return item.value
        return item.value.decode(encoding)

    async def _gets(self, key, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the CAS token of ``key`` (falsy result if the key is missing)."""
        await self._load_client()
        item = await self.client.gets(self._to_bytes(key))
        return item and item.cas

    async def _multi_get(self, keys, encoding=BaseSerializer.DEFAULT_ENCODING, _conn=None):
        """Return the values for ``keys`` as a list aligned with ``keys``.

        Missing keys yield ``None`` at their position. Values are raw bytes
        when ``encoding`` is ``None``, otherwise decoded with ``encoding``.
        """
        await self._load_client()
        keys = [self._to_bytes(key) for key in keys]
        found = await self.client.get_many(keys)
        values = []
        # Walk the requested keys rather than the result mapping: the mapping
        # has no entry for a key without an item and its ordering is not tied
        # to the order of ``keys``.
        for key in keys:
            item = found.get(key)
            if item is None:
                values.append(None)
            elif encoding is None:
                values.append(item.value)
            else:
                values.append(item.value.decode(encoding))
        return values

    async def _set(self, key, value, ttl=0, _cas_token=None, _conn=None):
        """Store ``value`` under ``key``.

        With ``_cas_token`` the write is delegated to :meth:`_cas` and its
        result is returned, so a failed compare-and-swap is reported as
        ``False``. Without a token the value is set unconditionally and
        ``True`` is returned.
        """
        await self._load_client()
        value = self._to_bytes(value)
        if _cas_token is not None:
            return await self._cas(key, value, _cas_token, ttl=ttl, _conn=_conn)
        await self.client.set(key, value, exptime=ttl or 0)
        return True

    async def _cas(self, key, value, token, ttl=None, _conn=None):
        """Compare-and-swap ``key``; return ``False`` if the store was rejected."""
        await self._load_client()
        try:
            await self.client.cas(key, value, token, exptime=ttl or 0)
        except emcache.NotStoredStorageCommandError:
            return False
        return True

    async def _multi_set(self, pairs, ttl=0, _conn=None):
        """Set every ``(key, value)`` pair concurrently and return ``True``."""
        await self._load_client()
        expiry = ttl or 0
        await asyncio.gather(
            *(self.client.set(key, self._to_bytes(value), exptime=expiry) for key, value in pairs)
        )
        return True

    async def _add(self, key, value, ttl=0, _conn=None):
        """Add ``key`` with ``value`` and return ``True``.

        Errors raised by the emcache client are not translated here.
        NOTE(review): presumably emcache raises when the key already exists,
        while aiocache callers may expect ``ValueError`` - confirm.
        """
        await self._load_client()
        await self.client.add(key, self._to_bytes(value), exptime=ttl or 0)
        return True

    async def _exists(self, key, _conn=None):
        """Return whether ``key`` exists.

        Existence is probed by appending an empty byte string; a
        ``NotStoredStorageCommandError`` is treated as "key is missing".
        """
        await self._load_client()
        try:
            await self.client.append(key, b"")
        except emcache.NotStoredStorageCommandError:
            return False
        return True

    async def _increment(self, key, delta, _conn=None):
        """Add ``delta`` (which may be negative) to the counter under ``key``.

        A positive ``delta`` uses the increment command; anything else uses
        decrement with the absolute value. If the key does not exist it is
        created with ``delta`` as its value and ``delta`` is returned.
        """
        await self._load_client()
        incremented = None
        try:
            if delta > 0:
                incremented = await self.client.increment(key, delta)
            else:
                incremented = await self.client.decrement(key, abs(delta))
        except emcache.NotFoundCommandError:
            await self._set(key, str(delta))
        # Compare against None explicitly: a counter that legitimately reaches
        # 0 must be reported as 0, not replaced by ``delta``.
        return delta if incremented is None else incremented

    async def _expire(self, key, ttl, _conn=None):
        """Update the expiry of ``key`` and return the client's ``touch`` result.

        NOTE(review): the return value and the behaviour for a missing key are
        whatever emcache's ``touch`` provides - confirm they match what
        callers expect.
        """
        await self._load_client()
        return await self.client.touch(key, ttl)

    async def _delete(self, key, _conn=None):
        """Delete ``key``; return ``False`` if it was not found, else ``True``."""
        await self._load_client()
        try:
            await self.client.delete(key)
        except emcache.NotFoundCommandError:
            return False
        return True

    async def _clear(self, namespace=None, _conn=None):
        """Flush every configured host.

        :raises ValueError: if ``namespace`` is given; flushing a single
            namespace is not supported.
        """
        await self._load_client()
        if namespace:
            raise ValueError("MemcachedBackend doesnt support flushing by namespace")
        # Each host gets a larger ``delay`` than the previous one.
        for idx, host in enumerate(self.hosts):
            await self.client.flush_all(host, delay=10 + (10 * idx))
        return True

    async def _redlock_release(self, key, _):
        """Release a lock by deleting its key; the second argument is ignored."""
        # _delete loads the client itself, so no separate load is needed here.
        return await self._delete(key)

    async def _close(self, *args, _conn=None, **kwargs):
        """Close the client if one is currently open."""
        # Do not create a client just to close it: closing a cache that was
        # never used (or is already closed) must not open new connections.
        if self.client is not None and not self.client._closed:
            await self.client.close()

    @classmethod
    def parse_uri_path(cls, path):
        """Return the extra constructor arguments encoded in a URI path (none)."""
        return {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment