Created
December 7, 2012 15:26
-
-
Save josiahcarlson/4233947 to your computer and use it in GitHub Desktop.
A function to do parallel pipelined mget calls to a group of shards
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Written December 7, 2012 by Josiah Carlson
Released into the public domain.
Untested.
'''
from binascii import crc32 | |
from collections import deque | |
from itertools import imap, izip | |
import Queue | |
import threading | |
def mget(conn, keys, results, sid, mget_size, pipe_size):
    '''
    Fetch ``keys`` from a single shard using pipelined MGET calls.

    The keys are split into chunks of at most ``mget_size`` keys, one MGET
    per chunk, and up to ``pipe_size`` MGET calls are queued on a pipeline
    before it is executed. The values are gathered in request order into a
    deque that is stored at ``results[sid]`` as the very last step, so the
    slot is left untouched if anything raises along the way.

    Arguments:
        conn - client exposing ``pipeline(False)``; the returned pipeline
            must provide ``mget(keys)`` and ``execute()``
            (presumably a redis-py client -- confirm against the caller)
        keys - sliceable sequence of keys to fetch from this shard
        results - mutable sequence shared with the caller; only index
            ``sid`` is written
        sid - index of this shard in ``results``
        mget_size - maximum number of keys per MGET call
        pipe_size - maximum number of MGET calls per pipeline execution

    Returns None; the output is delivered through ``results[sid]``.
    '''
    res = deque()
    pipe = conn.pipeline(False)
    size = 0

    # a simple helper to minimize copy/paste
    def execute():
        for r in pipe.execute():
            res.extend(r)

    # prepare blocks of mget calls; range() (not xrange) keeps this
    # runnable on both Python 2 and Python 3
    for start in range(0, len(keys), mget_size):
        pipe.mget(keys[start:start + mget_size])
        size += 1
        if size >= pipe_size:
            # we've reached our pipeline length: gather results and start
            # counting again, otherwise every later call would be flushed
            # on its own and nothing would actually be pipelined
            execute()
            size = 0

    # flush whatever is still queued (nothing to do if the pipe is empty)
    if size:
        execute()

    # update the results with our results
    results[sid] = res
def parallel_mget(connections, keys, mget_size=80, pipe_size=1):
    '''
    Fetch ``keys`` from a group of shards in parallel, one thread per shard.

    Each key is assigned to the shard ``crc32(key) % len(connections)``.
    One thread per connection runs ``mget`` over that shard's keys, and the
    per-shard results are then merged back into the order of ``keys``.

    NOTE: under Python 3 ``binascii.crc32`` needs bytes-like keys, and its
    sign differs between Python 2 and Python 3, so the key-to-shard mapping
    is not guaranteed to match across interpreter versions.

    Arguments:
        connections - sequence of shard clients, indexed by shard id
        keys - iterable of keys to fetch (duplicates are allowed)
        mget_size - maximum number of keys per MGET call
        pipe_size - maximum number of MGET calls per pipeline execution

    Returns a list with one value per requested key, in request order.

    Raises Exception if any shard thread finished without storing its
    results (for example because it died with an error).
    '''
    # materialize once: the keys are walked more than once below
    keys = list(keys)
    if not keys:
        return []

    shard_count = len(connections)
    # compute shard ids
    shards = [crc32(key) % shard_count for key in keys]

    # place keys in lists for each shard; every shard needs its OWN list
    # (``n * [[]]`` would alias a single list across all shards)
    to_fetch = [[] for _ in range(shard_count)]
    for shard, key in zip(shards, keys):
        to_fetch[shard].append(key)

    results = [None] * shard_count
    threads = []
    # start fetch threads; each one writes only to results[sid]
    for sid in range(shard_count):
        thr = threading.Thread(target=mget, args=(
            connections[sid], to_fetch[sid], results,
            sid, mget_size, pipe_size))
        threads.append(thr)
        thr.daemon = True
        thr.start()

    # wait for queries to finish
    for thr in threads:
        thr.join()

    # a slot that is still None means that thread never stored a result
    failed = [i for i, r in enumerate(results) if r is None]
    if failed:
        raise Exception("A thread didn't return results: %s" % (failed,))

    # re-order the results to be the same as what was requested: values
    # within a shard come back in the order its keys were queued, so
    # popping from the left of each shard's deque restores request order
    return [results[shard].popleft() for shard in shards]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment