Skip to content

Instantly share code, notes, and snippets.

@ronnix
Created September 12, 2012 14:46
Show Gist options
  • Save ronnix/3707121 to your computer and use it in GitHub Desktop.
Save ronnix/3707121 to your computer and use it in GitHub Desktop.
Tornado + Redis listener thread example code
from collections import defaultdict
from threading import Thread
import redis
import tornado.web
class SomeRequestHandler(tornado.web.RequestHandler):
    """
    Example handler that asks the Redis listener thread to subscribe to
    additional channels by publishing on the internal 'commands' channel.
    """

    # One connection pool shared by every handler instance. Tornado builds a
    # fresh handler (and calls initialize()) for each request, so creating a
    # pool there would open a new pool and new sockets per request.
    _connection_pool = None

    def initialize(self):
        """
        Attach the shared Redis connection pool and a client to this handler
        """
        if SomeRequestHandler._connection_pool is None:
            SomeRequestHandler._connection_pool = redis.ConnectionPool(
                host='127.0.0.1', port=6379)
        self.connection_pool = SomeRequestHandler._connection_pool
        self.redis_client = redis.Redis(connection_pool=self.connection_pool)

    def get(self):
        """
        Handle a GET request
        """
        # Tell the listener to listen to some additional channels
        client_id = '...something...'
        channels = ['foo', 'bar']
        self.redis_client.publish(
            'commands', 'subscribe %s %s' % (client_id, ' '.join(channels)))

        # Do other stuff
        # ...
class RedisSubscriber(Thread):
    """
    This thread subscribes to Redis channels on behalf of multiple clients

    Control messages are received on the internal 'commands' channel:

      subscribe <client> <channel> [<channel> ...]
      unsubscribe <client>
      quit

    Messages received on any other channel are passed to do_something().
    """

    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)

        # Setup the Redis connection
        self.connection_pool = redis.ConnectionPool(host='127.0.0.1', port=6379)
        self.client = redis.Redis(connection_pool=self.connection_pool)
        self.pubsub = self.client.pubsub()

        # Who subscribes to what, and vice-versa
        self.channel_subscribers = defaultdict(set)
        self.client_channels = defaultdict(set)

    @staticmethod
    def _to_text(value):
        """
        Return value decoded to text if it is a byte string, else unchanged

        The Redis client may hand back channel names and payloads as bytes;
        the bookkeeping in this class is keyed by text.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return value

    def run(self):
        """
        Listen to events on the Redis channels

        Returns when a 'quit' command is received or the listen() iterator
        is exhausted.
        """
        # The 'commands' channel is used for internal communication
        self.pubsub.subscribe('commands')

        for event in self.pubsub.listen():
            # Skip subscribe/unsubscribe confirmations and the like
            if event['type'] != 'message':
                continue

            channel = self._to_text(event['channel'])
            data = event['data']

            if channel == 'commands':
                # Internal commands
                if self.process_command(self._to_text(data)):
                    break
            else:
                # Other messages: the payload is forwarded untouched
                self.do_something(channel, data)

    def process_command(self, command):
        """
        Process a command

        Returns True if the listener should stop, False otherwise.
        Unknown or malformed commands are ignored (and return False) so that
        a bad message on the 'commands' channel cannot kill the thread.
        """
        if command == 'quit':
            return True

        name, _, params = command.partition(' ')

        if name == 'subscribe':
            tokens = params.split()
            # Need a client ID followed by at least one channel
            if len(tokens) >= 2:
                self.subscribe(tokens[0], tokens[1:])
        elif name == 'unsubscribe':
            client = params.strip()
            if client:
                self.unsubscribe(client)

        return False

    def subscribe(self, client, channels):
        """
        A client wants to subscribe to some channels
        """
        for channel in channels:
            # Add the client ID to the subscribers of the channel
            self.channel_subscribers[channel].add(client)
            # Add the channel to the client subscriptions
            self.client_channels[client].add(channel)
            # Subscribe to the Redis channel
            self.pubsub.subscribe(channel)

    def unsubscribe(self, client):
        """
        A client wants to unsubscribe from all channels
        """
        # pop() both fetches and removes the client subscriptions, and does
        # not create an entry for a client that was never subscribed
        for channel in self.client_channels.pop(client, ()):
            # Remove the client ID from the subscribers of the channel
            subscribers = self.channel_subscribers[channel]
            subscribers.discard(client)
            # No more subscribers?
            if not subscribers:
                self.pubsub.unsubscribe(channel)
                del self.channel_subscribers[channel]

    def do_something(self, channel, message):
        """
        Handle a message received on a non-command channel

        Placeholder: the example leaves the actual handling to the reader.
        """
        # ...
if __name__ == "__main__":
    # tornado.ioloop is referenced below, but the file only imports
    # tornado.web; import it explicitly here
    import tornado.ioloop

    # Start the Redis subscriber background thread
    listener = RedisSubscriber()
    listener.start()

    # Start the Tornado I/O loop in the main thread
    application = tornado.web.Application([
        # ...
    ])
    application.listen(8080)
    try:
        tornado.ioloop.IOLoop.instance().start()
    finally:
        # The listener is a non-daemon thread blocked in pubsub.listen();
        # without an explicit 'quit' command it would keep the process alive
        # after the I/O loop has stopped.
        if listener.is_alive():
            listener.client.publish('commands', 'quit')
            listener.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment