Skip to content

Instantly share code, notes, and snippets.

@gdassori
Last active November 23, 2019 08:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gdassori/bf563efe62d553481bc3887518d1054c to your computer and use it in GitHub Desktop.
Save gdassori/bf563efe62d553481bc3887518d1054c to your computer and use it in GitHub Desktop.
Server Websocket Barbino
from aiohttp import web
from core.src.world.builder import websocket_channels_service
from core.src.world.services.websocket_router import sio, loop, app
from etc import settings
if __name__ == '__main__':
websocket_channels_service \
.set_socketio_instance(sio) \
.set_event_loop(loop)
loop.create_task(websocket_channels_service.start())
web.run_app(app, host=settings.SOCKETIO_HOSTNAME, port=settings.SOCKETIO_PORT)
import asyncio
import socketio
from aiohttp import web
import time
from core.src.world.builder import world_repository, websocket_channels_service
from core.src.auth.business.character import exceptions
from core.src.auth.builder import auth_service, redis_characters_index_repository, ws_channels_repository, \
psql_character_repository
from core.src.auth.logging_factory import LOGGER
from core.src.world.components.character import CharacterComponent
from core.src.world.components.created_at import CreatedAtComponent
from core.src.world.components.name import NameComponent
from core.src.world.entity import Entity
from etc import settings
mgr = socketio.AsyncRedisManager('redis://{}:{}'.format(settings.REDIS_HOST, settings.REDIS_PORT))
sio_settings = dict(client_manager=mgr, async_mode='aiohttp')
if settings.ENABLE_CORS:
sio_settings['cors_allowed_origins'] = '*'
loop = asyncio.get_event_loop()
sio = socketio.AsyncServer(**sio_settings)
app = web.Application()
sio.attach(app)
WS_MOTD = """Project M\n"""
@sio.event
async def connect(sid, environ):
LOGGER.core.debug('Sending MOTD')
await sio.emit('msg', {'data': WS_MOTD}, to=sid)
@sio.on('create')
async def create_character(sid, payload):
token = auth_service.decode_session_token(payload['token'])
assert token['context'] == 'world:create'
entity = Entity() \
.set(CharacterComponent(True))\
.set(CreatedAtComponent(int(time.time()))) \
.set(NameComponent(payload["name"]))
entity = world_repository.save_entity(entity)
"""
patchwork starts here
"""
from core.src.auth.database import init_db, db
init_db(db)
character_id = psql_character_repository.store_new_character(
token['data']['user_id'], payload["name"]
).character_id
try:
db.close()
except:
# FIXME - This shouldn't be here, but we miss the "store_new_character" HTTP endpoint yet.
pass
"""
patchwork ends here
"""
redis_characters_index_repository.set_entity_id(character_id, entity.entity_id)
await sio.emit('create', {'success': True, 'character_id': character_id}, to=sid)
@sio.on('auth')
async def authenticate_character(sid, payload):
token = auth_service.decode_session_token(payload['token'])
assert token['context'] == 'world:auth'
entity_id = redis_characters_index_repository.get_entity_id(token['data']['character_id'])
if not entity_id:
raise exceptions.CharacterNotAllocated('create first')
channel = ws_channels_repository.create(entity_id)
await websocket_channels_service.enable_channel(channel)
await sio.emit('auth', {'data': {
'channel_id': channel.connection_id,
'character_id': token['data']['character_id']
}}, to=sid)
import asyncio
import time
from core.src.auth.repositories.redis_websocket_channels_repository import WebsocketChannelsRepository
from core.src.world.components.connection import ConnectionComponent
from core.src.world.entity import Entity
from core.src.auth.logging_factory import LOGGER
"""
this agent is intended to monitor the websockets channels statuses with PING / PONG messages.
check PING_INTERVAL & PING_TIMEOUT options
It also emits events to / from established channels
"""
class WebsocketChannelsService:
def __init__(
self,
socketio=None,
channels_repository: WebsocketChannelsRepository=None,
loop=None,
data_repository=None,
redis_queue=None,
ping_interval=10,
ping_timeout=90
):
self.loop = loop
self.connections_statuses = {}
self.socketio = socketio
self.channels_repository = channels_repository
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.data_repository = data_repository
self._on_delete_channel = []
self._on_ping = []
self._on_cmd_observers = []
self._on_new_channel_observers = []
self.channels_cache = {}
self._pending_channels = asyncio.Queue()
self.redis_queues_manager = redis_queue
def set_socketio_instance(self, sio):
self.socketio = sio
return self
def set_event_loop(self, loop):
self.loop = loop
return self
def add_on_channel_delete_event(self, observer):
self._on_delete_channel.append(observer)
def add_on_ping_event(self, observer):
self._on_ping.append(observer)
def add_on_cmd_observer(self, observer):
self._on_cmd_observers.append(observer)
def add_on_new_channel_observer(self, observer):
self._on_new_channel_observers.append(observer)
async def _on_presence_event(self, connection_id: str, message: str):
LOGGER.websocket_monitor.debug(
'Received message [ %s ] from connection_id [ %s ]', message, connection_id
)
if message == 'PONG':
self.connections_statuses[connection_id]['last_pong'] = int(time.time())
if message == 'PING':
if connection_id in self.connections_statuses:
await self.socketio.emit('presence', 'PONG', namespace='/{}'.format(connection_id))
def remove_handlers(self, channel):
self.loop.create_task(
self.socketio.emit('disconnect', '', '/{}'.format(channel.connection_id))
)
self.socketio.handlers.pop('/{}'.format(channel.connection_id), None)
async def subscribe_pong_from_channels(self, connection_id: str):
LOGGER.websocket_monitor.info('Subscribe presence for channel %s', connection_id)
async def cb(_, data):
await self._on_presence_event(connection_id, data)
self.socketio.on('presence', cb, namespace='/{}'.format(connection_id))
async def subscribe_commands_from_channels(self, channel):
LOGGER.websocket_monitor.info('Subscribe commands for channel %s', channel.connection_id)
async def cb(_, data):
await self.redis_queues_manager.put(
{
'n': channel.connection_id,
'e_id': channel.entity_id,
'd': data,
't': int(time.time()),
'c': 'cmd'
}
)
for observer in self._on_cmd_observers:
self.loop.create_task(
observer.on_message(
channel.connection_id,
self.connections_statuses[channel.connection_id]['entity_id'],
data
)
)
self.socketio.on('cmd', cb, namespace='/{}'.format(channel.connection_id))
async def bind_channel(self, channel):
async def _on_connect(*a, **kw):
LOGGER.websocket_monitor.info('Channel %s connected', str(channel))
self.connections_statuses[channel.connection_id]['open'] = True
self.data_repository.update_entities(
Entity(channel.entity_id).set(ConnectionComponent(channel.connection_id))
)
self.loop.create_task(self.subscribe_commands_from_channels(channel))
await self.redis_queues_manager.put(
{
'n': channel.connection_id,
'e_id': channel.entity_id,
't': int(time.time()),
'c': 'connected'
}
)
self.socketio.on(
'connect', _on_connect, namespace='/{}'.format(channel.connection_id)
)
await self.subscribe_pong_from_channels(channel.connection_id)
async def ping_channel(self, connection_id: str):
await self.socketio.emit('presence', 'PING', namespace='/{}'.format(connection_id))
LOGGER.websocket_monitor.debug('Sending PING message to connection_id [ %s ]', connection_id)
self.connections_statuses[connection_id]['last_ping'] = int(time.time())
for event_handler in self._on_ping:
event_handler(connection_id)
async def start(self):
self.loop.create_task(self.start_monitoring())
while 1:
channel = await self._pending_channels.get()
self.loop.create_task(self._check_connection_status(channel))
async def start_monitoring(self):
while 1:
await self.monitor_connection_statuses()
await asyncio.sleep(5)
async def monitor_connection_statuses(self):
channels = list(self.channels_repository.get_active_channels())
LOGGER.websocket_monitor.debug('Monitoring %s', channels)
for channel in channels:
self.loop.create_task(self._check_connection_status(channel))
async def enable_channel(self, channel):
self._pending_channels.put_nowait(channel)
async def _check_connection_status(self, channel):
if not self.connections_statuses.get(channel.connection_id):
LOGGER.websocket_monitor.debug('Channel %s status never saved. Saving', channel)
self.connections_statuses[channel.connection_id] = {
"seen_at": int(time.time()),
"entity_id": channel.entity_id
}
await self.bind_channel(channel)
self._on_new_channel_observers and self.loop.create_task(
asyncio.gather(
*(observer.on_event(channel) for observer in self._on_new_channel_observers)
)
)
if self.connections_statuses[channel.connection_id].get('open'):
if not self.connections_statuses[channel.connection_id].get('last_ping') or int(time.time()) - \
self.connections_statuses[channel.connection_id]['last_ping'] > self.ping_interval:
await self.ping_channel(channel.connection_id)
for observer in self._on_ping:
observer(channel.connection_id)
elif self.connections_statuses[channel.connection_id].get('last_pong') \
and int(time.time()) - self.connections_statuses[channel.connection_id]['last_pong'] > \
self.ping_timeout:
self._remove_channel(channel)
elif not self.connections_statuses[channel.connection_id].get('last_pong') \
and int(time.time()) - self.connections_statuses[channel.connection_id]['seen_at'] > self.ping_timeout:
self._remove_channel(channel)
def _remove_channel(self, channel):
LOGGER.websocket_monitor.info('Ping timeout for channel %s', channel)
for observer in self._on_delete_channel:
observer(channel.connection_id)
self.channels_repository.delete(channel.connection_id)
self.connections_statuses.pop(channel.connection_id, None)
self.remove_handlers(channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment