Skip to content

Instantly share code, notes, and snippets.

@leafsummer
Last active September 19, 2021 21:48
Show Gist options
  • Save leafsummer/afc42b7cfabb761d2dd1 to your computer and use it in GitHub Desktop.
Save leafsummer/afc42b7cfabb761d2dd1 to your computer and use it in GitHub Desktop.
custom redis tools
#!/usr/bin/env python
import redis
from ncelery import conf
class RedisClient(object):
    """
    Proxy around a ``redis.StrictRedis`` client, cached per configuration.

    Instances are stored in the class-level ``_instance`` dict keyed by
    ``str(args)``, so constructing the class again with an equal
    configuration returns the same object. Attributes the proxy does not
    define itself are forwarded to the wrapped client ``rcli``.

    Singleton pattern
    http://stackoverflow.com/questions/42558/python-and-the-singleton-pattern
    """
    _instance = {}

    def __new__(cls, *args, **kwargs):
        """Return the cached instance for these arguments, creating it once."""
        # ``server_conf`` may also be given by keyword; map it onto the same
        # cache slot as the positional form so both spellings share a client.
        if not args and 'server_conf' in kwargs:
            args = (kwargs['server_conf'],)
        key = str(args)
        if key not in cls._instance:
            cls._instance[key] = super(RedisClient, cls).__new__(cls)
        return cls._instance[key]

    def __init__(self, server_conf=None):
        """
        Build the connection pool and client on first construction.

        Args:
            server_conf: keyword arguments for ``redis.ConnectionPool``;
                defaults to ``{'host': 'localhost', 'port': 6379}``.
        """
        # __init__ runs on every ``RedisClient(...)`` call, including cache
        # hits. Keep the existing pool instead of silently replacing it.
        try:
            object.__getattribute__(self, 'rcli')
        except AttributeError:
            pass
        else:
            return
        if server_conf is None:
            server_conf = {'host': 'localhost', 'port': 6379}
        self.pool = redis.ConnectionPool(**server_conf)
        self.rcli = redis.StrictRedis(connection_pool=self.pool)

    @property
    def __dict__(self):
        """Expose the wrapped client's ``__dict__`` in place of the proxy's."""
        try:
            return self.rcli.__dict__
        except RuntimeError:
            raise AttributeError('__dict__')

    def __repr__(self):
        """Return the wrapped client's repr."""
        return repr(self.rcli)

    def __dir__(self):
        """List the wrapped client's attributes, or ``[]`` if it is unavailable."""
        try:
            return dir(self.rcli)
        except (RuntimeError, AttributeError):
            return []

    def __getattr__(self, name):
        """Forward attribute lookups that failed on the proxy to ``rcli``."""
        # This hook only runs when normal lookup fails. If ``rcli`` itself is
        # missing (instance not initialised yet), delegating to ``self.rcli``
        # would re-enter this method until the recursion limit is hit.
        if name == 'rcli':
            raise AttributeError(name)
        if name == '__members__':
            return dir(self.rcli)
        return getattr(self.rcli, name)
# Module-level client used by the Redis* helper classes in this file.
rc = RedisClient(dict(
    host='instance.redis.com',
    port=6379,
    db=0,
    password=None,
))
class RedisQueue(object):
    """
    FIFO queue stored in a Redis list and accessed through the module-level
    client ``rc``: ``put`` pushes on the right, ``pop`` takes from the left.

    Args:
        name:
            key of the Redis list backing the queue
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``
            such as `pickle` and json are good idea
        llen:
            optional maximum length; ``put`` drops the oldest entries
            so the list does not grow past it
    """

    def __init__(self, name, serializer=None, llen=None):
        self.name = name
        self.serializer = serializer
        self.llen = llen

    def __len__(self):
        """Return the current length of the Redis list."""
        return rc.llen(self.name)

    def __iter__(self):
        """Yield every stored element as returned by Redis (not deserialized)."""
        for item in rc.lrange(self.name, 0, -1):
            yield item

    def __repr__(self):
        return 'redis.queue: <%s>' % self.name

    def getlist(self, start=None, end=None):
        """
        Return the elements between ``start`` and ``end`` (both inclusive).

        ``start`` defaults to the head of the list and ``end`` to its tail.
        Elements are passed through ``serializer.loads`` when a serializer
        is set. This is best-effort: on a Redis or deserialization error the
        elements collected so far (possibly none) are returned.
        """
        if start is None:
            start = 0
        if end is None:
            # -1 addresses the last element, so no separate LLEN call is needed.
            end = -1
        msg_list = []
        try:
            raw_items = rc.lrange(self.name, start, end)
            if self.serializer is None:
                msg_list.extend(raw_items)
            else:
                # Append one at a time so a failure keeps the earlier items.
                for raw in raw_items:
                    msg_list.append(self.serializer.loads(raw))
        except Exception:
            # Stay best-effort, but let KeyboardInterrupt/SystemExit through
            # and leave a trace of what went wrong.
            import logging
            logging.getLogger(__name__).exception(
                'getlist failed for %r', self.name)
        return msg_list

    def put(self, msgs):
        """
        Append ``msgs`` to the tail of the queue and return the ``rpush`` reply.

        When ``llen`` is set and the list is already at or over that length,
        the oldest entries are popped first to make room for the new one.
        """
        if self.serializer is not None:
            msgs = self.serializer.dumps(msgs)
        if self.llen is not None:
            list_len = len(self)
            if list_len >= self.llen:
                # +1 leaves room for the element pushed below.
                for _ in range(list_len - self.llen + 1):
                    rc.lpop(self.name)
        return rc.rpush(self.name, msgs)

    def pop(self, block=False, timeout=0):
        """
        Remove and return the element at the head of the queue.

        With ``block`` true this uses ``blpop`` with ``timeout`` and takes
        element 1 of a non-``None`` reply; otherwise it uses ``lpop``.
        Returns ``None`` when nothing was popped. The value is passed
        through ``serializer.loads`` when a serializer is set.
        """
        if block:
            reply = rc.blpop(self.name, timeout)
            msg = reply[1] if reply is not None else None
        else:
            msg = rc.lpop(self.name)
        if msg is not None and self.serializer is not None:
            return self.serializer.loads(msg)
        return msg

    def consume(self, block=True, timeout=0):
        """Yield the result of ``pop`` forever; stop on KeyboardInterrupt."""
        while True:
            try:
                yield self.pop(block, timeout)
            except KeyboardInterrupt:
                return

    def clear(self):
        """Delete the backing list."""
        # DELETE on a missing key is a no-op, so no existence check is needed.
        rc.delete(self.name)
class RedisString(object):
    """
    Helper for plain Redis string keys, accessed through the module-level
    client ``rc``.

    Args:
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``
            such as `pickle` and json are good idea
    """

    def __init__(self, serializer=None):
        self.serializer = serializer

    def __repr__(self):
        return 'redis.string'

    def _dumps(self, msgs):
        """Serialize ``msgs`` when a serializer is configured."""
        if self.serializer is not None:
            return self.serializer.dumps(msgs)
        return msgs

    def set(self, name, msgs, timeout=None):
        """
        Store ``msgs`` under ``name``, overwriting any existing value.

        Args:
            timeout: optional expiry, converted with ``int()`` and applied
                with ``expire`` after the value is written.
        """
        rc.set(name, self._dumps(msgs))
        if timeout is not None:
            rc.expire(name, int(timeout))

    def add(self, name, msgs, timeout=None):
        """
        Store ``msgs`` under ``name`` only if the key does not exist yet.

        Returns:
            True if the value was stored, False if the key already existed.
        """
        # SETNX does the existence check and the write in one command, so two
        # concurrent callers cannot both believe they created the key.
        if not rc.setnx(name, self._dumps(msgs)):
            return False
        if timeout is not None:
            rc.expire(name, int(timeout))
        return True

    def get(self, name, block=True):
        """
        Return the value stored under ``name``, or ``None`` if there is none.

        The value is passed through ``serializer.loads`` when a serializer
        is set; if that fails the raw value is returned instead. ``block``
        is accepted for interface compatibility and is not used.
        """
        msg = rc.get(name)
        if msg is None or self.serializer is None:
            return msg
        try:
            return self.serializer.loads(msg)
        except Exception:
            # Fall back to the raw value; KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            return msg

    def keys(self, pattern='*'):
        """Return the keys matching ``pattern``."""
        return rc.keys(pattern)

    def isexist(self, name):
        """Return True if ``name`` exists, else False."""
        return bool(rc.exists(name))

    def delete(self, name):
        """Delete ``name``."""
        # DELETE on a missing key is a no-op, so no existence check is needed.
        rc.delete(name)
class RedisSet(object):
    """
    Helper for one Redis set, accessed through the module-level client ``rc``.

    Args:
        name:
            key of the Redis set
        serializer:
            the class or module to serialize msgs with, must have
            methods or functions named ``dumps`` and ``loads``
            such as `pickle` and json are good idea
    """

    def __init__(self, name, serializer=None):
        self.serializer = serializer
        self.name = name

    def __len__(self):
        """Return the number of members in the set."""
        return rc.scard(self.name)

    def __iter__(self):
        """Yield each member as returned by Redis (not deserialized)."""
        for member in rc.smembers(self.name):
            yield member

    def __repr__(self):
        return 'redis.set:<%s>' % self.name

    def _encode(self, msg):
        """Serialize ``msg`` when a serializer is configured."""
        if self.serializer is None:
            return msg
        return self.serializer.dumps(msg)

    def isexist(self, msg):
        """Return True if ``msg`` (after serialization) is a member."""
        return bool(rc.sismember(self.name, self._encode(msg)))

    def add(self, msg):
        """Add ``msg``; return True when the ``sadd`` reply is truthy."""
        return bool(rc.sadd(self.name, self._encode(msg)))

    def rem(self, msg):
        """Remove ``msg``; return True when the ``srem`` reply is truthy."""
        return bool(rc.srem(self.name, self._encode(msg)))

    def getlist(self):
        """Return all members as a list, deserialized when a serializer is set."""
        members = rc.smembers(self.name)
        if self.serializer is None:
            return list(members)
        return [self.serializer.loads(raw) for raw in members]
if __name__ == '__main__':
    # Demo consumer: block on the queue and report each message's
    # length and type as it arrives.
    rq = RedisQueue('client:log:nta:trafficevent')
    for m in rq.consume():
        # A single formatted string prints the same text under the Python 2
        # print statement and the Python 3 print function.
        print('%s %s' % (len(m), type(m)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment