Skip to content

Instantly share code, notes, and snippets.

@adoc
Last active August 29, 2015 13:57
Show Gist options
  • Save adoc/9586483 to your computer and use it in GitHub Desktop.
Save adoc/9586483 to your computer and use it in GitHub Desktop.
"""Just some simple threaded redis pool classes. Also some useful
primitive data models.
"""
# Date: 2014/03/19
# Author: https://github.com/adoc/
# © 2014 Nicholas Long. All Rights Reserved.
import logging
log = logging.getLogger(__name__)
import functools
import collections
import time
import threading
import uuid
import urllib.parse
import redis
import json as _json
from safedict import SafeDict
# Names exported by `from <module> import *`.
# NOTE(review): the unittest classes defined at the bottom of this file are
# exported as well, while `Dummy` and `RedisTestCase` are not listed here.
__all__ = ('JSONEncoder', 'JSONDecoder', 'json', 'dump_dict', 'load_dict', 'RedisPool',
'ThreadLocalRedisPool', 'UnifiedSession', 'UnboundModelException', 'RedisObj',
'Hash', 'Set', 'Collection','TestGeneralFunctions', 'TestRedisPoolClass',
'TestThreadLocalRedisPool', 'TestUnifiedSession', 'TestUnboundModelException',
'TestRedisObject', 'TestHash', 'TestSet', 'TestCollection')
# Let's monkeypatch json.dumps and json.loads to do what we want.
class Dummy:
    """Empty attribute holder; instances accept arbitrary attributes."""
class JSONEncoder(_json.JSONEncoder):
    """JSON encoder that understands ``uuid.UUID``.

    UUID values are written as their URN (Universal Resource Name) string,
    e.g. "urn:uuid:1111-..."; every other type is left to the stock encoder.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for *obj*."""
        if not isinstance(obj, uuid.UUID):
            # Not a UUID: defer to the base class, which raises TypeError
            # for types it cannot serialize.
            return super().default(obj)
        return obj.urn
class JSONDecoder(_json.JSONDecoder):
    """JSON decoder that restores ``uuid.UUID`` values.

    Only the top-level decoded value is inspected: when the whole document
    is a string starting with "urn:uuid" it is turned into a ``uuid.UUID``.
    Strings nested inside lists or objects are returned unchanged.
    """

    def decode(self, obj):
        """Decode the JSON document *obj*.

        Returns a ``uuid.UUID`` when the document is a UUID URN string,
        otherwise whatever the stock decoder produced.
        """
        value = super().decode(obj)
        if isinstance(value, str) and value.startswith('urn:uuid'):
            try:
                return uuid.UUID(value)
            except ValueError:
                # The prefix matched but the rest is not a well-formed UUID;
                # hand back the plain string instead of failing the decode.
                return value
        return value
# Build a module-local `json` object (a plain attribute holder) that exposes
# pre-configured `dumps` / `loads`, so the rest of this file can call
# `json.dumps(...)` / `json.loads(...)` and get the behaviour below.
json = Dummy()
# dumps: compact separators (no whitespace) and UUID support via JSONEncoder.
json.dumps = functools.partial(_json.dumps, separators=(',', ':'), cls=JSONEncoder)
# loads: JSON objects become OrderedDict instances; JSONDecoder converts a
# top-level UUID URN string back into a uuid.UUID.
json.loads = functools.partial(_json.loads, object_pairs_hook=collections.OrderedDict,
cls=JSONDecoder)
def dump_dict(obj):
    """Return a copy of *obj* whose values are JSON-encoded strings.

    Keys are left untouched. This prepares a mapping for storage in Redis,
    whose hash fields are string-valued.
    """
    encoded = {}
    for field, value in obj.items():
        encoded[field] = json.dumps(value)
    return encoded
def load_dict(obj):
    """Decode a mapping retrieved from Redis back into Python values.

    Keys are returned as ``str``; each value is parsed as a JSON document.
    Both ``bytes`` (the raw form shown throughout this file) and ``str``
    keys/values are accepted, so a mapping that was already decoded to text
    is handled as well.
    """
    def _text(raw):
        # Only bytes need decoding; text passes through unchanged.
        return raw.decode() if isinstance(raw, bytes) else raw

    return {_text(k): json.loads(_text(v)) for k, v in obj.items()}
class RedisPool:
    """
    A Redis implementation that utilizes a process persistent
    ConnectionPool.
    """

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 max_connections=None, client_cls=redis.StrictRedis,
                 pool_cls=redis.ConnectionPool, **kwa):
        """Construct a new RedisPool instance.

        `host`, `port`, `db`, `password` and `max_connections`, together
        with any extra keyword arguments, are forwarded to `pool_cls`.
        `client_cls` is the class instantiated by `get_client()`.
        """
        kwa.update({
            'host': host,
            'port': port,
            'db': db,
            'password': password,
            'max_connections': max_connections
        })
        self.__client_cls = client_cls
        name = self.__class__.__name__
        log.info("%s: Setting up the Redis Connection Pool.", name)
        # Never write the password to the log; show a placeholder instead.
        loggable = kwa if password is None else dict(kwa, password='***')
        log.debug("%s: Pool connection parameters %s", name, loggable)
        self.__pool = pool_cls(**kwa)

    @classmethod
    def from_url(cls, url, db=None, **kwa):
        """Construct a new instance using a url for the connection
        parameters. Similar to StrictRedis.from_url().

        Can accept a urlparse.ParseResult or a string. When `db` is not
        given it is read from the url path, defaulting to 0. A host or
        port missing from the url falls back to the constructor default.

        Raises ValueError when the url scheme is neither 'redis' nor empty.
        """
        if not isinstance(url, urllib.parse.ParseResult):
            url = urllib.parse.urlparse(url)
        # Explicit check instead of `assert`, which is stripped under -O.
        if url.scheme and url.scheme != 'redis':
            raise ValueError("Unsupported url scheme %r; expected 'redis'."
                             % url.scheme)
        if db is None:
            try:
                db = int(url.path.lstrip('/'))
            except ValueError:
                db = 0
        kwa.update({'db': db, 'password': url.password})
        # Forward host/port only when the url carries them; passing None
        # would override the defaults declared on __init__.
        if url.hostname:
            kwa['host'] = url.hostname
        if url.port:
            kwa['port'] = url.port
        return cls(**kwa)

    def get_client(self):
        """Retrieve a Redis Client with a Connection from the pool."""
        return self.__client_cls(connection_pool=self.__pool)
class ThreadLocalRedisPool(RedisPool):
    """
    A persistent Redis implementation that provides thread local
    clients and pipelines. A single instance of this class is meant to
    be shared to multiple threads.
    """

    def __init__(self, *args, **kwa):
        """Construct the pool; arguments are passed on to `RedisPool`."""
        RedisPool.__init__(self, *args, **kwa)
        # One registry per pool instance. A class-level registry would be
        # shared by every pool, so two pools pointing at different servers
        # would hand out each other's client within the same thread.
        self.__registry = SafeDict(threading.get_ident)

    @property
    def client(self):
        """Returns a thread local Redis Client."""
        try:
            client = self.__registry['client']
        except KeyError:
            # First access for this registry entry: create and remember it.
            client = self.__registry['client'] = self.get_client()
        return client

    @property
    def pipeline(self):
        """Returns a thread local Redis Pipeline."""
        try:
            pipeline = self.__registry['pipeline']
        except KeyError:
            pipeline = self.__registry['pipeline'] = self.client.pipeline()
        return pipeline

    def remove_pipeline(self):
        """Remove the thread local Redis Pipeline."""
        if 'pipeline' in self.__registry:
            log.debug("%s: Removing Redis Pipeline for thread: %s.",
                      self.__class__.__name__, self.__registry.thread_id)
            del self.__registry['pipeline']

    def remove_client(self):
        """Remove the thread local Redis Pipeline and Client."""
        if 'client' in self.__registry:
            log.debug("%s: Removing Redis Client for thread: %s.",
                      self.__class__.__name__, self.__registry.thread_id)
            # The pipeline was created from this client, so it goes first.
            self.remove_pipeline()
            del self.__registry['client']

    # Alias kept for callers that use the shorter name.
    remove = remove_client
class UnifiedSession(ThreadLocalRedisPool):
    """Single entry point that routes Redis commands by name.

    Attribute access for a name in `pipeline_commands` returns that method
    of the thread local pipeline; a name in `client_commands` returns that
    method of the thread local client. Callbacks registered with
    `bind_exec_event` are invoked every time `execute()` is called.
    """

    pipeline_commands = ('set', 'delete', 'hmset', 'zadd', 'zrem')
    client_commands = ('info', 'flushall', 'get', 'hgetall', 'zrange', 'zcard',
                       'zrangebyscore')

    def __init__(self, *args, **kwa):
        """Construct the session; arguments go to `ThreadLocalRedisPool`."""
        ThreadLocalRedisPool.__init__(self, *args, **kwa)
        self._exec_events = set()

    def __getattr__(self, attrname):
        """Resolve a routed command name to the pipeline or client method.

        Raises AttributeError for any name that is in neither tuple.
        """
        if attrname in self.pipeline_commands:
            return getattr(self.pipeline, attrname)
        if attrname in self.client_commands:
            return getattr(self.client, attrname)
        raise AttributeError("UnifiedSession has no attribute '%s'." % attrname)

    def bind_exec_event(self, callback):
        """Register *callback* to be called on every `execute()`."""
        self._exec_events.add(callback)

    def unbind_exec_event(self, callback):
        """Unregister *callback*; unknown callbacks are ignored."""
        self._exec_events.discard(callback)

    def trigger_exec_event(self):
        """Call every registered callback."""
        # Iterate over a snapshot: a callback may bind or unbind events,
        # and mutating a set while iterating it raises RuntimeError.
        for callback in tuple(self._exec_events):
            callback()

    def execute(self):
        """Execute the thread local pipeline and return its result.

        The exec callbacks are triggered and the pipeline is removed
        whether or not the execution succeeded.
        """
        try:
            return self.pipeline.execute()
        finally:
            try:
                self.trigger_exec_event()
            finally:
                # Must run even when a callback raises, otherwise the used
                # pipeline would stay registered for this thread.
                self.remove_pipeline()
class UnboundModelException(Exception):
    """Error type for a model that is used without a bound session."""
class RedisObj:
    """Base class for models stored under a namespaced Redis key.

    The positional arguments form the key namespace; `gen_key()` joins them
    with `keyspace_separator`. A model may be created unbound
    (`session=None`) and attached to a `UnifiedSession` later via `bind()`.
    """

    keyspace_separator = ':'

    def __init__(self, *namespace, session=None):
        """Create the model.

        Raises ValueError when `session` is neither None nor a
        `UnifiedSession`.
        """
        self.__session = None
        self.__namespace = namespace
        if session is None:
            return
        if not isinstance(session, UnifiedSession):
            raise ValueError("`RedisObj` session must be a `UnifiedSession`.")
        self.__session = session
        session.bind_exec_event(self._flushed)

    def _flushed(self):
        """Hook called after the session executes; no-op in the base class."""
        pass

    @property
    def session(self):
        """The bound session.

        Raises UnboundModelException when no session has been bound.
        """
        # Yes, this is meant to check against None rather than
        # isinstance. Notice the exception.
        if self.__session is None:
            raise UnboundModelException("This model is not bound to any session.")
        return self.__session

    @property
    def namespace(self):
        """Tuple of namespace parts given at construction."""
        return self.__namespace

    def bind(self, session):
        """Attach the model to `session`.

        The `_flushed` hook is registered on the new session, exactly as
        `__init__` does, and removed from a previously bound one.

        Raises ValueError when `session` is not a `UnifiedSession`.
        """
        if not isinstance(session, UnifiedSession):
            raise ValueError("`bind` requires a UnifiedSession object.")
        previous = self.__session
        if previous is not None and previous is not session:
            previous.unbind_exec_event(self._flushed)
        self.__session = session
        session.bind_exec_event(self._flushed)

    def gen_key(self):
        """Return the namespace parts joined by `keyspace_separator`."""
        # str() lets non-string namespace parts (e.g. ints) be used.
        return self.keyspace_separator.join(str(part) for part in self.__namespace)

    def __del__(self):
        # Debugging aid: report finalization through logging, not stdout.
        logging.getLogger(__name__).debug("RedisObj was deleted!!!!")
class Hash(RedisObj):
    """Collection of JSON-encoded Redis hashes, one hash per object id.

    Each object is stored under the key "<namespace>:<id>". `set()` adds
    the bookkeeping fields `_id` and `_active` to the object.

    >>> import thredis
    >>>
    >>> s = thredis.UnifiedSession.from_url('redis://127.0.0.1:6379')
    >>> h = thredis.Hash('name', 'space', session=s)
    >>>
    >>> id = h.set({'foo': True, 'bar': 'baz', 'boop': 123})
    >>> h.session.execute();
    [True]
    >>>
    >>> h.get(id)
    {'boop': 123, 'foo': True, 'bar': 'baz', '_id': UUID('d12a97cc-46f2-460b-96c8-02ff18f55c95')}
    """

    def __init__(self, *namespace, random_id_func=uuid.uuid4, **kwa):
        """Create the hash model.

        `random_id_func` is called without arguments to produce an id for
        objects stored without one. Remaining keyword arguments go to
        `RedisObj`.

        Raises ValueError when no namespace part is given.
        """
        if not namespace:
            raise ValueError('`Hash` requires at least one namespace argument.')
        RedisObj.__init__(self, *namespace, **kwa)
        self.__random_id_func = random_id_func

    def gen_key(self, _id):
        """Return the Redis key for the object `_id` (bytes are decoded)."""
        if isinstance(_id, bytes):
            _id = _id.decode()
        return self.keyspace_separator.join([RedisObj.gen_key(self), str(_id)])

    def _get(self, _id):
        """Do the actual get operation.
        """
        return self.session.hgetall(Hash.gen_key(self, _id))

    def get(self, _id):
        """Return the decoded object stored under `_id`."""
        return load_dict(Hash._get(self, _id))

    def _set(self, _id, obj):
        """Do the actual set operation.
        """
        return self.session.hmset(Hash.gen_key(self, _id), obj)

    def set(self, obj, **kwa):
        """Store `obj` and return its id.

        Every extra keyword `name=value` is written into `obj` as
        `_name`. `id` defaults to `obj['_id']` or else a freshly generated
        id; `active` defaults to `obj['_active']` or else True.

        Note that `obj` itself is updated in place.
        """
        if 'id' not in kwa:
            kwa['id'] = obj['_id'] if '_id' in obj else self.__random_id_func()
        if 'active' not in kwa:
            kwa['active'] = obj['_active'] if '_active' in obj else True
        # Append _ to any additional items and extend in to obj.
        obj.update({'_%s' % k: v for k, v in kwa.items()})
        Hash._set(self, obj['_id'], dump_dict(obj))
        return obj['_id']

    def delete(self, _id, reference=True):
        """Delete the object `_id`.

        With `reference=True` the object is kept and only flagged
        `_active = False`; otherwise its key is removed.
        """
        if not reference:
            self.session.delete(Hash.gen_key(self, _id))
            return
        obj = self.get(_id)
        if not obj:
            # Nothing is stored under this id. Re-saving the empty object
            # would create a new hash under a freshly generated id.
            return
        if '_id' not in obj:
            # Keep the write on the key that was read instead of letting
            # `set` invent a new id for an object stored without one.
            obj['_id'] = _id.decode() if isinstance(_id, bytes) else _id
        Hash.set(self, obj, active=False)
class Set(RedisObj):
    """Ordered set stored in a Redis sorted set, using scores as positions.

    Writes go through the session and are counted in a local "dirty"
    counter until the session executes, so `count()` includes members that
    have been added but not yet executed.

    >>> import thredis
    >>> s = thredis.UnifiedSession.from_url('redis://127.0.0.1:6379')
    >>> se = thredis.Set('set', 'space', session=s)
    >>>
    >>> se.add('set1')
    >>> se.add('set2')
    >>> se.add('set3')
    >>>
    >>> se.session.execute()
    [1, 1, 1]
    >>>
    >>> se.all()
    [b'set1', b'set2', b'set3']
    >>>
    >>> se.insert('set1.5', 1)
    >>>
    >>> se.session.execute()
    [1, 0, 0]
    >>>
    >>> se.all()
    [b'set1', b'set1.5', b'set2', b'set3']
    >>>
    """

    def __init__(self, *args, **kwa):
        """Create the set model; arguments are passed on to `RedisObj`."""
        RedisObj.__init__(self, *args, **kwa)
        self.__dirty_count = 0

    def _flushed(self):
        """Reset the pending-write counter once the session has executed."""
        self.__dirty_count = 0

    def count(self):
        """Return the stored cardinality plus the pending-write counter."""
        return self.session.zcard(self.gen_key()) + self.__dirty_count

    def add(self, obj, score=None):
        """Add `obj` with the given numeric `score`.

        When `score` is not an int or float the current `count()` is used,
        which appends the member at the end.
        """
        # Integer scores are honoured too; previously only floats were.
        is_number = isinstance(score, (int, float)) and not isinstance(score, bool)
        if not is_number:
            score = self.count()
        # NOTE(review): positional (key, score, member) matches the zadd
        # call used throughout this file; confirm against the installed
        # redis client version.
        self.session.zadd(self.gen_key(), score, obj)
        self.__dirty_count += 1

    def range(self, from_idx, to_idx):
        """Return the members between the two indices (coerced to int)."""
        return self.session.zrange(self.gen_key(), int(from_idx), int(to_idx))

    def get(self, idx):
        """Return the member at `idx`, or None when there is none."""
        item = self.range(idx, idx)
        if len(item) > 0:
            return item[0]
        return None

    def all(self):
        """Return every member in order."""
        return self.range(0, -1)

    def delete(self, obj):
        """Remove `obj` from the set."""
        self.session.zrem(self.gen_key(), obj)
        self.__dirty_count -= 1

    def insert(self, obj, at_idx):
        """Insert object `at_index`, reindexing all objects following."""
        idx = float(at_idx)
        self.session.execute()  # Unfortunate hack to keep the indexing correct.
        self.add(obj, score=idx)
        key = self.gen_key()
        for item in self.range(idx, -1):
            idx += 1
            # These members already exist and are only given a new score,
            # so write them directly: going through `add` would bump the
            # pending-write counter and inflate `count()`.
            self.session.zadd(key, idx, item)
class Collection(RedisObj):
    """Provides a hash and ordered set.

    Objects live in a `Hash` under "<namespace>:data"; their ids are kept
    in order in a `Set` under "<namespace>:idx". A second `Set` under
    "<namespace>:active" is read by `all(active_only=True)`.

    >>> import thredis
    >>>
    >>> s = thredis.UnifiedSession.from_url('redis://127.0.0.1:6379')
    >>> s.flushall()
    True
    >>> c = thredis.Collection('coll', 'space', session=s)
    >>>
    >>> id = c.add({'foo': 'bars'})
    >>>
    >>> c.session.execute()
    [True, 1]
    >>>
    >>> c.get(id)
    {'foo': 'bars', '_idx': 0, '_id': UUID('3ff956ae-d36b-416b-8c83-9103a0ebc7ff')}
    >>>
    >>> c.add({'foo': 'bars'})
    >>> c.add({'foo': 'bars'})
    >>> c.add({'foo': 'bars'})
    >>> c.session.execute()
    [True, 1, True, 1, True, 1]
    >>> list(c.all(active_only=False))
    """

    def __init__(self, *namespace, **kwa):
        """Create the collection and its three backing models."""
        RedisObj.__init__(self, *namespace, **kwa)
        self.__hash = Hash(*(namespace + ('data',)), **kwa)
        self.__index = Set(*(namespace + ('idx',)), **kwa)
        self.__active_index = Set(*(namespace + ('active',)), **kwa)

    def all(self, active_only=True):
        """Get all items in the collection and return as a generator.
        """
        # NOTE(review): nothing in this class writes to the active index,
        # so the default active_only=True appears to yield nothing. Confirm.
        index = self.__active_index if active_only is True else self.__index
        id_set = index.all()
        return (self.__hash.get(_id) for _id in id_set)

    def idx(self, _id, verify=True):
        """Return the stored `_idx` of the object `_id`.

        With `verify=True` the index set is checked to hold the same id at
        that position; an Exception is raised when it does not.
        """
        obj = self.__hash.get(_id)
        if not verify:
            return obj['_idx']
        member = self.__index.get(obj['_idx'])
        if isinstance(member, bytes):
            member = member.decode()
        # Compare textual forms: the index returns the raw stored member
        # while the hash returns a decoded value (e.g. a UUID), so a
        # direct comparison of the two could never be equal.
        if member is not None and str(member) == str(obj['_id']):
            return obj['_idx']
        # TODO: Custom exception for this. A bad one.
        raise Exception("Inconsitency in Redis. This is bad and should never happen.")

    def move(self):
        """Not implemented yet; currently does nothing."""
        pass

    def get(self, _id):
        """Return the decoded object stored under `_id`."""
        return self.__hash.get(_id)

    def add(self, obj, **kwa):
        """Store `obj` and return its id.

        `idx` defaults to the current size of the index, i.e. the object
        is appended. Extra keywords are handed to `Hash.set`.
        """
        if 'idx' not in kwa:
            kwa['idx'] = self.__index.count()
        # Do hash add and then put in to set before item at `idx`.
        _id = self.__hash.set(obj, **kwa)
        self.__index.insert(_id, kwa['idx'])
        return _id

    def delete(self):
        """Not implemented yet; currently does nothing."""
        pass
'''
class Collection(Hash, Set):
"""Provides a hash and ordered set.
"""
def __init__(self, *namespace, **kwa):
Hash.__init__(self, *namespace, **kwa)
Set.__init__(self, *namespace, **kwa)
@property
def keyspace(self):
return self._keyspace
def gen_hash_key(self, id_):
"""Generate hash key."""
return self._hash_key_tmpl.format(keyspace=self.keyspace,
id=id_)
def gen_set_key(self):
"""Generate set key."""
return self._set_key_tmpl.format(keyspace=self.keyspace)
def list(self, active_only=True):
"""Ordered list of elements in the collection. This returns a generator."""
# This is a generator.
collection_ids = self.session.zrange(self.gen_set_key(), 0, -1)
return (self.session.hgetall(self.gen_hash_key(id_)) for id_ in collection_ids)
def idx(self, id, verify=True):
"""Return the index position of the given id."""
obj = self.session.hmget(self.gen_hash_key(id_))
idx_ = obj['_idx']
if not verify or self.session.zrange(self.gen_set_key(), idx_, idx_) == obj['_id']:
return obj['_idx']
else:
raise Exception("Inconsitency in Redis. This is bad and should never happen.")
def move(self, id, to_idx):
pass
def get(self, id_):
""" """
return self._load_dict(self.session.hgetall(self.gen_hash_key(id_)))
def add(self, obj, active=True):
"""Add object to the collection. Returns the id of the new object."""
set_key = self.gen_set_key()
idx_ = self.session.zcard(set_key) # Value of last index + 1
obj = isinstance(obj, dict) and obj or obj.__dict__
id_ = self._random_id_func()
obj['_active'] = active
obj['_id'] = id_
obj['_idx'] = idx_
# Set hash and record position in sorted set.
self.session.hmset(self.gen_hash_key(id_), self._dump_dict(obj))
self.session.zadd(set_key, idx_, id_)
return id_
def delete(self, id_, reference=True):
""" """
set_key = self.gen_set_key()
self.session.zrem(set_key, id_)
obj = self.get(id_)
'''
# TESTS
import uuid
import unittest
import threading
def threadfunc(func):
    """Return a new, not yet started ``threading.Thread`` running *func*."""
    return threading.Thread(target=func)
class RedisTestCase(unittest.TestCase):
    """Shared base for the Redis tests; holds the connection settings.

    Most tests connect through `url`, so keep it in sync with `host`,
    `port`, `db` and `password`. A password goes into the url with the
    usual syntax; the "user" part may be anything, as it is ignored:

        redis://ignored_username:realpassword12345@myredis.local:6380/1
    """

    # Individual connection parameters.
    host = "localhost"
    port = 6379
    db = 0
    password = None

    # The same connection expressed as a url.
    url = "redis://127.0.0.1:6379/0"

    # First key namespace part used by the tests.
    namespace = "test"

    def _session(self):
        """Return a new UnifiedSession built from `url`."""
        session = UnifiedSession.from_url(self.url)
        return session
class TestGeneralFunctions(unittest.TestCase):
    """Test general functions and objects.
    (Dummy, JSONEncoder, JSONDecoder, dump_dict, load_dict)
    """

    def test_dummy(self):
        """Dummy instances accept and return arbitrary attributes."""
        d = Dummy()
        d.foo = 'bar'
        d.bar = 'baz'
        # Equality, not identity: `is` on strings only works by accident
        # of interning.
        self.assertEqual(d.foo, 'bar')
        self.assertEqual(d.bar, 'baz')

    def test_json_encoder(self):
        """UUIDs are encoded as their URN string."""
        je = JSONEncoder()
        _id = uuid.uuid4()
        self.assertEqual(je.default(_id), 'urn:uuid:' + str(_id))

    def test_json_decoder(self):
        """UUID URNs become UUID objects; other values decode normally."""
        jd = JSONDecoder()
        self.assertEqual(jd.decode('"urn:uuid:b6ea2918-e2f1-4c0a-aa50-948edb9120fa"'),
                         uuid.UUID('b6ea2918-e2f1-4c0a-aa50-948edb9120fa'))
        self.assertEqual(jd.decode('"foo"'), 'foo')
        self.assertEqual(jd.decode('true'), True)
        self.assertEqual(jd.decode('false'), False)
        self.assertEqual(jd.decode('123'), 123)
        self.assertEqual(jd.decode('123.123'), 123.123)

    def test_dump_dict(self):
        """Every value is dumped to its JSON string."""
        d = {'foo': 'bar', 'boo': True, 'boop': False, 'baz': 123.123}
        expect = {'boo': 'true', 'baz': '123.123', 'foo': '"bar"', 'boop': 'false'}
        self.assertEqual(dump_dict(d), expect)

    def test_load_dict(self):
        """Bytes keys and values are decoded back to Python values."""
        l = {b'boo': b'true', b'baz': b'123.123', b'foo': b'"bar"', b'boop': b'false'}
        expect = {'foo': 'bar', 'boo': True, 'boop': False, 'baz': 123.123}
        self.assertEqual(load_dict(l), expect)
class TestUnboundModelException(unittest.TestCase):
    """Simply test that the exception is an exception. Once it logs or
    does other things, we will add more tests.
    """

    def test_exception(self):
        """The class derives from Exception and can be raised."""
        # unittest assertion instead of a bare `assert`, which is skipped
        # when Python runs with -O.
        self.assertTrue(issubclass(UnboundModelException, Exception))
        with self.assertRaises(UnboundModelException):
            raise UnboundModelException()
class TestRedisPoolClass(RedisTestCase):
    """Test RedisPool class"""

    def _connect(self):
        """Build a pool from the individual connection settings."""
        settings = {
            'host': self.host,
            'port': self.port,
            'db': self.db,
            'password': self.password,
        }
        return RedisPool(**settings)

    def test_init(self, pool=None):
        """The pool stores the client class and a ConnectionPool."""
        if not pool:
            pool = self._connect()
        self.assertIs(pool._RedisPool__client_cls, redis.StrictRedis)
        self.assertIsInstance(pool._RedisPool__pool, redis.ConnectionPool)

    def test_get_client(self, pool=None):
        """A client from the pool reaches the server on the expected port."""
        if not pool:
            pool = self._connect()
        server_info = pool.get_client().info()
        self.assertEqual(server_info['tcp_port'], self.port)

    def test_from_url(self):
        """A pool built from a url passes the same checks."""
        url_pool = RedisPool.from_url(self.url)
        self.test_init(url_pool)
        self.test_get_client(url_pool)
class TestThreadLocalRedisPool(RedisTestCase):
    """Test ThreadLocalRedisPool from several threads."""

    # Number of worker threads each threaded test runs, one after another.
    thread_count = 3

    def _connect(self):
        """Build a pool from the test url."""
        return ThreadLocalRedisPool.from_url(self.url)

    def _run_in_threads(self, check):
        """Run `check` in `thread_count` threads, one after another.

        `map(start, threads)` is lazy in Python 3 and never started any
        thread, so the threads are driven by an explicit loop. An exception
        raised inside a thread does not reach the test by itself, so the
        first one is collected and re-raised here.
        """
        failures = []

        def guarded():
            try:
                check()
            except Exception as exc:
                failures.append(exc)

        for _ in range(self.thread_count):
            worker = threadfunc(guarded)
            worker.start()
            worker.join()
        if failures:
            raise failures[0]

    def test_init(self, pool=None):
        """The pool passes the RedisPool checks and owns a SafeDict."""
        pool = pool or self._connect()
        TestRedisPoolClass.test_init(self, pool)
        self.assertIsInstance(pool._ThreadLocalRedisPool__registry, SafeDict)

    def test_client(self, pool=None):
        """`client` returns the object kept in the registry."""
        pool = pool or self._connect()
        registry = pool._ThreadLocalRedisPool__registry

        def check():
            # The pool reads the registry as registry['client'], so the
            # test does the same (no extra [ident] subscript).
            self.assertIs(pool.client, registry['client'])

        self._run_in_threads(check)

    def test_pipeline(self, pool=None):
        """`pipeline` returns the object kept in the registry."""
        pool = pool or self._connect()
        registry = pool._ThreadLocalRedisPool__registry

        def check():
            self.assertIs(pool.pipeline, registry['pipeline'])

        self._run_in_threads(check)

    def test_remove_pipeline(self, pool=None):
        """`remove_pipeline` drops the registry entry."""
        pool = pool or self._connect()
        registry = pool._ThreadLocalRedisPool__registry

        def check():
            self.assertIs(pool.pipeline, registry['pipeline'])
            pool.remove_pipeline()
            # The lookup must happen inside assertRaises to be caught.
            with self.assertRaises(KeyError):
                registry['pipeline']

        self._run_in_threads(check)

    def test_remove_client(self, pool=None):
        """`remove_client` drops the registry entry."""
        pool = pool or self._connect()
        registry = pool._ThreadLocalRedisPool__registry

        def check():
            self.assertIs(pool.client, registry['client'])
            pool.remove_client()
            with self.assertRaises(KeyError):
                registry['client']

        self._run_in_threads(check)

    def test_remove(self, pool=None):
        """`remove` is an alias of `remove_client`."""
        pool = pool or self._connect()
        self.assertEqual(pool.remove, pool.remove_client)
class TestUnifiedSession(RedisTestCase):
    """Test UnifiedSession command routing and execute()."""

    def test_getattr(self, session=None):
        """Routed names resolve to the pipeline or client method."""
        if not session:
            session = self._session()
        routes = (
            ('pipeline', session.pipeline_commands),
            ('client', session.client_commands),
        )
        for target_name, commands in routes:
            for cmd in commands:
                target = getattr(session, target_name)
                self.assertEqual(getattr(target, cmd), getattr(session, cmd))

    def test_execute(self, session=None):
        """Placeholder: fails on purpose until real assertions exist."""
        if not session:
            session = self._session()
        # No clue how to test the execute other than to use the
        # commands on keys in the `test_namespace`
        self.assertTrue(False, 'No assertions for this test.')
class TestRedisObject(RedisTestCase):
    """Test the RedisObj base model."""

    def _redisobj(self, session):
        """Return a RedisObj in the test namespace bound to `session`."""
        return RedisObj(self.namespace, session=session)

    def test_init(self):
        """Namespace and session are stored as given."""
        expected = ('namespace1', 'namespace2')

        unbound = RedisObj(*expected)
        self.assertEqual(unbound._RedisObj__namespace, expected)
        self.assertIsNone(unbound._RedisObj__session)

        s = self._session()
        bound = RedisObj(*expected, session=s)
        self.assertEqual(bound._RedisObj__namespace, expected)
        self.assertIs(bound._RedisObj__session, s)

    def test_session(self):
        """`session` raises when unbound and returns the bound session."""
        unbound = self._redisobj(None)
        with self.assertRaises(UnboundModelException):
            unbound.session
        s = self._session()
        self.assertIs(self._redisobj(s).session, s)

    def test_namespace(self):
        """`namespace` exposes the parts as a tuple."""
        self.assertEqual(RedisObj('namespace!').namespace, ('namespace!',))

    def test_bind(self):
        """`bind` rejects non-sessions and stores a real session."""
        r = self._redisobj(None)
        with self.assertRaises(ValueError):
            r.bind(None)
        s = self._session()
        r.bind(s)
        self.assertIs(r.session, s)

    def test_gen_key(self):
        """Namespace parts are joined with ':'."""
        self.assertEqual(RedisObj('ns1', 'ns2', 'ns3').gen_key(), 'ns1:ns2:ns3')
class TestHash(RedisTestCase):
    """Test the Hash model against hashes under '<namespace>:hash:*'."""
    # TODO: Make sure to test both bytes and strings.

    def _hash(self, session):
        """Return a Hash in the test namespace bound to `session`."""
        return Hash(self.namespace, 'hash', session=session)

    def setUp(self):
        """Write the two fixture hashes read by the get tests."""
        session = self._session()
        session.pipeline.hmset(self.namespace+':hash:1',
                               {'_id': '1',
                                'foo': '"bartest!"'})
        session.pipeline.hmset(self.namespace+':hash:2',
                               {'_id': '2',
                                'foo': '"bartest!"',
                                'bar': 'true',
                                'baz': '123.123'})
        session.pipeline.execute()

    def test_init(self):
        """Namespace and id generator are stored."""
        h = Hash('hash', 'space')
        self.assertEqual(h.namespace, ('hash', 'space'))
        self.assertIs(h._Hash__random_id_func, uuid.uuid4)

    def test_gen_key(self):
        """The id is appended to the namespace key."""
        h = Hash('hash', 'space')
        self.assertEqual(h.gen_key('id123'), 'hash:space:id123')

    def test__get(self):
        """`_get` returns the raw stored fields."""
        h = self._hash(self._session())
        self.assertEqual(h._get('1'), {b'_id': b'1',
                                       b'foo': b'"bartest!"'})
        self.assertEqual(h._get('2'), {b'_id': b'2',
                                       b'foo': b'"bartest!"',
                                       b'bar': b'true',
                                       b'baz': b'123.123'})

    def test_get(self):
        """`get` returns the decoded fields."""
        h = self._hash(self._session())
        self.assertEqual(h.get('1'), {'_id': 1,
                                      'foo': 'bartest!'})
        self.assertEqual(h.get('2'), {'_id': 2,
                                      'foo': 'bartest!',
                                      'bar': True,
                                      'baz': 123.123})

    def test__set(self):
        """`_set` writes the given fields verbatim."""
        s = self._session()
        h = self._hash(s)
        h._set('3', {'foo': '"bartest!"',
                     'bar': 'true',
                     'baz': '123.123'})
        s.pipeline.execute()
        self.assertEqual(s.client.hgetall(self.namespace+':hash:3'),
                         {b'foo': b'"bartest!"',
                          b'bar': b'true',
                          b'baz': b'123.123'})

    def test_set(self):
        """`set` JSON-encodes the fields and adds `_id` / `_active`."""
        s = self._session()
        h = self._hash(s)
        _id = h.set({'foo': 'bartest!',
                     'bar': True,
                     'baz': 123.123})
        s.pipeline.execute()
        self.assertEqual(s.client.hgetall(self.namespace+':hash:'+str(_id)),
                         {b'_id': b'"'+_id.urn.encode()+b'"',
                          b'_active': b'true',
                          b'foo': b'"bartest!"',
                          b'bar': b'true',
                          b'baz': b'123.123'})

    def test_delete(self):
        """Reference delete flags the object; hard delete removes the key."""
        s = self._session()
        h = self._hash(s)
        s.pipeline.hmset(self.namespace+':hash:delete1',
                         {'_id': '"delete1"',
                          '_active': 'true',
                          'foo': '"bartest!"'})
        s.pipeline.hmset(self.namespace+':hash:delete2',
                         {'_id': '"delete2"',
                          '_active': 'true',
                          'foo': '"bartest!"',
                          'bar': 'true',
                          'baz': '123.123'})
        # The fixtures are only queued until the pipeline runs. A reference
        # delete reads the object first, so they must exist before it.
        s.pipeline.execute()
        # hgetall returns an empty dict (never None) for a missing key, so
        # check truthiness to prove the fixtures are really there.
        self.assertTrue(s.client.hgetall(self.namespace+':hash:delete1'))
        self.assertTrue(s.client.hgetall(self.namespace+':hash:delete2'))
        h.delete('delete1')
        h.delete('delete2', reference=False)
        s.pipeline.execute()
        self.assertEqual(s.client.hgetall(self.namespace+':hash:delete1')[b'_active'], b'false')
        self.assertEqual(s.client.hgetall(self.namespace+':hash:delete2'), {})
class TestSet(RedisTestCase):
    """Test the Set model; each test uses its own '<namespace>:set:<n>' key."""

    def _set(self, session, space='1'):
        """Return a Set for sub-space `space` bound to `session`."""
        return Set(self.namespace, 'set', space, session=session)

    def setUp(self):
        """Clear the seven test keys and seed key 1 with three members."""
        session = self._session()
        for space in range(1, 8):
            session.client.delete('%s:set:%d' % (self.namespace, space))
        session.pipeline.zadd(self.namespace+':set:1', 1.0, 'foobar')
        session.pipeline.zadd(self.namespace+':set:1', 2.0, 'boobaz')
        session.pipeline.zadd(self.namespace+':set:1', 3.0, 'booper')
        session.pipeline.execute()

    def test_count(self):
        """`count` reports the three seeded members."""
        s = self._set(self._session())
        # Compare by value; `assertIs` on ints tests object identity.
        self.assertEqual(s.count(), 3)

    def test_add(self):
        """Members are ordered by score; default score appends."""
        session = self._session()
        s = self._set(session, space='2')
        s.add('addtest1')
        s.add('addtest2')
        s.add('addtest1.5', score=0.5)
        session.execute()
        self.assertEqual(
            session.client.zrange(self.namespace+':set:2', 0, -1),
            [b'addtest1', b'addtest1.5', b'addtest2'])

    def test_range(self):
        """`range` returns the inclusive index slice."""
        session = self._session()
        session.pipeline.zadd(self.namespace+':set:3', 1.0, 'rangetest1')
        session.pipeline.zadd(self.namespace+':set:3', 2.0, 'rangetest2')
        session.pipeline.zadd(self.namespace+':set:3', 3.0, 'rangetest3')
        session.pipeline.zadd(self.namespace+':set:3', 4.0, 'rangetest4')
        session.pipeline.zadd(self.namespace+':set:3', 5.0, 'rangetest5')
        session.pipeline.execute()
        s = self._set(session, space='3')
        self.assertEqual(s.range(0, -1), [b'rangetest1', b'rangetest2', b'rangetest3',
                                          b'rangetest4', b'rangetest5'])
        self.assertEqual(s.range(1, -2), [b'rangetest2', b'rangetest3',
                                          b'rangetest4'])

    def test_get(self):
        """`get` returns the member at a positive or negative index."""
        session = self._session()
        session.pipeline.zadd(self.namespace+':set:4', 1.0, 'gettest1')
        session.pipeline.zadd(self.namespace+':set:4', 2.0, 'gettest2')
        session.pipeline.zadd(self.namespace+':set:4', 3.0, 'gettest3')
        session.pipeline.zadd(self.namespace+':set:4', 4.0, 'gettest4')
        session.pipeline.zadd(self.namespace+':set:4', 5.0, 'gettest5')
        session.pipeline.execute()
        s = self._set(session, space='4')
        self.assertEqual(s.get(0), b'gettest1')
        self.assertEqual(s.get(2), b'gettest3')
        self.assertEqual(s.get(-1), b'gettest5')

    def test_all(self):
        """`all` returns every member in order."""
        session = self._session()
        session.pipeline.zadd(self.namespace+':set:5', 1.0, 'alltest1')
        session.pipeline.zadd(self.namespace+':set:5', 2.0, 'alltest2')
        session.pipeline.zadd(self.namespace+':set:5', 3.0, 'alltest3')
        session.pipeline.zadd(self.namespace+':set:5', 4.0, 'alltest4')
        session.pipeline.zadd(self.namespace+':set:5', 5.0, 'alltest5')
        session.pipeline.execute()
        s = self._set(session, space='5')
        self.assertEqual(s.all(), [b'alltest1', b'alltest2', b'alltest3', b'alltest4', b'alltest5'])

    def test_delete(self):
        """Deleted members disappear after execute."""
        session = self._session()
        session.pipeline.zadd(self.namespace+':set:6', 1.0, 'deletetest1')
        session.pipeline.zadd(self.namespace+':set:6', 2.0, 'deletetest2')
        session.pipeline.zadd(self.namespace+':set:6', 3.0, 'deletetest3')
        session.pipeline.zadd(self.namespace+':set:6', 4.0, 'deletetest4')
        session.pipeline.zadd(self.namespace+':set:6', 5.0, 'deletetest5')
        session.pipeline.execute()
        s = self._set(session, space='6')
        s.delete('deletetest4')
        s.delete('deletetest2')
        s.session.execute()
        self.assertEqual(session.zrange(self.namespace+':set:6', 0, -1),
                         [b'deletetest1', b'deletetest3', b'deletetest5'])

    def test_insert(self):
        """Inserted members land between the existing ones."""
        session = self._session()
        session.pipeline.zadd(self.namespace+':set:7', 1.0, 'deletetest1')
        session.pipeline.zadd(self.namespace+':set:7', 3.0, 'deletetest3')
        session.pipeline.zadd(self.namespace+':set:7', 5.0, 'deletetest5')
        session.pipeline.execute()
        s = self._set(session, space='7')
        s.insert('deletetest2', 1)
        s.insert('deletetest4', 3)
        s.session.execute()
        self.assertEqual(session.zrange(self.namespace+':set:7', 0, -1),
                         [b'deletetest1', b'deletetest2', b'deletetest3', b'deletetest4', b'deletetest5'])
class TestCollection(RedisTestCase):
    """Test the Collection model; most cases are still placeholders."""

    def _collection(self, session):
        """Return a Collection in the test namespace bound to `session`."""
        return Collection(self.namespace, 'collection', session=session)

    def test_init(self):
        """The three backing models are created with the right types."""
        c = Collection('test', 'collection')
        expected_types = (
            (c._Collection__hash, Hash),
            (c._Collection__index, Set),
            (c._Collection__active_index, Set),
        )
        for member, cls in expected_types:
            self.assertIsInstance(member, cls)

    def test_all(self):
        """Placeholder; no assertions yet."""

    def test_idx(self):
        """Placeholder; no assertions yet."""

    def test_move(self):
        """Placeholder; no assertions yet."""

    def test_get(self):
        """Placeholder; no assertions yet."""

    def test_add(self):
        """Placeholder; no assertions yet."""

    def test_delete(self):
        """Placeholder; no assertions yet."""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment