Skip to content

Instantly share code, notes, and snippets.

@woozyking
Last active December 14, 2015 21:49
Show Gist options
  • Save woozyking/07db9ec24fd31ea6e65c to your computer and use it in GitHub Desktop.
Save woozyking/07db9ec24fd31ea6e65c to your computer and use it in GitHub Desktop.
Nydus-powered client-side *zunion* and *zinter* for a partitioned Redis cluster
import redis
from nydus.db import create_cluster
import time
import random
def cluster_zrange(cluster, keys):
    """Fetch every member and score of each sorted set in *keys* from *cluster*.

    One ``ZRANGE key 0 -1 WITHSCORES`` call is issued per key, all inside a
    single ``cluster.map()`` context.  NOTE(review): nydus presumably routes
    each call to the host owning that key and resolves the returned values
    when the ``map()`` context exits -- confirm against the nydus version used.

    :param cluster: nydus cluster object (e.g. from ``create_cluster``).
    :param keys: iterable of sorted-set key names.
    :returns: list holding one ``ZRANGE ... WITHSCORES`` result per key, in
        the same order as *keys*.
    """
    fetched = []
    with cluster.map() as conn:
        for key in keys:
            fetched.append(conn.zrange(key, 0, -1, withscores=True))
    return fetched
def cluster_zunion(result, op='sum', reverse=True):
    """Union several sorted-set snapshots client-side, mimicking ZUNIONSTORE.

    :param result: iterable of sorted-set snapshots, each a sequence of
        ``(member, score)`` pairs (or a ``{member: score}`` mapping), such
        as the output of ``cluster_zrange``.
    :param op: how a member's scores are combined across the sets it
        appears in: ``'sum'`` (default), ``'min'`` or ``'max'``
        (case-insensitive), like the ZUNIONSTORE ``AGGREGATE`` option.
    :param reverse: ``True`` (default) returns the highest score first, like
        ZREVRANGE; ``False`` returns the lowest score first, like ZRANGE.
    :returns: list of ``(member, aggregated_score)`` tuples.  Members with
        equal scores are ordered by member, in the same direction as scores.
    :raises ValueError: if *op* is not one of the supported aggregates.
    """
    combine = {
        'sum': lambda current, score: current + score,
        'min': min,
        'max': max,
    }.get(op.lower())
    if combine is None:
        raise ValueError(
            "unsupported aggregate %r; use 'sum', 'min' or 'max'" % (op,))

    merged = {}
    for snapshot in result:
        # dict() accepts both a pair sequence and a mapping.
        for member, score in dict(snapshot).items():
            if member in merged:
                merged[member] = combine(merged[member], score)
            else:
                # Seed with the first real score instead of a fake neutral
                # value, so 'max' works for negative scores and 'min' for inf.
                merged[member] = score

    # Sort by (score, member) so ties are deterministic; Redis likewise
    # orders equal-score members lexicographically.
    return sorted(merged.items(),
                  key=lambda item: (item[1], item[0]),
                  reverse=reverse)
def cluster_zinter(r_result, op='sum', reverse=True):
    """Intersect several sorted-set snapshots client-side, mimicking ZINTERSTORE.

    Only members present in every snapshot are kept.

    :param r_result: iterable of sorted-set snapshots, each a sequence of
        ``(member, score)`` pairs (or a ``{member: score}`` mapping), such
        as the output of ``cluster_zrange``.
    :param op: how a member's scores are combined across the sets:
        ``'sum'`` (default), ``'min'`` or ``'max'`` (case-insensitive),
        like the ZINTERSTORE ``AGGREGATE`` option.
    :param reverse: ``True`` (default) returns the highest score first, like
        ZREVRANGE; ``False`` returns the lowest score first, like ZRANGE.
    :returns: list of ``(member, aggregated_score)`` tuples; empty when
        *r_result* is empty or the sets share no member.  Members with
        equal scores are ordered by member, in the same direction as scores.
    :raises ValueError: if *op* is not one of the supported aggregates.
    """
    combine = {
        'sum': lambda current, score: current + score,
        'min': min,
        'max': max,
    }.get(op.lower())
    if combine is None:
        raise ValueError(
            "unsupported aggregate %r; use 'sum', 'min' or 'max'" % (op,))

    snapshots = [dict(pairs) for pairs in r_result]
    if not snapshots:
        return []

    common = set(snapshots[0])
    for scores in snapshots[1:]:
        common.intersection_update(scores)

    merged = {}
    for member in common:
        # Seed with the first set's real score (not 0) so 'max' handles
        # negative scores; every later set is guaranteed to hold the member.
        aggregate = snapshots[0][member]
        for scores in snapshots[1:]:
            aggregate = combine(aggregate, scores[member])
        merged[member] = aggregate

    # Sort by (score, member) so ties are deterministic; Redis likewise
    # orders equal-score members lexicographically.
    return sorted(merged.items(),
                  key=lambda item: (item[1], item[0]),
                  reverse=reverse)
def _populate(native, cluster, count=1000):
    """Randomly fill sorted sets test1..test4 identically on both backends.

    For each j in range(count), member ``"user<j>"`` is added to each of the
    four sets independently with probability 1/2, using the same score on
    *native* and on *cluster* so their aggregates can be compared.
    """
    # NOTE(review): `%` binds tighter than `+`/`-`, so these evaluate as
    # j + (7 % (j + 1)) etc.; if (j + 7) % (j + 1) was intended, adjust.
    score_fns = (
        ('test1', lambda j: j + 7 % (j + 1)),
        ('test2', lambda j: j - 5 % (j + 1)),
        ('test3', lambda j: j + 9 % (j + 1)),
        ('test4', lambda j: j - 3 % (j + 1)),
    )
    for j in range(count):
        member = "user" + str(j)
        for key, score_fn in score_fns:
            if random.choice((True, False)):
                score = score_fn(j)
                native.zadd(key, member, score)
                cluster.zadd(key, member, score)


def _compare(native, cluster, aggregate, store, store_name):
    """Time a cluster-side aggregation against a native store command and print results.

    :param native: plain redis client holding the same data as *cluster*.
    :param cluster: nydus cluster holding test1..test4.
    :param aggregate: ``cluster_zunion`` or ``cluster_zinter``.
    :param store: bound native method, ``native.zunionstore`` or
        ``native.zinterstore``.
    :param store_name: command name shown in the printed timing label.
    """
    keys = ('test1', 'test2', 'test3', 'test4')
    ops = ('sum', 'min', 'max')

    start = time.time()
    r_result = cluster_zrange(cluster, keys)
    cluster_results = [aggregate(r_result, op) for op in ops]
    print("Cluster zrange with aggregation %s seconds" % (time.time() - start))

    start = time.time()
    for op in ops:
        # Writes scratch keys test_sum/test_min/test_max on the native server.
        store('test_' + op, keys, op.upper())
    native_results = [native.zrevrange('test_' + op, 0, -1, withscores=True)
                      for op in ops]
    print("Native %s + zrevrange time %s seconds"
          % (store_name, time.time() - start))

    # One True/False line per aggregate (sum, min, max): cluster == native.
    for mine, theirs in zip(cluster_results, native_results):
        print(set(mine) == set(theirs))
        print("")


def test():
    """Check cluster_zunion/cluster_zinter against native ZUNIONSTORE/ZINTERSTORE.

    WARNING: calls ``flushdb()`` on the Redis at localhost:6379 db 0 and on
    the nydus cluster (hosts on ports 63790/63791), erasing their data.
    """
    pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
    a = redis.Redis(connection_pool=pool)
    b = create_cluster({
        'backend': 'nydus.db.backends.redis.Redis',
        'router': 'nydus.db.routers.keyvalue.PartitionRouter',
        'hosts': {
            0: {'port': 63790, 'db': 0},
            1: {'port': 63790, 'db': 1},
            2: {'port': 63791, 'db': 0},
            3: {'port': 63791, 'db': 1},
        }
    })

    # Test Union
    a.flushdb()
    b.flushdb()
    _populate(a, b)
    _compare(a, b, cluster_zunion, a.zunionstore, 'zunionstore')

    # Test Inter
    a.flushdb()
    b.flushdb()
    _populate(a, b)
    _compare(a, b, cluster_zinter, a.zinterstore, 'zinterstore')
if __name__ == "__main__":
    # Script entry point: run the live comparison against local Redis servers.
    test()
@woozyking
Copy link
Author

Optimize solutions

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment