Skip to content

Instantly share code, notes, and snippets.

@Greyvend
Created May 14, 2017 13:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Greyvend/ed05fd956c396f44d42360081e3b3299 to your computer and use it in GitHub Desktop.
Save Greyvend/ed05fd956c396f44d42360081e3b3299 to your computer and use it in GitHub Desktop.
Tornado TCP Server & Client with Redis connection. Refer to the corresponding repo for the full working example: https://github.com/Databrawl/real_time_tcp/tree/3e01d85e719bf793a4811b2d701609a9a4d36597
from concurrent.futures import ThreadPoolExecutor
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpclient import TCPClient
class Client(TCPClient):
    """Interactive, line-oriented TCP client.

    After connecting, two loops run concurrently: one prints every message
    received from the server, the other sends every line typed on standard
    input. Messages are framed with ``msg_separator`` in both directions.
    """

    # Byte sequence that terminates every message on the wire.
    msg_separator = b'\r\n'

    def __init__(self):
        super(Client, self).__init__()
        # Assigned by run() once the connection has been established.
        self._stream = None
        # A single worker thread keeps the blocking input() call off the
        # IOLoop thread.
        self._executor = ThreadPoolExecutor(1)

    @gen.coroutine
    def run(self, host, port):
        """Connect to ``host``:``port`` and run the read and write loops.

        The coroutine finishes once both loops have returned.
        """
        self._stream = yield self.connect(host, port)
        yield [self.read(), self.write()]

    @gen.coroutine
    def read(self):
        """Print each incoming message until the stream is closed."""
        while True:
            try:
                data = yield self._stream.read_until(self.msg_separator)
            except StreamClosedError:
                self.disconnect()
                return
            # read_until() returns the data including the delimiter. Cut off
            # exactly one separator: bytes.rstrip() treats its argument as a
            # set of bytes and would also eat payload bytes that happen to
            # be '\r' or '\n'.
            body = data[:-len(self.msg_separator)]
            print(body)

    @gen.coroutine
    def write(self):
        """Send each line typed on stdin until the stream is closed."""
        while True:
            try:
                # input() blocks, so it runs on the executor thread.
                data = yield self._executor.submit(input)
                encoded_data = data.encode('utf8') + self.msg_separator
                yield self._stream.write(encoded_data)
            except StreamClosedError:
                self.disconnect()
                return

    def disconnect(self):
        """Close the client, stop the input executor and close the stream.

        Both loops call this when they see the stream closed, so it may run
        more than once. It is also safe to call before ``run()`` has
        connected, when there is no stream yet.
        """
        super(Client, self).close()
        # wait=False: the worker thread may still be blocked inside input().
        self._executor.shutdown(False)
        if self._stream is not None and not self._stream.closed():
            print('Disconnecting...')
            self._stream.close()
@gen.coroutine
def main():
    """Run a Client against localhost:5567, announcing start and finish."""
    print('Connecting to the server socket...')
    client = Client()
    session = client.run('localhost', 5567)
    # Resolves when the client's run() coroutine has finished.
    yield session
    print('Disconnected from server socket.')
if __name__ == '__main__':
    # Drive main() to completion on the IOLoop, then return.
    io_loop = IOLoop.instance()
    io_loop.run_sync(main)
import signal
import asyncio
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer
from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future
import aioredis
class ClientConnection(object):
    """Server-side handler for a single connected client stream.

    Echoes every message the client sends and can push extra messages to
    the client through ``update()``.
    """

    # Byte sequence that terminates every message on the wire.
    message_separator = b'\r\n'

    def __init__(self, stream):
        self._stream = stream

    @gen.coroutine
    def run(self):
        """Echo each received message back until the stream is closed."""
        separator = self.message_separator
        while True:
            try:
                request = yield self._stream.read_until(separator)
                # read_until() returns the data including the delimiter.
                # Remove exactly one separator: bytes.rstrip() treats its
                # argument as a set of bytes and would also strip payload
                # bytes that happen to be '\r' or '\n'.
                request_body = request[:-len(separator)]
                # Echo server: the response body is the request body.
                response_body = request_body
                yield self._stream.write(response_body + separator)
            except StreamClosedError:
                self._stream.close(exc_info=True)
                return

    @gen.coroutine
    def update(self, message):
        """Send ``message`` (bytes) to the client, followed by the separator.

        If the stream is already closed, it is closed again with the
        exception info and nothing is raised.
        """
        response = message + self.message_separator
        try:
            yield self._stream.write(response)
        except StreamClosedError:
            self._stream.close(exc_info=True)
            return
class Server(TCPServer):
    """TCP echo server that also relays Redis pub/sub messages.

    Each accepted stream is wrapped in a ClientConnection. Messages
    published on the subscribed Redis channel are forwarded to every
    connection that is currently open.
    """

    def __init__(self, *args, **kwargs):
        super(Server, self).__init__(*args, **kwargs)
        # Redis connection and channel; both are set by subscribe().
        self._redis = None
        self._channel = None
        # ClientConnection objects for the streams currently being served.
        self._connections = []

    # Declared as a Tornado coroutine like the other methods here: the body
    # uses bare ``yield``, which Tornado's coroutine runner resolves, whereas
    # ``asyncio.coroutine`` expects ``yield from`` and no longer exists in
    # Python 3.11+.
    @gen.coroutine
    def subscribe(self, channel_name):
        """Connect to Redis at localhost:6379 and relay ``channel_name``.

        Does not finish until listen_redis() returns.
        """
        self._redis = yield aioredis.create_redis(('localhost', 6379))
        channels = yield self._redis.subscribe(channel_name)
        print('Subscribed to "{}" Redis channel.'.format(channel_name))
        self._channel = channels[0]
        yield self.listen_redis()

    @gen.coroutine
    def listen_redis(self):
        """Forward every channel message to all connections.

        Returns when reading from the channel raises ChannelClosedError.
        """
        while True:
            yield self._channel.wait_message()
            try:
                msg = yield self._channel.get(encoding='utf-8')
            except aioredis.errors.ChannelClosedError:
                print("Redis channel was closed. Stopped listening.")
                return
            if msg:
                body_utf8 = msg.encode('utf-8')
                # Yielding a list waits for all the updates in parallel.
                yield [con.update(body_utf8) for con in self._connections]
                print("Message in {}: {}".format(self._channel.name, msg))

    @gen.coroutine
    def handle_stream(self, stream, address):
        """Serve one accepted client stream until its connection ends."""
        connection = ClientConnection(stream)
        self._connections.append(connection)
        try:
            yield connection.run()
        finally:
            # Always deregister, even if run() fails unexpectedly, so
            # listen_redis() never keeps broadcasting to a dead connection.
            self._connections.remove(connection)
if __name__ == '__main__':
    # Install the asyncio-backed IOLoop before anything else touches the loop.
    AsyncIOMainLoop().install()

    # Accept TCP clients on port 5567.
    server = Server()
    server.listen(5567)

    # Schedule the Redis subscription to start once the loop is running.
    tornado_loop = IOLoop.current()
    tornado_loop.spawn_callback(server.subscribe, 'updates')

    print('Starting the server...')
    event_loop = asyncio.get_event_loop()
    event_loop.run_forever()
    print('Server has shut down.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment