Skip to content

Instantly share code, notes, and snippets.

@josiahcarlson
Created October 12, 2014 21:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save josiahcarlson/5cefbb7d54ea3b1818a3 to your computer and use it in GitHub Desktop.
Save josiahcarlson/5cefbb7d54ea3b1818a3 to your computer and use it in GitHub Desktop.
Reliably multiplex receiving results via Redis lists
'''
result_multiplex.py
Copyright 2014 Josiah Carlson
Released as LGPL 2 or 3, your choice.
Problem:
You have a task processor that writes the results of tasks to a specified key
in Redis by performing an RPUSH operation.
You don't want to run dozens of BLPOP threads to gather the results to pass on
to waiting clients.
Solution:
Use one thread to BLPOP on one 'router' key, along with all of the known
destinations for result keys.
When a caller wants to be notified via callback of the receipt of results at a
specific LIST key, they only need to call rmux.register_callback(key, callback).
The listening thread receives notifications of keys to watch on the 'router'
key, then adds that key to the keys to BLPOP from.
Upon receiving a real result from one of the keys, the listening thread will
directly call the callback with the data as its only argument.
'''
import random
import threading
import time
import traceback
import uuid
import redis
def NOP(data):
    '''
    Do-nothing callback: accepts one result payload and discards it.
    '''
    return None
class ResultMultiplexer(object):
    '''
    Delivers results that arrive on many Redis LIST keys to per-key callbacks
    using a single BLPOP loop.

    Callers register a callback for a key; the key is announced to the
    listener through a 'router' LIST, and the listener then includes that key
    in its BLPOP call. When a value arrives on a watched key, the matching
    callback is called once with the value and the key is no longer watched.
    '''

    def __init__(self, conn, runid=None, timeout=60):
        '''
        conn    - Redis connection used for every command issued here.
        runid   - name of the router key; a random UUID is used when omitted.
                  For identification purposes in Redis, runid could be a
                  hostname:pid.
        timeout - expiration in seconds applied to the router key when the
                  listener is asked to stop.
        '''
        self.conn = conn
        self.router_key = str(runid or uuid.uuid4())
        self.timeout = timeout
        # Maps result key -> callback; entries are removed when delivered or
        # cancelled.
        self.callbacks = {}
        # The listener loop runs only while this is False; it is flipped by
        # start_process_results_thread() / stop_process_results_thread().
        self.quit = True

    @staticmethod
    def _text(value):
        '''
        Returns value decoded as UTF-8 when it is a bytes object that is not
        also a str (i.e. bytes under Python 3); otherwise returns it unchanged.
        This lets keys returned by Redis compare equal to the str keys stored
        locally.
        '''
        if isinstance(value, bytes) and not isinstance(value, str):
            return value.decode('utf-8', 'replace')
        return value

    def _process_results(self):
        '''
        Listens on the router key plus all announced result keys for messages
        of one of four types:

        1. no message, the BLPOP timed out after 1 second
        2. an empty message on the router key, sent to wake the loop up
        3. a non-empty message on the router key: a key where a result value
           will be sent
        4. the result of a computation, sent to a watched key

        In cases 1 or 2, just listen again (the loop re-checks self.quit).
        In case 3, add that key to the keys to listen on, then listen again.
        In case 4, stop watching the key, pop its callback from
        self.callbacks and call it with the result. There may be no callback,
        in which case the result is dropped; this happens when:

        1. the callback was cancel()ed
        2. a hostname:pid style runid is reused and this process picks up
           router entries and results left behind by a previous process,
           without having callbacks for them
        '''
        keys = {self.router_key}
        while not self.quit:
            info = self.conn.blpop(keys, 1)
            if not info:
                # message type 1
                continue
            key, result = info
            key = self._text(key)
            if key == self.router_key:
                if result:
                    # message type 3
                    keys.add(self._text(result))
                # an empty router message is message type 2
                continue
            # message type 4
            keys.discard(key)
            callback = self.callbacks.pop(key, None)
            if callback is None:
                continue
            try:
                callback(result)
            except Exception:
                # A failing callback must not take the listener down; report
                # it and keep going. Replace with better error handling.
                traceback.print_exc()

    def start_process_results_thread(self, daemon=True):
        '''
        Starts and returns the thread that listens for and processes result
        callbacks. The thread's daemon flag is set from the daemon argument.
        '''
        self.quit = False
        t = threading.Thread(target=self._process_results)
        t.daemon = daemon
        t.start()
        return t

    def stop_process_results_thread(self, now=False):
        '''
        Asks the result processor to shut down: sets the quit flag, pushes an
        empty wake-up message onto the router key, and sets the router key to
        expire after self.timeout seconds.

        If now is True the wake-up message is pushed to the head of the router
        list (LPUSH); otherwise it is appended to the tail (RPUSH). In both
        cases the listener exits the next time it checks the quit flag. This
        method does not wait for the thread to finish; join() the thread
        returned by start_process_results_thread() if that is needed.
        '''
        self.quit = True
        pipe = self.conn.pipeline(True)
        if now:
            pipe.lpush(self.router_key, '')
        else:
            pipe.rpush(self.router_key, '')
        pipe.expire(self.router_key, self.timeout)
        pipe.execute()

    @staticmethod
    def send_result(conn, key, result, timeout=60):
        '''
        When the task has finished computing its result, it sends the result
        to the key provided, and sets the expiration time of the key to
        timeout seconds. Both commands are sent in one pipeline.
        '''
        pipe = conn.pipeline(True)
        pipe.lpush(key, result)
        pipe.expire(key, timeout)
        pipe.execute()

    def register_callback(self, key, callback):
        '''
        Registers the passed callback to the key, then sends the key to listen
        for to the router key. Returns the key.
        '''
        # Store the callback before announcing the key so the listener can
        # never see the key without a callback being in place.
        self.callbacks[self._text(key)] = callback
        self.conn.rpush(self.router_key, key)
        return key

    def cancel(self, key):
        '''
        Attempts to pop the key from the callback registry. If None is
        returned, there was no callback registered under the key (for example
        because the result was already processed). If a function is returned,
        then you have successfully prevented the callback from running.
        '''
        return self.callbacks.pop(self._text(key), None)
def test():
    '''
    Manual smoke test; needs a Redis server reachable with redis.Redis()
    defaults. WARNING: flushes database 15.

    Registers ten callbacks, cancels one, sends a result to every key in a
    random order, then sleeps one second so the listener thread can print the
    results that were delivered.
    '''
    conn = redis.Redis(db=15)
    conn.flushdb()
    mplex = ResultMultiplexer(conn)
    mplex.start_process_results_thread()

    def printer(x):
        # Bind x now so each callback reports the key it was registered for.
        def show(y):
            print("%s == %s" % (x, y))
        return show

    for i in range(10):
        mplex.register_callback(str(i), printer(i))
    # list() is required for shuffle() when range() returns a lazy object.
    x = list(range(10))
    random.shuffle(x)
    mplex.cancel(str(x[-1]))
    print("canceled: %s" % x[-1])
    for v in x:
        mplex.send_result(conn, str(v), str(v))
    time.sleep(1)


if __name__ == '__main__':
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment