Skip to content

Instantly share code, notes, and snippets.

@jehiah
Created July 15, 2013 02:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jehiah/5997041 to your computer and use it in GitHub Desktop.
Save jehiah/5997041 to your computer and use it in GitHub Desktop.
Buffered NSQ reader (python)
import tornado.ioloop
import logging
import time
class BufferedMessageHandler(object):
    """Batch incoming NSQ messages and pass them to a handler as one list.

    Usage:
        buffered_handler = BufferedMessageHandler(my_handler)
        r = Reader(message_handler=buffered_handler)
        buffered_handler.reader = r
        run()

    Every message received is switched to async mode and appended to an
    in-memory batch. The batch is handed to ``message_handler`` as
    ``[(message, nsq_msg), (message, nsq_msg), ...]`` when either

    * ``reader.is_starved()`` returns true right after a message is queued, or
    * ``max_queue_time`` seconds have passed since the first message of the
      batch was queued (a timeout registered on the IOLoop).

    If ``message_handler`` raises, the exception is logged and ``requeue()``
    is called on every ``nsq_msg`` in that batch.
    """

    def __init__(self, message_handler, max_queue_time=25, io_loop=None,
                 reader=None):
        """Create a buffering wrapper around ``message_handler``.

        message_handler -- callable that receives the list of
            ``(message, nsq_msg)`` tuples making up one batch.
        max_queue_time -- seconds a non-empty batch may wait before it is
            flushed by the timer.
        io_loop -- object providing ``add_timeout`` / ``remove_timeout``;
            defaults to ``tornado.ioloop.IOLoop.instance()``.
        reader -- optional object providing ``is_starved()``. It may also be
            assigned later through the ``reader`` attribute (see Usage above),
            but it must be set before the first message arrives.
        """
        assert callable(message_handler)
        self.message_handler = message_handler
        self.max_queue_time = max_queue_time
        self.queue = []
        self.queue_start_time = None
        self.io_loop = io_loop or tornado.ioloop.IOLoop.instance()
        self.flush_timer = None
        # Always define the attribute so that a forgotten assignment is
        # detected explicitly in __call__ instead of surfacing as a bare
        # AttributeError after the message has already been buffered.
        self.reader = reader

    def __call__(self, message, nsq_msg):
        """Queue one message; flush the batch if the reader is starved.

        Raises AttributeError if no reader has been configured. The check
        runs before the message is touched, so a message rejected this way
        is neither put into async mode nor left behind in the queue.
        """
        if self.reader is None:
            raise AttributeError(
                "BufferedMessageHandler.reader is not set; assign the Reader "
                "to `reader` before messages are delivered")

        nsq_msg.enable_async()

        # start a timer to flush this batch even if we don't get more messages
        if not self.queue:
            now = time.time()
            self.queue_start_time = now
            self.flush_timer = self.io_loop.add_timeout(
                now + self.max_queue_time, self.flush_callback)

        self.queue.append((message, nsq_msg))

        if self.reader.is_starved():
            self.flush()

    def flush_callback(self):
        """Timer entry point: the timeout has fired, so flush the batch."""
        # The timeout that invoked us has already run; forget its handle so
        # flush() does not try to remove it.
        self.flush_timer = None
        self.flush()

    def flush(self):
        """Hand the current batch to ``message_handler`` and reset the queue.

        Cancels any pending flush timer. Does nothing further when the queue
        is empty. If the handler raises, the exception is logged and every
        message of the batch is requeued; exceptions raised by an individual
        ``requeue()`` are logged and do not stop the remaining requeues.
        """
        if self.flush_timer is not None:
            self.io_loop.remove_timeout(self.flush_timer)
            self.flush_timer = None

        data = self.queue
        if not data:
            return

        # Swap in a fresh list before calling out, so messages that arrive
        # while the handler runs start a new batch.
        self.queue = []
        try:
            self.message_handler(data)
        except Exception:
            logging.exception("uncaught exception proccessing messages. requeueing")
            for _message, nsq_msg in data:
                # Requeue each message independently: one failing requeue
                # (presumably e.g. a message the handler already responded
                # to -- verify against the NSQ client in use) must not leave
                # the rest of the batch unanswered.
                try:
                    nsq_msg.requeue()
                except Exception:
                    logging.exception("failed to requeue message")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment