Skip to content

Instantly share code, notes, and snippets.

@josiahcarlson
Created December 7, 2012 15:26
Show Gist options
  • Save josiahcarlson/4233947 to your computer and use it in GitHub Desktop.
Save josiahcarlson/4233947 to your computer and use it in GitHub Desktop.
A function to do parallel pipelined mget calls to a group of shards
'''
Written December 7, 2012 by Josiah Carlson
Released into the public domain.
Untested.
'''
from binascii import crc32
from collections import deque
from itertools import imap, izip
import Queue
import threading
def mget(conn, keys, results, sid, mget_size, pipe_size):
    """Fetch ``keys`` from one shard using pipelined MGET commands.

    Meant to run as a thread target (see ``parallel_mget``): values are
    stored into ``results[sid]`` instead of being returned.

    Arguments:
        conn -- connection object exposing ``pipeline(...)`` whose pipeline
            has ``mget(keys)`` and ``execute()``. The call pattern matches
            redis-py's client API -- presumably a Redis client; confirm
            against callers.
        keys -- sequence of keys that all belong to this shard.
        results -- shared list; slot ``sid`` receives this shard's values.
        sid -- index of this shard's slot in ``results``.
        mget_size -- maximum number of keys sent in a single MGET.
        pipe_size -- number of MGET commands to queue before executing the
            pipeline (i.e. MGETs per round trip).

    On success ``results[sid]`` is a ``deque`` of values in the same order
    as ``keys``. If an exception is raised, ``results[sid]`` is not
    assigned, which lets the caller detect the failure.
    """
    res = deque()
    # NOTE(review): False is presumably redis-py's ``transaction`` flag,
    # i.e. a plain batching pipeline with no MULTI/EXEC -- confirm.
    pipe = conn.pipeline(False)
    pending = 0  # MGET commands queued but not yet executed

    def execute():
        # Each pipeline reply is one MGET's list of values; flatten in order.
        for reply in pipe.execute():
            res.extend(reply)

    for start in range(0, len(keys), mget_size):
        pipe.mget(keys[start:start + mget_size])
        pending += 1
        if pending >= pipe_size:
            execute()
            # Reset the counter; without this every MGET after the first
            # full batch would be sent as its own round trip.
            pending = 0
    if pending:
        # Flush the final partial batch; skip the call when nothing is queued.
        execute()
    results[sid] = res
def parallel_mget(connections, keys, mget_size=80, pipe_size=1):
    """Fetch ``keys`` from a group of shards in parallel, preserving order.

    Each key is assigned to shard ``crc32(key) % len(connections)``. One
    thread per shard runs ``mget`` against that shard's connection, and the
    per-shard replies are stitched back together in request order.

    Arguments:
        connections -- list of per-shard connections, indexed by shard id.
        keys -- sequence of keys; each must be accepted by ``crc32``
            (``bytes`` on Python 3).
        mget_size -- maximum keys per MGET command (passed to ``mget``).
        pipe_size -- MGET commands per pipeline execution (passed to
            ``mget``).

    Returns a list of values aligned position-by-position with ``keys``.

    Raises ``Exception`` if any shard's worker did not store results (for
    example because it raised inside its thread).
    """
    shard_count = len(connections)
    # Shard id for each key, in request order.
    # NOTE(review): on Python 2 crc32 may return a negative int; Python's %
    # still maps it into 0..shard_count-1, but to a different shard than
    # Python 3's always-unsigned crc32. Whatever wrote the keys must use the
    # same rule.
    shards = [crc32(key) % shard_count for key in keys]

    # One independent list per shard (``n * [[]]`` would alias a single
    # list across every shard).
    to_fetch = [[] for _ in range(shard_count)]
    for shard, key in zip(shards, keys):
        to_fetch[shard].append(key)

    # Each worker fills its own slot; a slot left as None marks a failure.
    results = [None] * shard_count
    threads = []
    for sid in range(shard_count):
        thr = threading.Thread(target=mget, args=(
            connections[sid], to_fetch[sid], results,
            sid, mget_size, pipe_size))
        threads.append(thr)
        thr.daemon = True
        thr.start()

    # Wait for every shard's queries to finish.
    for thr in threads:
        thr.join()

    if None in results:
        raise Exception("A thread didn't return results: %s"%(
            [i for i, r in enumerate(results) if r is None],))

    # Each shard's deque holds values in the same order as its to_fetch
    # list, so popping from the front while walking ``shards`` restores the
    # original request order.
    return [results[shard].popleft() for shard in shards]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment