Skip to content

Instantly share code, notes, and snippets.

@elprup
Created April 5, 2012 06:43
Show Gist options
  • Save elprup/2308510 to your computer and use it in GitHub Desktop.
Save elprup/2308510 to your computer and use it in GitHub Desktop.
Simple async callback using tornado and redis
'''
Simple async callback using tornado and redis
example:
from channel import *
import tornado.ioloop
import functools
def callback(x, prefix=''):
    print prefix+str(x)
mycallback = functools.partial(callback, prefix='my:')
theircallback = functools.partial(callback, prefix='their:')
subscribe('papaya', mycallback)
subscribe('papaya', theircallback)
subscribe('mobile', theircallback)
publish('papaya', 'hello, all')
tornado.ioloop.IOLoop.instance().start()
'''
import socket
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
# Address of the redis server; used for the publish connection and for every
# Dispatcher connection created in this module.
redis_host = '127.0.0.1'
redis_port = 6379
# channel name -> Dispatcher serving that channel (populated by subscribe())
_callback_dict = {}
def _connect(host, port):
    '''Open a TCP connection to (host, port) with TCP_NODELAY enabled.

    Returns the connected socket object. Any error raised while configuring
    or connecting is re-raised to the caller, but the half-created socket is
    closed first so a failed attempt does not leak a file descriptor.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        sock.connect((host, port))
    except Exception:
        # Release the descriptor before propagating the failure.
        sock.close()
        raise
    return sock
def _get_stream(host, port):
    '''Connect to (host, port) and wrap the socket in an IOStream.

    The stream is attached to the IOLoop returned by IOLoop.instance().
    '''
    return IOStream(_connect(host, port), io_loop=IOLoop.instance())
# Single shared stream that publish() writes to. NOTE: this line runs at import
# time, so importing the module opens a connection to redis_host:redis_port.
_stream_publish = _get_stream(redis_host, redis_port)
def _encode_command(*args):
    '''Serialise a redis command as a multi-bulk request and return bytes.

    Each argument is sent length-prefixed, so whitespace, CR/LF or any other
    byte inside an argument cannot be mistaken for a separator. Text is
    encoded as UTF-8; other non-bytes values are converted with str() first.
    '''
    chunks = [('*%d\r\n' % len(args)).encode('ascii')]
    for arg in args:
        if not isinstance(arg, bytes):
            if not isinstance(arg, type(u'')):
                arg = str(arg)
            # On Python 2 str() already yields bytes; only text needs encoding.
            if not isinstance(arg, bytes):
                arg = arg.encode('utf-8')
        chunks.append(('$%d\r\n' % len(arg)).encode('ascii'))
        chunks.append(arg)
        chunks.append(b'\r\n')
    return b''.join(chunks)


def publish(channel, message):
    '''Queue an RPUSH of `message` onto the redis list named `channel`.

    The command is written to the shared module-level publish stream; nothing
    in this module reads the server's reply on that stream.

    The request uses the length-prefixed multi-bulk format rather than an
    inline "rpush <channel> <message>" line. An inline line is split on
    whitespace by the server, so a message such as 'hello, all' would be
    pushed as two separate elements, and a message containing CR/LF could
    smuggle in extra commands.
    '''
    _stream_publish.write(_encode_command('RPUSH', channel, message))
class Dispatcher(object):
    '''Pop items from one redis list and hand each one to every callback.

    A Dispatcher owns its own connection. It issues ``BLPOP <channel> 0``,
    parses the multi-bulk reply, calls every registered callback with the
    list of reply elements (for BLPOP: the list name and the popped value),
    and then issues the next BLPOP.

    Reply elements are delivered as the byte strings read from the stream
    (plain ``str`` on Python 2); a nil element is delivered as ``None``.
    '''

    def __init__(self, channel):
        self.channel = channel
        self.callbacks = []
        # Parser state for the reply currently being read. It is set up
        # before any I/O is started so a handler can never see it missing.
        self.parser_args = None   # elements collected so far, or None when idle
        self.parser_argc = 0      # elements still expected in this reply
        self.parser_length = 0    # byte length of the element being read
        self.stream = _get_stream(redis_host, redis_port)
        self._request_next()

    def register(self, callback):
        '''Add `callback`; it is called with the reply element list.'''
        self.callbacks.append(callback)

    @staticmethod
    def _encode_command(*args):
        '''Serialise a redis command as a multi-bulk request (bytes).

        Arguments are length-prefixed, so a channel name containing
        whitespace or CR/LF cannot be split into several arguments.
        '''
        chunks = [('*%d\r\n' % len(args)).encode('ascii')]
        for arg in args:
            if not isinstance(arg, bytes):
                if not isinstance(arg, type(u'')):
                    arg = str(arg)
                # On Python 2 str() already yields bytes.
                if not isinstance(arg, bytes):
                    arg = arg.encode('utf-8')
            chunks.append(('$%d\r\n' % len(arg)).encode('ascii'))
            chunks.append(arg)
            chunks.append(b'\r\n')
        return b''.join(chunks)

    def _request_next(self):
        '''Arm the reply reader, then ask redis for the next item.'''
        self.stream.read_until(b'\r\n', self.dispatch)
        self.stream.write(self._encode_command('BLPOP', self.channel, 0))

    def dispatch(self, message):
        '''Handle one CRLF-terminated header line of the reply.

        ``*<n>`` starts a reply of n elements and ``$<len>`` announces an
        element of len bytes. The element body itself is read by length and
        handled in _on_bulk, never here, so a payload that begins with '*'
        or '$', is empty, or contains CR/LF cannot be mistaken for a header.

        Raises ValueError for any other line (for example a ``-ERR`` reply)
        or for an element header that arrives outside a reply.
        '''
        line = message[:-2]
        head = line[:1]  # slicing keeps this safe for empty lines and bytes
        if head == b'*':
            self.parser_argc = int(line[1:])
            self.parser_args = []
            if self.parser_argc <= 0:
                # Nil or empty reply: nothing to deliver, just ask again.
                self.parser_args = None
                self._request_next()
            else:
                self.stream.read_until(b'\r\n', self.dispatch)
        elif head == b'$' and self.parser_args is not None:
            self.parser_length = int(line[1:])
            if self.parser_length < 0:
                # "$-1" is a nil element and has no body to read.
                self._store_argument(None)
            else:
                # Body plus its trailing CRLF.
                self.stream.read_bytes(self.parser_length + 2, self._on_bulk)
        else:
            raise ValueError('unexpected redis reply: %r' % (line,))

    def _on_bulk(self, data):
        '''Receive an element body (with trailing CRLF) read by length.'''
        self._store_argument(data[:-2])

    def _store_argument(self, value):
        '''Record one element; deliver the reply once it is complete.'''
        self.parser_args.append(value)
        self.parser_argc -= 1
        if self.parser_argc > 0:
            self.stream.read_until(b'\r\n', self.dispatch)
            return
        args = self.parser_args
        self.parser_args = None
        for f in self.callbacks:
            f(args)
        self._request_next()
def subscribe(channel, callback):
    '''Register `callback` to be called for items arriving on `channel`.

    The first subscription to a channel creates that channel's Dispatcher;
    later subscriptions to the same channel reuse it, so all callbacks for
    a channel share one Dispatcher.
    '''
    # `in` replaces dict.has_key(), which no longer exists on Python 3.
    if channel not in _callback_dict:
        _callback_dict[channel] = Dispatcher(channel)
    _callback_dict[channel].register(callback)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment