@ploxiln
Last active November 6, 2018 06:31
nsq_http_consumer

Provides an http interface to nsq consumers.

NOTE: This has not been updated since before nsqio/pynsq#117 "nsq.Reader.disabled() no longer effective" was fixed.

nsqd already provides an http interface to nsq publishers: just POST data to /put?topic=$TOPIC, typically to the nsqd running locally. nsqd then takes care of delivery to consumers, including timeouts and retries, and persists messages that overflow the in-memory queue to disk.
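For publishers that are themselves written in Python, the POST described above might look like the following minimal sketch. The helper names `build_publish_request` and `publish` are hypothetical, the default address is the local nsqd http port used in the curl examples in this README, and the path is a parameter because it is an assumption about the nsqd version in use (this README uses /put; newer nsqd releases, as far as I know, name the endpoint /pub).

```python
try:
    from urllib.parse import urlencode            # Python 3
    from urllib.request import Request, urlopen
except ImportError:
    from urllib import urlencode                  # Python 2
    from urllib2 import Request, urlopen


def build_publish_request(topic, body, nsqd_http="http://127.0.0.1:4151", path="/put"):
    """Build (but do not send) the POST request for one message.

    body must be bytes; nsqd_http and path are assumptions about the setup.
    """
    url = nsqd_http.rstrip("/") + path + "?" + urlencode({"topic": topic})
    # Supplying data makes urllib issue a POST instead of a GET.
    return Request(url, data=body)


def publish(topic, body, **kwargs):
    """Send one message to nsqd and return the raw response body."""
    response = urlopen(build_publish_request(topic, body, **kwargs), timeout=5)
    try:
        return response.read()
    finally:
        response.close()
```

Building the request separately from sending it keeps the URL logic easy to check without a running nsqd.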

But nsqd provides only its own tcp protocol interface to consumers, for two major reasons:

  • nsq consumers are pushed messages; they do not poll
  • nsq consumers should process a message, and then mark it finished

This little server process allows consumers to poll or long-poll with http, and automatically marks messages as finished so consumers don't have to. So this is less efficient and less safe than a real nsq consumer library, but probably OK and more convenient for some use cases.
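The push-to-poll conversion can be pictured as two queues: one for messages that were pushed but not yet claimed, and one for pollers that are still waiting. The following is a minimal sketch of that idea only, with no nsq or tornado involved; the class and method names are hypothetical and are not part of the script in this gist.

```python
from collections import deque


class PushToPollBridge(object):
    """Pairs pushed messages with waiting pollers (illustrative names only)."""

    def __init__(self):
        self.messages = deque()  # pushed messages nobody has claimed yet
        self.waiters = deque()   # callbacks of pollers waiting for a message

    def wanted_in_flight(self):
        # Ask the queue for at most one message, and only while someone waits.
        return min(len(self.waiters), 1)

    def on_push(self, message):
        """Called when the queue pushes a message to us."""
        self.messages.append(message)
        if self.waiters:
            waiter = self.waiters.popleft()
            waiter(self.messages.popleft())

    def poll(self, waiter):
        """Called when a client polls; answers at once if a message is queued."""
        if self.messages:
            waiter(self.messages.popleft())
        else:
            self.waiters.append(waiter)

    def cancel(self, waiter):
        """Called when a poll times out before any message arrived."""
        if waiter in self.waiters:
            self.waiters.remove(waiter)


bridge = PushToPollBridge()
received = []
bridge.poll(received.append)   # poller arrives first and waits
bridge.on_push("first")        # message is handed straight to the waiter
bridge.on_push("second")       # nobody waiting, so the message is queued
bridge.poll(received.append)   # next poll is answered immediately
```

In the real server the message is marked finished as soon as it has been written to the http response, which is why a client that crashes after receiving a message will not see it redelivered.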

running it

Requires python 2.7, and the "tornado" and "pynsq" python modules.

Run with the --help argument to see possible arguments. At least one of --nsqd_tcp_addresses=... or --lookupd_http_addresses=... is required. Each takes one or more comma-separated "host:port" values.

http interface

There is only one endpoint: /get, with the following parameters:

  • topic: nsq topic
  • channel: nsq channel
  • timeout (optional): how long to wait for a message, in whole seconds; if omitted, the server's default timeout is used

Example which long-polls for a message:

curl 'http://127.0.0.1:4159/get?topic=test&channel=testc&timeout=20'

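The default timeout is 1 second (the default can be changed with the --default_timeout command-line argument). If no message arrives before the timeout, the response body is an empty json object, {}. The shortest possible poll can't be too short, perhaps 100ms, due to the push/poll conversion. Currently the timeout is parsed as an integer, so it has one-second granularity.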
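A Python client for the /get endpoint could be sketched as follows. The helper names (`build_get_url`, `interpret_body`, `poll_once`) are hypothetical, and the sketch assumes that messages are json and that an empty json object means the poll timed out, which matches what the handler in this gist writes on timeout. A published message whose body is literally {} would be indistinguishable from a timeout.

```python
import json
import math

try:
    from urllib.parse import urlencode        # Python 3
    from urllib.request import urlopen
except ImportError:
    from urllib import urlencode              # Python 2
    from urllib2 import urlopen


def build_get_url(base_url, topic, channel, timeout=None):
    """Build the /get URL; timeout is rounded up to whole seconds."""
    params = [("topic", topic), ("channel", channel)]
    if timeout is not None:
        # The server parses timeout with int(), so send whole seconds only.
        params.append(("timeout", int(math.ceil(timeout))))
    return base_url.rstrip("/") + "/get?" + urlencode(params)


def interpret_body(body):
    """Return the decoded message, or None when the poll timed out."""
    data = json.loads(body)
    return None if data == {} else data


def poll_once(base_url, topic, channel, timeout=20):
    """Long-poll once; returns a decoded message or None."""
    url = build_get_url(base_url, topic, channel, timeout)
    # Give the http client some slack beyond the server-side timeout.
    response = urlopen(url, timeout=timeout + 5)
    try:
        return interpret_body(response.read().decode("utf-8"))
    finally:
        response.close()
```

A consumer loop would then call `poll_once("http://127.0.0.1:4159", "test", "testc")` repeatedly and skip the `None` results.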

testing with curl

Useful shell commands for testing with curl:

Publishing 10 json messages to nsqd:

for X in $(seq 1 10) ; do curl --data "{\"json\": \"message$X\"}" 'http://127.0.0.1:4151/put?topic=test' ; echo ; done

Requesting 4 messages from nsq_http_consumer in parallel:

for X in $(seq 4) ; do (OUTPUT=$(curl -s 'http://dockerdev:4159/get?topic=test&channel=testc' 2>&1) ; echo "$OUTPUT") & done ; sleep 2

#!/usr/bin/env python
import random
import signal
import logging
import functools
from datetime import timedelta
from collections import deque

import nsq
from tornado import options, web, ioloop

# "topic/channel": {"reader": nsq.Reader, "callbacks": deque, "messages": deque}
nsq_readers = {}
# queueing messages, instead of passing them to callbacks directly,
# is a hack until I figure out how to set rdy_cnt=0 fast enough


# a simplified version of nsq.Reader._redistribute_rdy_state() which sends zeroes
def update_max_in_flight(reader_info):
    nsq_reader = reader_info['reader']
    old_mif = nsq_reader.max_in_flight
    new_mif = min(len(reader_info['callbacks']), 1)  # max one in flight
    if old_mif != new_mif:
        nsq_reader.max_in_flight = new_mif
        if new_mif == 0:
            # nobody is waiting: tell every connection to stop sending
            for conn in nsq_reader.conns.values():
                nsq_reader._send_rdy(conn, new_mif)
        else:
            # someone is waiting: ask one random connection for one message
            possible_conns = list(nsq_reader.conns.values())
            if possible_conns:  # randrange(0) would raise with no connections
                conn = possible_conns.pop(random.randrange(len(possible_conns)))
                nsq_reader._send_rdy(conn, new_mif)


def nsq_handler(msg, key):
    reader_info = nsq_readers[key]
    msg.enable_async()
    reader_info['messages'].append(msg)
    if len(reader_info['callbacks']) == 0:
        logging.warning("got message but no callbacks for %s", key)
        return
    callback = reader_info['callbacks'].popleft()
    update_max_in_flight(reader_info)
    callback(reader_info)


def get_reader_info(topic, channel):
    global nsq_readers
    key = topic + '/' + channel
    if key not in nsq_readers:
        # constructor enforces non-zero max_in_flight
        nsq_reader = nsq.Reader(message_handler=functools.partial(nsq_handler, key=key),
                                nsqd_tcp_addresses=options.options.nsqd_tcp_addresses,
                                lookupd_http_addresses=options.options.lookupd_http_addresses,
                                topic=topic,
                                channel=channel,
                                max_tries=99,
                                max_in_flight=1)
        nsq_readers[key] = dict(reader=nsq_reader, callbacks=deque(), messages=deque())
    return nsq_readers[key]


class QueueGetHandler(web.RequestHandler):
    @web.asynchronous
    def get(self):
        topic = self.get_argument("topic")
        channel = self.get_argument("channel")
        timeout = int(self.get_argument("timeout", options.options.default_timeout))
        self.finished = False
        reader_info = get_reader_info(topic, channel)
        if len(reader_info['messages']) > 0:
            self._finish_get(reader_info)
            return
        callback_func = self._finish_get
        reader_info['callbacks'].append(callback_func)
        update_max_in_flight(reader_info)
        timeout_func = functools.partial(self._timeout_get, callback_func, reader_info)
        ioloop.IOLoop.instance().add_timeout(timedelta(seconds=timeout), timeout_func)

    def _finish_get(self, reader_info):
        msg = reader_info['messages'].popleft()
        self.write(msg.body)
        self.set_header('Content-Type', 'application/json')
        self.finish()
        self.finished = True
        msg.finish()

    def _timeout_get(self, callback_func, reader_info):
        if self.finished:
            return
        self.write('{}')
        self.set_header('Content-Type', 'application/json')
        self.finish()
        reader_info['callbacks'].remove(callback_func)
        update_max_in_flight(reader_info)


def _handle_term_signal(sig_num, frame):
    logging.info(
        'TERM Signal handler called with signal %r', sig_num
    )
    ioloop.IOLoop.instance().stop()


if __name__ == "__main__":
    options.define("port", default=4159, type=int, help="tcp port to listen on")
    options.define("nsqd_tcp_addresses", type=str, multiple=True)
    options.define("lookupd_http_addresses", type=str, multiple=True)
    options.define("default_timeout", default=1, type=int, help="default http get timeout in seconds")
    options.parse_command_line()
    if not options.options.nsqd_tcp_addresses and not options.options.lookupd_http_addresses:
        logging.critical("must specify nsqd_tcp_addresses or lookupd_http_addresses")
        exit(1)
    application = web.Application([
        (r"/get", QueueGetHandler),
    ])
    application.listen(options.options.port)
    logging.info("nsq_http_consumer listening on port %d", options.options.port)
    logging.info("using nsqd_tcp_addresses=%r, lookupd_http_addresses=%r",
                 options.options.nsqd_tcp_addresses, options.options.lookupd_http_addresses)
    signal.signal(signal.SIGTERM, _handle_term_signal)
    signal.signal(signal.SIGINT, _handle_term_signal)
    ioloop.IOLoop.instance().start()