Skip to content

Instantly share code, notes, and snippets.

@deliro
Created January 29, 2019 10:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deliro/ab45680b2976546330fe255b5a7a43f2 to your computer and use it in GitHub Desktop.
Save deliro/ab45680b2976546330fe255b5a7a43f2 to your computer and use it in GitHub Desktop.
import redis
from time import sleep
from random import randint
from redq import RedisDeque
# Redis client created with no arguments, i.e. the redis-py defaults.
r = redis.StrictRedis()
# Deque-style wrapper around the Redis list stored under the key "l".
q = RedisDeque(r, "l")
def consumer():
    """Yield one result per item taken from the module-level queue ``q``.

    Every iteration blocks in ``q.pop_wait()`` until an item is available.
    A callable item is invoked with no arguments and its return value is
    yielded; any other item is yielded unchanged. The loop never ends on
    its own, so the generator is infinite.
    """
    while True:
        item = q.pop_wait()
        if callable(item):
            yield item()
        else:
            yield item
if __name__ == "__main__":
    # Script entry point: print every result the consumer produces.
    results = consumer()
    for result in results:
        print(result)
import redis
from time import sleep
from random import randint
from redq import RedisDeque
# Redis client created with no arguments, i.e. the redis-py defaults.
r = redis.StrictRedis()
# Deque-style wrapper around the Redis list stored under the key "l".
q = RedisDeque(r, "l")
if __name__ == "__main__":
    # Script entry point: push the integers 0..999 onto the left end of the
    # queue, sleeping a random 0-1000 ms after each push.
    for number in range(1000):
        pause_ms = randint(0, 1000)
        q.appendleft(number)
        sleep(pause_ms / 1000)
import pickle as _pickle
import json
from functools import partial
class pickle:
    """Serializer namespace exposing ``dumps``/``loads`` backed by stdlib pickle.

    ``dumps`` always uses ``HIGHEST_PROTOCOL``; ``loads`` is the plain stdlib
    function. Both are wrapped in ``staticmethod`` so they behave the same
    whether looked up on the class or on an instance: a bare
    ``functools.partial`` stored as a class attribute triggers a
    FutureWarning on instance access in Python 3.13 and is slated to bind
    like a method in later versions.

    NOTE: unpickling can execute arbitrary code. Only use this serializer
    with data written by trusted producers.
    """

    dumps = staticmethod(
        partial(_pickle.dumps, protocol=_pickle.HIGHEST_PROTOCOL)
    )
    loads = staticmethod(_pickle.loads)
class RedisDeque:
    """Deque-like wrapper around a single Redis list.

    Elements are passed through ``serializer.dumps`` before being written
    and through ``serializer.loads`` after being read. The right end of the
    deque maps to the tail of the Redis list (RPUSH/RPOP) and the left end
    to its head (LPUSH/LPOP).

    Args:
        conn: Redis client object providing ``llen``, ``lrange``, ``rpush``,
            ``lpush``, ``rpop``, ``lpop``, ``brpop``, ``blpop`` and
            ``delete``.
        name: Key of the Redis list backing this deque.
        serializer: Object with ``dumps(obj)`` and ``loads(data)``.

    NOTE: with a pickle-based serializer, anything able to write to the
    backing key can trigger arbitrary code execution on readers. Only use
    it with trusted producers.
    """

    # Number of leading elements rendered by ``__str__`` before eliding.
    _PREVIEW_SIZE = 20

    def __init__(self, conn, name, serializer=pickle):
        self._conn = conn
        self._name = name
        self._serializer = serializer

    def __len__(self):
        """Return the current length of the backing list."""
        return self._conn.llen(self._name)

    def __str__(self):
        """Render up to the first 20 elements, adding ``", ..."`` if more exist."""
        limit = self._PREVIEW_SIZE
        # LRANGE's end index is inclusive, so 0..limit returns limit + 1
        # items: just enough to tell whether the preview is truncated.
        raw = self._conn.lrange(self._name, 0, limit)
        shown = ", ".join(str(self.__decode(e)) for e in raw[:limit])
        postfix = ", ..." if len(raw) > limit else ""
        return "[" + shown + postfix + "]"

    def __repr__(self):
        return f"<RedisDeque ('{self._name}'): {str(self)}>"

    def __encode(self, e):
        return self._serializer.dumps(e)

    def __decode(self, e):
        return self._serializer.loads(e)

    def append(self, e):
        """Add ``e`` to the right end."""
        self._conn.rpush(self._name, self.__encode(e))

    def extend(self, it):
        """Add every element of ``it`` to the right end, in order."""
        encoded = [self.__encode(e) for e in it]
        # RPUSH with no values is rejected by Redis; an empty iterable
        # is a no-op, as it is for collections.deque.
        if encoded:
            self._conn.rpush(self._name, *encoded)

    def appendleft(self, e):
        """Add ``e`` to the left end."""
        self._conn.lpush(self._name, self.__encode(e))

    def extendleft(self, it):
        """Add every element of ``it`` to the left end, one after another.

        As with ``collections.deque.extendleft``, the elements end up in
        reverse order relative to ``it``.
        """
        encoded = [self.__encode(e) for e in it]
        # LPUSH with no values is rejected by Redis; skip the call.
        if encoded:
            self._conn.lpush(self._name, *encoded)

    def pop(self):
        """Remove and return the rightmost element.

        Raises:
            IndexError: If the deque is empty.
        """
        v = self._conn.rpop(self._name)
        if v is None:
            raise IndexError("pop from an empty RedisDeque")
        return self.__decode(v)

    def pop_wait(self, timeout=0):
        """Remove and return the rightmost element, blocking until one exists.

        Args:
            timeout: Seconds to wait, passed to ``brpop``; ``0`` (the
                default) waits indefinitely.

        Raises:
            IndexError: If ``timeout`` elapses with no element available.
        """
        reply = self._conn.brpop(self._name, timeout)
        if reply is None:
            raise IndexError("timed out waiting to pop from RedisDeque")
        # The reply is a (key, value) pair; the value is the last item.
        return self.__decode(reply[-1])

    def popleft(self):
        """Remove and return the leftmost element.

        Raises:
            IndexError: If the deque is empty.
        """
        v = self._conn.lpop(self._name)
        if v is None:
            raise IndexError("pop from an empty RedisDeque")
        return self.__decode(v)

    def popleft_wait(self, timeout=0):
        """Remove and return the leftmost element, blocking until one exists.

        Args:
            timeout: Seconds to wait, passed to ``blpop``; ``0`` (the
                default) waits indefinitely.

        Raises:
            IndexError: If ``timeout`` elapses with no element available.
        """
        reply = self._conn.blpop(self._name, timeout)
        if reply is None:
            raise IndexError("timed out waiting to pop from RedisDeque")
        # The reply is a (key, value) pair; the value is the last item.
        return self.__decode(reply[-1])

    def clear(self):
        """Delete the backing key, removing all elements."""
        self._conn.delete(self._name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment