Skip to content

Instantly share code, notes, and snippets.

@castellanprime
Created December 23, 2019 14:33
Show Gist options
  • Save castellanprime/09bbf217b164bc89aa01a1a84e41c70e to your computer and use it in GitHub Desktop.
Save castellanprime/09bbf217b164bc89aa01a1a84e41c70e to your computer and use it in GitHub Desktop.
Tornado, ZMQ, Websockets
import zmq
import logging
import asyncio
import os
import signal
from functools import wraps
from ast import literal_eval
from threading import Thread
from zmq.eventloop.zmqstream import ZMQStream
from tornado import ioloop, websocket, gen
from serverenums import GameStatus
from tornado.platform.asyncio import AnyThreadEventLoopPolicy
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
class ServerDisconnectedError(Exception):
"""Exception triggered if the server closes the websocket connection arbitrarily"""
pass
class NetClient(Thread):
def __init__(self, port, server_url, **kwargs):
super(NetClient, self).__init__()
self.logger = logging.getLogger(__name__)
self.wsconn = None
self.wsconn_close = False
self.wsconn_already_closed = False
self.has_wsconn_initialised = False
self.is_retrying_connection = False
self.has_started_retry_connection = False
# self.has_sent_wsconn_initialised_msg = False
self.forced_exit = False
self.daemon = True
self.game_url = server_url
self.ctx = zmq.Context.instance()
self.socket = self.ctx.socket(zmq.PAIR)
# do not use localhost, because it would complain
# with error [zmq.error.ZMQError: No such device]
# self.socket.bind('inproc://uisocket')
self.socket.bind('tcp://127.0.0.1:{}'.format(port))
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.communicate_with_server)
self.logger.debug(f'Running on a port: {str(port)}')
self.ioloop = ioloop.IOLoop()
async def read_with_websocket(self):
if self.wsconn_close == True:
if not self.wsconn_already_closed:
self.wsconn.close()
self.socket.send_pyobj(dict(
cmd=GameStatus.ENDED
))
self.wsconn_already_closed = True
return
msg_recv = await self.wsconn.read_message()
self.logger.debug(f'Received from websocket={msg_recv}')
if msg_recv is None:
self.has_wsconn_initialised = False
self.is_retrying_connection = True
return
msg = literal_eval(msg_recv)
self.logger.debug('Received from websocket(Decoded)={}'.format(str(msg)))
return msg
async def send_with_websocket(self, msg):
if msg:
try:
if isinstance(self.wsconn,
websocket.WebSocketClientConnection):
self.logger.debug('Sending game request')
# Cannot send a dict(somehow)
await self.wsconn.write_message(str(msg))
except Exception as err:
self.has_wsconn_initialised = False
self.is_retrying_connection = True
else:
self.wsconn_close = True
async def init_game_conn(self):
self.logger.debug('Creating initial game server connection')
try:
self.wsconn = await websocket.websocket_connect(self.game_url)
except Exception as err:
# print("Connection error: {}".format(err))
raise ServerDisconnectedError('Server was never started. Start server instance.')
else:
self.logger.debug('Connection established')
# if self.wsconn and not self.has_wsconn_initialised:
# self.has_wsconn_initialised = True
# self.socket.send_pyobj(dict(cmd=GameStatus.STARTED, msg='Connection established'))
self.has_wsconn_initialised = True
async def communicate_with_server(self):
try:
while True:
msg_recv = self.socket.recv_pyobj()
self.logger.debug(f'Received from playerui={str(msg_recv)}')
if msg_recv.get('cmd') == GameStatus.ENDED and not self.forced_exit:
self.cleanup()
break
if not self.has_wsconn_initialised:
await self.init_game_conn()
else:
await self.send_with_websocket(msg_recv)
msg_recv = await self.read_with_websocket()
self.socket.send_pyobj(msg_recv)
except ServerDisconnectedError as e:
self.logger.debug('ServerDisconnectedError exception caught')
if not self.forced_exit:
self.cleanup()
os.kill(os.getpid(), signal.SIGINT) # signal is only caught in the main thread
def cleanup_on_panic(self):
self.socket.send_pyobj(dict(cmd=GameStatus.ENDED))
_ = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
self.cleanup()
def cleanup(self):
self.forced_exit = True
self.socket.close()
self.logger.debug('Closing the netclient')
self.ioloop.current().stop()
def run(self):
try:
self.logger.debug(f'Running on ioloop = ${str(self.ioloop)}')
self.ioloop.current().spawn_callback(self.communicate_with_server)
self.ioloop.current().start()
except KeyboardInterrupt:
if not self.forced_exit:
self.cleanup_on_panic()
except Exception:
if not self.forced_exit:
self.cleanup_on_panic()
import zmq
import sys
import logging
import uuid
import os
from tornado import ioloop
from serverenums import GameStatus
from netclient import NetClient
rootlogger = logging.getLogger()
rootlogger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
file_handler = logging.handlers.RotatingFileHandler(f'cmdui_{str(uuid.uuid4())}.log', maxBytes=(1048576*5), backupCount=7)
file_handler.setLevel(logging.DEBUG)
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(f_format)
rootlogger.addHandler(stream_handler)
rootlogger.addHandler(file_handler)
def get_random_port():
import socket
s = socket.socket()
s.bind(("", 0))
port_num = s.getsockname()[1]
s.close()
return port_num
class CmdUI(object):
def __init__(self, port):
self.logger = logging.getLogger(__name__)
self.ctx = zmq.Context().instance()
self.socket = self.ctx.socket(zmq.PAIR)
self.socket.connect('tcp://127.0.0.1:{0}'.format(port))
self.msg_recv = None
self.user_exit = False
def get_str_input(self, question):
while True:
try:
choice = input(question)
if any((choice is None, not choice.strip())):
print('Error: Empty string entered!!!')
self.logger.error('Error: Empty string entered!!!')
else:
return choice
except Exception:
raise
def get_int_input(self, question):
while True:
try:
choice = self.get_str_input(question)
choice = int(choice)
return choice
except ValueError as err:
print(err)
except Exception:
raise
def close_game(self):
self.user_exit = True
self.logger.debug(f'Closing due to application panic')
try:
self.socket.send_pyobj(
dict(cmd=GameStatus.ENDED),
flags=zmq.NOBLOCK
) # using send in nonblocking raised zmq.EAGAIN
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
pass
except Exception:
self.logger.debug(f'{e}: in here')
pass
self.socket.close()
# self.logger.debug('Closing socket')
# self.ctx.term()
self.logger.debug('Closing game')
sys.exit(0)
def start(self):
self.logger.info('CmdUI')
msg_recv = None
try:
while True:
try:
msg_snd = self.get_str_input('Enter a message: ')
self.socket.send_pyobj(dict(cmd=msg_snd))
# Try to receive message
msg_recv = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
except zmq.ZMQError as exc:
if exc.errno == zmq.EAGAIN:
pass
else:
raise
self.logger.info(f'Received from server={str(msg_recv)}')
if msg_recv and msg_recv.get('cmd') == GameStatus.ENDED:
print('Player ended game session')
self.logger.debug('Player ended game session')
self.close_game()
except KeyboardInterrupt:
if not self.user_exit:
self.close_game()
except Exception:
if not self.user_exit:
self.close_game()
def main():
ui, netclient = None, None
try:
port = get_random_port()
netclient = NetClient(port, 'ws://localhost:8888/wshandler')
ui = CmdUI(port)
netclient.start()
ui.start()
except SystemExit:
# netclient.join() # Not neeted since the thread is a daemon
print('Application ended')
ui.logger.debug('Application ended')
os._exit(os.EX_OK)
# raise SystemExit
if __name__ == '__main__':
main()
from tornado import (
web, options, httpserver,
ioloop, websocket, log, escape
)
import logging, sys, asyncio
from serverenums import GameStatus
def enable_server_logging():
options.options['log_file_prefix'] = 'mainserver.log'
options.parse_command_line()
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
root_logger.addHandler(stream_handler)
class WSHandler(websocket.WebSocketHandler):
clients = set()
def check_origin(self, origin):
return True
def open(self):
self.clients.add(self)
print(f'Websocket opened. ClientID = {str(self)}')
def on_message(self, message):
[client.write_message(message) for client in self.clients]
def on_pong(self, data):
# Refer to https://github.com/tornadoweb/tornado/issues/2532, and https://github.com/tornadoweb/tornado/issues/2021
# since the on_message handler is priority, the on_pong handler is made asynchronous
# so that the pong does not block and close connection
# Yes, ensure_future starts executing the coroutine as soon as the event loop is resumed,
# even if no one awaits the returned future
asyncio.ensure_future(self.on_pong_async(data))
async def on_pong_async(self, data):
print(f'Websocket ping from client')
await self.write_message(dict(cmd='GameStatus.HAS_ALREADY_STARTED', msg='Server sent pong'))
def on_close(self):
self.clients.remove(self)
print(f'Closing websocket connection for {str(self)}')
def maks_app():
return web.Application([
(r"/wshandler", WSHandler)
], websocket_ping_interval=10)
if __name__ == "__main__":
enable_server_logging()
try:
app = maks_app()
app.listen(8888)
ioloop.IOLoop.current().start()
except (SystemExit, KeyboardInterrupt):
ioloop.IOLoop.current().stop()
print('Stopped server')
@castellanprime
Copy link
Author

Based on feedback from tornado maintainers. I rewrote server.py to be below:

import logging
import sys
from tornado import (
    web, options, httpserver,
    ioloop, websocket, log, escape, gen
)
from serverenums import GameStatus


def enable_server_logging():
    options.options['log_file_prefix'] = 'mainserver.log'
    options.parse_command_line()

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)

    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
    root_logger.addHandler(stream_handler)


class WSHandler(websocket.WebSocketHandler):
    clients = set()
    client_send_errors = 0

    def check_origin(self, origin):
        return True

    def open(self):
        self.clients.add(self)
        print(f'Websocket opened. ClientID = {str(self)}')
        ioloop.IOLoop.current().spawn_callback(self.send_message_to_client)

    def on_message(self, message):
        [client.write_message(message) for client in self.clients]

    async def send_message_to_client(self):
        while True:
            try:
                time_to_await_for = gen.sleep(20)   # 20 seconds
                [client.write_message(dict(cmd='GameStatus.HAS_ALREADY_STARTED', msg='Server sent pong')) for client in self.clients]
                await time_to_await_for
            except websocket.WebSocketClosedError as e:
                print(f'Error: {e}')
                self.close()
                return
            except Exception as e:
                self.client_send_errors += 1
                print(f'Server failed to send response because of {e}')

                if self.client_send_errors > 3:
                    print(f'Closing because max client send errors reached')
                    self.client_send_errors = 0
                    self.close()
                    return

    def on_close(self):
        self.clients.remove(self)
        print(f'Closing websocket connection for {str(self)}')


def maks_app():
    return web.Application([(r"/wshandler", WSHandler)])


if __name__ == "__main__":
    enable_server_logging()
    try:
        app = maks_app()
        app.listen(8888)
        ioloop.IOLoop.current().start()
    except (SystemExit, KeyboardInterrupt):
        ioloop.IOLoop.current().stop()
        print('Stopped server')

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment