Created
December 23, 2019 14:33
-
-
Save castellanprime/09bbf217b164bc89aa01a1a84e41c70e to your computer and use it in GitHub Desktop.
Tornado, ZMQ, Websockets
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import logging | |
import asyncio | |
import os | |
import signal | |
from functools import wraps | |
from ast import literal_eval | |
from threading import Thread | |
from zmq.eventloop.zmqstream import ZMQStream | |
from tornado import ioloop, websocket, gen | |
from serverenums import GameStatus | |
from tornado.platform.asyncio import AnyThreadEventLoopPolicy | |
# NetClient runs its event loop on a worker thread. Tornado's
# AnyThreadEventLoopPolicy allows an asyncio loop to be created on any
# thread, not only the main one.
asyncio.set_event_loop_policy(
    AnyThreadEventLoopPolicy()
)
class ServerDisconnectedError(Exception):
    """Signal that the websocket link to the game server is unavailable.

    Used when the server drops the websocket connection unexpectedly or
    cannot be reached at all.
    """
class NetClient(Thread):
    """Daemon thread that relays messages between the UI and the game server.

    The UI talks to this thread over a ZMQ ``PAIR`` socket bound on
    ``127.0.0.1:<port>``; this thread talks to the game server over a
    Tornado websocket client connection.

    Args:
        port: TCP port on the loopback interface to bind the PAIR socket to.
        server_url: Websocket URL of the game server.
        **kwargs: Accepted for backward compatibility; currently unused.
    """

    def __init__(self, port, server_url, **kwargs):
        super(NetClient, self).__init__()
        self.logger = logging.getLogger(__name__)

        # Websocket state flags.
        self.wsconn = None
        self.wsconn_close = False
        self.wsconn_already_closed = False
        self.has_wsconn_initialised = False
        self.is_retrying_connection = False
        self.has_started_retry_connection = False
        self.forced_exit = False

        self.daemon = True
        self.game_url = server_url

        self.ctx = zmq.Context.instance()
        self.socket = self.ctx.socket(zmq.PAIR)
        # Do not use "localhost" here: binding to it fails with
        # [zmq.error.ZMQError: No such device].
        self.socket.bind('tcp://127.0.0.1:{}'.format(port))

        # NOTE(review): on_recv callbacks are invoked with the received
        # message, while communicate_with_server is a zero-argument coroutine
        # that run() also schedules explicitly. Confirm this registration is
        # still wanted.
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.communicate_with_server)
        self.logger.debug(f'Running on a port: {str(port)}')
        self.ioloop = ioloop.IOLoop()

    async def read_with_websocket(self):
        """Read and decode one message from the websocket.

        Returns:
            The decoded message, or ``None`` when the connection is being
            closed or the server closed it (``read_message`` returned
            ``None``).
        """
        if self.wsconn_close:
            if not self.wsconn_already_closed:
                # First time through after a close request: shut the
                # websocket and tell the UI the game has ended.
                self.wsconn.close()
                self.socket.send_pyobj(dict(cmd=GameStatus.ENDED))
                self.wsconn_already_closed = True
            return None

        msg_recv = await self.wsconn.read_message()
        self.logger.debug(f'Received from websocket={msg_recv}')
        if msg_recv is None:
            # The server closed the connection; flag it for a reconnect.
            self.has_wsconn_initialised = False
            self.is_retrying_connection = True
            return None

        # literal_eval only parses Python literals, it does not execute code.
        msg = literal_eval(msg_recv)
        self.logger.debug('Received from websocket(Decoded)={}'.format(str(msg)))
        return msg

    async def send_with_websocket(self, msg):
        """Send ``msg`` to the server, or request a close when it is empty.

        Args:
            msg: Message from the UI. A falsy value means "close the
                websocket" and is not sent.
        """
        if not msg:
            self.wsconn_close = True
            return

        try:
            if isinstance(self.wsconn, websocket.WebSocketClientConnection):
                self.logger.debug('Sending game request')
                # The message is sent as its string form, not as a dict.
                await self.wsconn.write_message(str(msg))
        except Exception:
            # Best effort: record the failure and flag the connection for a
            # reconnect instead of failing silently.
            self.logger.warning('Sending over the websocket failed', exc_info=True)
            self.has_wsconn_initialised = False
            self.is_retrying_connection = True

    async def init_game_conn(self):
        """Open the websocket connection to the game server.

        Raises:
            ServerDisconnectedError: If the connection cannot be established.
        """
        self.logger.debug('Creating initial game server connection')
        try:
            self.wsconn = await websocket.websocket_connect(self.game_url)
        except Exception as err:
            # Keep the original failure as the cause for easier debugging.
            raise ServerDisconnectedError(
                'Server was never started. Start server instance.'
            ) from err
        self.logger.debug('Connection established')
        self.has_wsconn_initialised = True

    async def communicate_with_server(self):
        """Relay UI messages to the server and server replies back to the UI.

        Runs until the UI sends ``GameStatus.ENDED`` or the server cannot be
        reached.
        """
        try:
            while True:
                # NOTE(review): recv_pyobj() is a blocking call inside a
                # coroutine, so the event loop cannot service the websocket
                # while waiting for the UI. It also unpickles its input, so
                # the peer on this loopback socket must be trusted.
                msg_recv = self.socket.recv_pyobj()
                self.logger.debug(f'Received from playerui={str(msg_recv)}')
                if msg_recv.get('cmd') == GameStatus.ENDED and not self.forced_exit:
                    self.cleanup()
                    break
                if not self.has_wsconn_initialised:
                    # The first UI message only triggers the connection.
                    await self.init_game_conn()
                else:
                    await self.send_with_websocket(msg_recv)
                msg_recv = await self.read_with_websocket()
                self.socket.send_pyobj(msg_recv)
        except ServerDisconnectedError:
            self.logger.debug('ServerDisconnectedError exception caught')
            if not self.forced_exit:
                self.cleanup()
                # Signals are only caught in the main thread, so interrupt
                # the whole process to stop the UI as well.
                os.kill(os.getpid(), signal.SIGINT)

    def cleanup_on_panic(self):
        """Tell the UI the game ended, drain one pending reply, then clean up."""
        try:
            self.socket.send_pyobj(dict(cmd=GameStatus.ENDED))
            try:
                self.socket.recv_pyobj(flags=zmq.NOBLOCK)
            except zmq.Again:
                # Nothing was queued; a non-blocking receive signals that by
                # raising, which must not prevent the cleanup below.
                pass
        finally:
            self.cleanup()

    def cleanup(self):
        """Close the UI socket and stop the current event loop."""
        self.forced_exit = True
        self.socket.close()
        self.logger.debug('Closing the netclient')
        self.ioloop.current().stop()

    def run(self):
        """Thread entry point: schedule the relay coroutine and run the loop."""
        try:
            self.logger.debug(f'Running on ioloop = {self.ioloop}')
            loop = self.ioloop.current()
            loop.spawn_callback(self.communicate_with_server)
            loop.start()
        except KeyboardInterrupt:
            if not self.forced_exit:
                self.cleanup_on_panic()
        except Exception:
            self.logger.exception('Unexpected error in the netclient loop')
            if not self.forced_exit:
                self.cleanup_on_panic()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
import sys | |
import logging | |
import uuid | |
import os | |
from tornado import ioloop | |
from serverenums import GameStatus | |
from netclient import NetClient | |
# ``import logging`` alone does not guarantee that the ``logging.handlers``
# submodule is loaded, so import it explicitly before using
# RotatingFileHandler.
import logging.handlers

# Root logger: everything at DEBUG and above reaches the handlers below.
rootlogger = logging.getLogger()
rootlogger.setLevel(logging.DEBUG)

# Console handler: INFO and above, basic format, written to stdout.
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))

# File handler: DEBUG and above in a per-run file (unique via uuid4),
# rotated at 5 MiB with 7 backups kept.
file_handler = logging.handlers.RotatingFileHandler(
    f'cmdui_{str(uuid.uuid4())}.log',
    maxBytes=(1048576 * 5),
    backupCount=7,
)
file_handler.setLevel(logging.DEBUG)
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(f_format)

rootlogger.addHandler(stream_handler)
rootlogger.addHandler(file_handler)
def get_random_port():
    """Return a TCP port number that was free at the time of the call.

    A socket is bound to port 0 so the operating system picks an unused
    port; the socket is then closed and the number is returned.

    NOTE: another process may take the port between this call and the
    caller's own bind.

    Returns:
        int: The port number chosen by the operating system.
    """
    import socket

    # The context manager closes the probe socket even if bind() or
    # getsockname() raises.
    with socket.socket() as probe:
        probe.bind(("", 0))
        return probe.getsockname()[1]
class CmdUI(object):
    """Console front end that exchanges messages with NetClient over ZMQ.

    Args:
        port: Loopback TCP port of the PAIR socket to connect to.
    """

    def __init__(self, port):
        self.logger = logging.getLogger(__name__)
        # Use the shared process-wide context. Context().instance() would
        # first build an extra context that is never used.
        self.ctx = zmq.Context.instance()
        self.socket = self.ctx.socket(zmq.PAIR)
        self.socket.connect('tcp://127.0.0.1:{0}'.format(port))
        self.msg_recv = None
        self.user_exit = False

    def get_str_input(self, question):
        """Prompt until the user enters a non-blank string and return it.

        Args:
            question: Prompt text passed to ``input``.

        Returns:
            str: The text exactly as entered (not stripped).
        """
        while True:
            choice = input(question)
            if choice.strip():
                return choice
            print('Error: Empty string entered!!!')
            self.logger.error('Error: Empty string entered!!!')

    def get_int_input(self, question):
        """Prompt until the user enters a valid integer and return it.

        Args:
            question: Prompt text passed to ``input``.

        Returns:
            int: The parsed value.
        """
        while True:
            try:
                return int(self.get_str_input(question))
            except ValueError as err:
                # Show the parse error and ask again.
                print(err)

    def close_game(self):
        """Notify NetClient that the game ended, close the socket and exit.

        Raises:
            SystemExit: Always, with exit status 0.
        """
        self.user_exit = True
        self.logger.debug('Closing due to application panic')
        try:
            # Non-blocking so shutdown cannot hang on an absent peer; in
            # that case the send raises a ZMQError with errno EAGAIN.
            self.socket.send_pyobj(
                dict(cmd=GameStatus.ENDED),
                flags=zmq.NOBLOCK
            )
        except zmq.ZMQError as exc:
            if exc.errno != zmq.EAGAIN:
                self.logger.debug(f'{exc}: in here')
        except Exception as e:
            # Best effort: the notification is optional during shutdown.
            self.logger.debug(f'{e}: in here')
        self.socket.close()
        self.logger.debug('Closing game')
        sys.exit(0)

    def start(self):
        """Run the prompt/send/receive loop until the game ends.

        Each iteration sends the user's text to NetClient and makes one
        non-blocking attempt to read a reply.
        """
        self.logger.info('CmdUI')
        try:
            while True:
                # Reset every round so an old reply is not logged again when
                # nothing new has arrived.
                msg_recv = None
                try:
                    msg_snd = self.get_str_input('Enter a message: ')
                    self.socket.send_pyobj(dict(cmd=msg_snd))
                    # Try to receive a message without blocking.
                    msg_recv = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
                except zmq.ZMQError as exc:
                    # EAGAIN only means that no reply is available yet.
                    if exc.errno != zmq.EAGAIN:
                        raise
                self.logger.info(f'Received from server={str(msg_recv)}')
                if msg_recv and msg_recv.get('cmd') == GameStatus.ENDED:
                    print('Player ended game session')
                    self.logger.debug('Player ended game session')
                    self.close_game()
        except KeyboardInterrupt:
            if not self.user_exit:
                self.close_game()
        except Exception:
            self.logger.exception('Unexpected error in the UI loop')
            if not self.user_exit:
                self.close_game()
def main():
    """Start the NetClient thread and the console UI on a shared port.

    When the UI ends the session with ``SystemExit``, the process is
    terminated immediately with ``os._exit``.
    """
    ui, netclient = None, None
    try:
        port = get_random_port()
        netclient = NetClient(port, 'ws://localhost:8888/wshandler')
        ui = CmdUI(port)
        netclient.start()
        ui.start()
    except SystemExit:
        # netclient.join() is not needed since the thread is a daemon.
        print('Application ended')
        # ui is still None if SystemExit arrived before CmdUI was built, so
        # fall back to the module logger instead of failing on ui.logger.
        logger = ui.logger if ui is not None else logging.getLogger(__name__)
        logger.debug('Application ended')
        os._exit(os.EX_OK)
# Run the console client only when this file is executed as a script.
if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tornado import ( | |
web, options, httpserver, | |
ioloop, websocket, log, escape | |
) | |
import logging, sys, asyncio | |
from serverenums import GameStatus | |
def enable_server_logging():
    """Configure logging for the server process.

    Sets Tornado's ``log_file_prefix`` option to ``mainserver.log`` and
    parses the command line, then attaches an INFO-level stdout handler to
    the root logger and sets the root logger to DEBUG.
    """
    options.options['log_file_prefix'] = 'mainserver.log'
    options.parse_command_line()

    # Console output: INFO and above in the basic format.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(logging.BASIC_FORMAT))

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(console)
class WSHandler(websocket.WebSocketHandler):
    """Websocket endpoint that broadcasts each message to all open clients."""

    # Shared across all handler instances: every currently open connection.
    clients = set()

    def check_origin(self, origin):
        """Accept connections from any origin.

        NOTE(review): this disables the cross-origin check. Confirm that is
        acceptable outside local development.
        """
        return True

    def open(self):
        """Register the new connection."""
        self.clients.add(self)
        print(f'Websocket opened. ClientID = {str(self)}')

    def on_message(self, message):
        """Forward ``message`` to every connected client, the sender included."""
        # Iterate over a snapshot and skip peers that are already closed, so
        # one dead connection does not abort the whole broadcast.
        for client in tuple(self.clients):
            try:
                client.write_message(message)
            except websocket.WebSocketClosedError:
                logging.getLogger(__name__).debug(
                    'Skipping closed websocket client %s', client
                )

    def on_pong(self, data):
        """Schedule the pong reply without blocking the handler."""
        # Refer to https://github.com/tornadoweb/tornado/issues/2532 and
        # https://github.com/tornadoweb/tornado/issues/2021.
        # The on_message handler has priority, so the pong handling is made
        # asynchronous; that way the pong does not block and close the
        # connection. ensure_future starts executing the coroutine as soon
        # as the event loop resumes, even if nobody awaits the returned
        # future.
        asyncio.ensure_future(self.on_pong_async(data))

    async def on_pong_async(self, data):
        """Send a status message to the client after a pong arrived."""
        print('Websocket ping from client')
        try:
            await self.write_message(
                dict(cmd='GameStatus.HAS_ALREADY_STARTED', msg='Server sent pong')
            )
        except websocket.WebSocketClosedError:
            # The client went away in the meantime. Nobody awaits this
            # coroutine, so handle the error here.
            logging.getLogger(__name__).debug(
                'Client closed before the pong reply could be sent'
            )

    def on_close(self):
        """Unregister the connection."""
        # discard() does not raise if the handler was never registered.
        self.clients.discard(self)
        print(f'Closing websocket connection for {str(self)}')
def maks_app():
    """Build the Tornado application.

    Returns:
        A ``web.Application`` that serves ``WSHandler`` at ``/wshandler``
        with ``websocket_ping_interval`` set to 10.
    """
    routes = [
        (r"/wshandler", WSHandler),
    ]
    return web.Application(routes, websocket_ping_interval=10)
# Script entry point: configure logging, listen on port 8888 and run the
# event loop until the process is interrupted or asked to exit.
if __name__ == "__main__":
    enable_server_logging()
    try:
        app = maks_app()
        app.listen(8888)
        ioloop.IOLoop.current().start()
    except (KeyboardInterrupt, SystemExit):
        ioloop.IOLoop.current().stop()
        print('Stopped server')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Based on feedback from the Tornado maintainers, I rewrote server.py as shown below: