Skip to content

Instantly share code, notes, and snippets.

@Greyvend
Created May 30, 2017 06:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Greyvend/e21b87a0f376b4d15c43fc4b7d5fa31a to your computer and use it in GitHub Desktop.
Save Greyvend/e21b87a0f376b4d15c43fc4b7d5fa31a to your computer and use it in GitHub Desktop.
Tornado TCP Server & Client with Redis connection and simple subscription protocol. Refer to the corresponding repo for the full working example: https://github.com/Databrawl/real_time_tcp
import signal
import asyncio
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer
from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future
import aioredis
class ClientConnection(object):
    """One client's stream plus its subscription state.

    The protocol is line based: each request and each response is UTF-8
    text terminated by ``message_separator``. The recognised requests are
    ``SUBSCRIBE`` and ``UNSUBSCRIBE``; every other request is answered
    with ``UNKNOWN COMMAND``.
    """

    # Terminates every request read from, and every response written to,
    # the stream.
    message_separator = b'\r\n'

    def __init__(self, stream):
        """Wrap *stream*; the client starts out unsubscribed.

        :param stream: object providing ``read_until``, ``write`` and
            ``close`` (``Server.handle_stream`` passes its own ``stream``
            argument here).
        """
        self._stream = stream
        self._subscribed = False

    def _handle_request(self, request):
        """Apply one decoded request and return the textual reply.

        :param request: request text with the separator already removed.
        :returns: ``'CONFIRMED'``, ``'ALREADY SUBSCRIBED'``,
            ``'ALREADY UNSUBSCRIBED'`` or ``'UNKNOWN COMMAND'``.
        """
        if request == 'SUBSCRIBE':
            if self._subscribed:
                return 'ALREADY SUBSCRIBED'
            self._subscribed = True
            return 'CONFIRMED'
        if request == 'UNSUBSCRIBE':
            if not self._subscribed:
                return 'ALREADY UNSUBSCRIBED'
            self._subscribed = False
            return 'CONFIRMED'
        return 'UNKNOWN COMMAND'

    @gen.coroutine
    def run(self):
        """Serve requests until the stream is closed.

        Reads one separator-terminated request at a time, answers it and
        loops. Returns (after closing the stream) as soon as a read or a
        write raises ``StreamClosedError``.
        """
        while True:
            try:
                request = yield self._stream.read_until(
                    self.message_separator)
            except StreamClosedError:
                self._stream.close(exc_info=True)
                return

            # NOTE: bytes.rstrip treats its argument as a set of bytes, so
            # every trailing CR/LF byte is dropped, not just one separator.
            request_body = request.rstrip(self.message_separator)
            try:
                request_text = request_body.decode('utf-8')
            except UnicodeDecodeError:
                # Bytes that are not valid UTF-8 cannot match any command;
                # reply instead of letting the error abort this coroutine.
                response_body = 'UNKNOWN COMMAND'
            else:
                response_body = self._handle_request(request_text)

            response = response_body.encode('utf-8') + self.message_separator
            try:
                yield self._stream.write(response)
            except StreamClosedError:
                self._stream.close(exc_info=True)
                return

    @gen.coroutine
    def update(self, message):
        """Forward *message* to the client if it is subscribed.

        :param message: payload as ``bytes``; the separator is appended
            before writing.

        A ``StreamClosedError`` raised by the write is handled by closing
        the stream rather than propagating to the caller.
        """
        if not self._subscribed:
            return
        response = message + self.message_separator
        try:
            yield self._stream.write(response)
        except StreamClosedError:
            self._stream.close(exc_info=True)
            return
class Server(TCPServer):
    """TCP server that relays messages from a Redis channel to clients.

    Each accepted stream is wrapped in a ``ClientConnection``; every
    message received on the Redis channel is offered to all connections
    currently being served.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``TCPServer`` and reset Redis state."""
        super(Server, self).__init__(*args, **kwargs)
        self._redis = None        # set by subscribe()
        self._channel = None      # first channel returned by subscribe()
        self._connections = []    # ClientConnection objects being served

    # gen.coroutine (like every other coroutine here) rather than
    # asyncio.coroutine: the body uses bare ``yield`` and
    # asyncio.coroutine no longer exists on Python 3.11+.
    @gen.coroutine
    def subscribe(self, channel_name, redis_address=('localhost', 6379)):
        """Connect to Redis, subscribe to *channel_name* and relay messages.

        Does not finish until ``listen_redis`` returns.

        :param channel_name: name of the Redis channel to subscribe to.
        :param redis_address: address passed to ``aioredis.create_redis``;
            defaults to the previously hard-coded ``('localhost', 6379)``.
        """
        self._redis = yield aioredis.create_redis(redis_address)
        channels = yield self._redis.subscribe(channel_name)
        print('Subscribed to "{}" Redis channel.'.format(channel_name))
        self._channel = channels[0]
        yield self.listen_redis()

    @gen.coroutine
    def listen_redis(self):
        """Relay channel messages to all connections until the channel closes.

        Returns once ``get`` raises ``aioredis.errors.ChannelClosedError``.
        """
        while True:
            yield self._channel.wait_message()
            try:
                msg = yield self._channel.get(encoding='utf-8')
            except aioredis.errors.ChannelClosedError:
                print("Redis channel was closed. Stopped listening.")
                return
            if msg:
                body_utf8 = msg.encode('utf-8')
                # Yielding a list waits for every update() to complete;
                # the comprehension also snapshots the connection list.
                yield [con.update(body_utf8) for con in self._connections]
                print("Message in {}: {}".format(self._channel.name, msg))

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Serve one client until its connection ends.

        :param stream: stream for the accepted connection.
        :param address: peer address, used only for the log line.
        """
        print('New request has come from our {} buddy...'.format(address))
        connection = ClientConnection(stream)
        self._connections.append(connection)
        try:
            yield connection.run()
        finally:
            # Always unregister, even when run() raises, so broadcasts do
            # not keep targeting a dead connection.
            self._connections.remove(connection)
if __name__ == '__main__':
    # Make Tornado run on top of the asyncio event loop so the Tornado
    # server and aioredis share a single loop.
    AsyncIOMainLoop().install()
    server = Server()
    server.listen(5567)
    # Schedule the Redis subscription to start once the loop is running.
    IOLoop.current().spawn_callback(server.subscribe, 'updates')
    print('Starting the server...')
    try:
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to the cleanup
        # below instead of exiting with a traceback.
        pass
    finally:
        # Stop accepting new connections and report the shutdown, which
        # was previously unreachable after an interrupt.
        server.stop()
        print('Server has shut down.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment