Skip to content

Instantly share code, notes, and snippets.

@ronnievdc
Last active January 15, 2022 02:00
Show Gist options
  • Save ronnievdc/dd65a1ba1fef86ba73e37faeaaf06de2 to your computer and use it in GitHub Desktop.
Save ronnievdc/dd65a1ba1fef86ba73e37faeaaf06de2 to your computer and use it in GitHub Desktop.
Django Channels Multiplex
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import json
import logging
from asyncio.futures import Future
from copy import deepcopy
from typing import Optional, Tuple
from urllib.parse import unquote, urlparse, urlencode, parse_qsl

from channels.generic.websocket import AsyncWebsocketConsumer
from channels.routing import get_default_application
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class AsyncMultiplexWebsocketConsumer(AsyncWebsocketConsumer):
    """
    Multiplex several virtual WebSocket "channels" over one real WebSocket.

    Every text frame on the real socket has the form
    ``<action>,<channel_name>[,<payload>]``. For each subscribed channel an
    application instance is started and fed through its own ``asyncio.Queue``;
    whatever that application sends is prefixed with the channel name and
    written back to the real socket.
    """

    # Client -> server: multiplex action => event type handled by ``receive``.
    ACTION_HANDLER_MAPPING = {
        'sub': 'websocket.connect',  # Client requests a new Channel (e.g. ws = new Websocket())
        'msg': 'websocket.receive',  # Client sends a message to the server(e.g. ws.send())
        'uns': 'websocket.disconnect',  # Client closes the channel (e.g. ws.close())
    }
    # Server -> client: event type sent by the channel application => multiplex frame type.
    SEND_TYPE_MAPPING = {
        'websocket.accept': 'open',  # Channel is accepted by the server (e.g. ws.addEventListener('open', cb))
        'websocket.send': 'message',  # Server sends a message to the client (e.g. ws.addEventListener('message', cb))
        'websocket.close': 'close',  # Channel is closed by the server (e.g. ws.addEventListener('close', cb))
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # channel_name -> asyncio.Queue feeding that channel's application
        self.channel_queues = {}
        # channel_name -> future running that channel's application
        self.channel_futures = {}

    async def receive(self, text_data: Optional[str] = None, bytes_data: Optional[bytes] = None):
        """
        Called with a decoded Multiplex WebSocket frame.

        Text frames carrying a known action are routed to the matching
        ``channel_websocket_*`` handler. Everything else (binary frames,
        unknown actions) is passed on to the parent ``receive``.
        """
        if text_data:
            channel_action, channel_name, channel_payload = self.demultiplex_data(text_data)
            if channel_action is not None and channel_name is None:
                # A known action without a channel name cannot be routed, and
                # would later fail when the (missing) name is joined into a reply.
                log.debug('receive: ignoring %s frame without a channel name', channel_action)
                return
            if channel_action == 'websocket.connect':
                await self.channel_websocket_connect(channel_name, channel_payload)
                return
            if channel_action == 'websocket.disconnect':
                await self.channel_websocket_disconnect(channel_name, channel_payload)
                return
            if channel_action == 'websocket.receive':
                await self.channel_websocket_receive(channel_name, channel_payload)
                return
        await super().receive(text_data=text_data, bytes_data=bytes_data)

    async def disconnect(self, code):
        """
        Called when the Multiplex WebSocket connection is closed.

        Forwards a ``websocket.disconnect`` event, carrying the same code, to
        every channel application that is still registered.
        """
        await super().disconnect(code=code)
        # Iterate over a snapshot so the registry may change while we await.
        for channel_queue in list(self.channel_queues.values()):
            await channel_queue.put({
                'type': 'websocket.disconnect',
                'code': code,
            })

    async def channel_websocket_connect(self, channel_name: str, channel_payload: str):
        """
        Called when a Websocket Channel connection is opened.

        Builds a scope for the channel from the multiplex socket's scope and
        the path in ``channel_payload``, starts an application for it and
        queues the initial ``websocket.connect`` event. If no application
        could be started, a close frame is sent for the channel instead.
        """
        if channel_name in self.channel_queues:
            log.debug('channel_websocket_connect: channel_name %s is already connected', channel_name)
            return

        parent_scope = self.scope
        path, query_string = self.get_path_from_payload(channel_payload)
        # Only the forwarded values are copied, so the channel application
        # cannot mutate the multiplex socket's scope. headers, client, server
        # and subprotocols are optional in an ASGI WebSocket scope, hence .get().
        channel_queue, channel_event_loop = await self.start_channel_application(channel_name, {
            'type': parent_scope['type'],
            'path': path,
            'raw_path': path.encode('utf-8'),
            'headers': deepcopy(parent_scope.get('headers', [])),
            'query_string': query_string,
            'client': deepcopy(parent_scope.get('client')),
            'server': deepcopy(parent_scope.get('server')),
            'subprotocols': deepcopy(parent_scope.get('subprotocols', [])),
        })
        if not channel_queue or not channel_event_loop:
            await self.channel_close(channel_name=channel_name)
            return

        self.channel_queues[channel_name] = channel_queue
        self.channel_futures[channel_name] = channel_event_loop
        await channel_queue.put({
            'type': 'websocket.connect',
        })

    async def channel_connect(self, channel_name: str):
        """
        Called when the channel application accepts the Channel.
        """
        await self.channel_accept(channel_name)

    async def channel_accept(self, channel_name: str):
        """
        Accepts an incoming Channel request
        """
        await self.send(text_data=','.join([self.SEND_TYPE_MAPPING['websocket.accept'], channel_name]))

    async def channel_websocket_receive(self, channel_name: str, message: str):
        """
        Called when a WebSocket Channel frame is received.

        The message is queued for the channel's application as a
        ``websocket.receive`` event; frames for unknown channels are dropped.
        """
        if channel_name not in self.channel_queues:
            log.debug('channel_websocket_receive: channel_name %s is not connected', channel_name)
            return
        await self.channel_queues[channel_name].put({
            'type': 'websocket.receive',
            'text': message,
        })

    async def channel_send(self, channel_name, text_data=None, bytes_data=None, close=False):
        """
        Sends a reply back down the Channel

        Only ``text_data`` is transmitted; ``bytes_data`` and ``close`` are
        accepted but not used by this implementation.
        """
        await self.send(text_data=','.join([self.SEND_TYPE_MAPPING['websocket.send'], channel_name, text_data]))

    async def channel_close(self, channel_name: str, code: Optional[int] = None):
        """
        Closes the Channel from the server end

        The close code is appended to the frame when given. ``None`` and
        ``True`` both mean "no code".
        """
        frame_parts = [self.SEND_TYPE_MAPPING['websocket.close'], channel_name]
        if code is not None and code is not True:
            # str.join() only accepts strings, so the numeric code is converted.
            frame_parts.append(str(code))
        await self.send(text_data=','.join(frame_parts))

    async def channel_websocket_disconnect(self, channel_name: str, channel_payload: str):
        """
        Called when a WebSocket Channel connection is closed.

        ``channel_payload`` is expected to hold the close code; when it is
        missing or not a number, 1005 is used instead. The channel is removed
        from the registry after its application has been notified.
        """
        if channel_name not in self.channel_futures:
            log.debug('channel_websocket_disconnect: channel_name %s is not connected', channel_name)
            return
        try:
            code = int(channel_payload)
        except (TypeError, ValueError):
            # TypeError covers a payload of None (e.g. from the JSON variant).
            code = 1005
        channel_queue = self.channel_queues.pop(channel_name)
        del self.channel_futures[channel_name]
        await channel_queue.put({
            'type': 'websocket.disconnect',
            'code': code,
        })

    async def dispatch_downstream(self, channel_name: str, message: dict):
        """
        The channel sends a message to the client, add the channel name to it

        Messages of any other type than accept, close or send are ignored.
        """
        message_type = message['type']
        if message_type == 'websocket.accept':
            await self.channel_connect(channel_name)
        elif message_type == 'websocket.close':
            # Forward the close code chosen by the channel application, if any.
            await self.channel_close(channel_name, code=message.get('code'))
        elif message_type == 'websocket.send':
            await self.channel_send(channel_name, text_data=message['text'])

    async def start_channel_application(
        self, channel_name: str, scope: dict
    ) -> Tuple[Optional[asyncio.Queue], Optional[Future]]:
        """
        Start an application instance for one Channel.

        Returns the queue the application receives its events from, together
        with the future that runs it, or ``(None, None)`` when the application
        raises ``ValueError`` for the given scope.
        """
        # Gets the running settings.ASGI_APPLICATION.
        # Usually an instance of ProtocolTypeRouter
        application = get_default_application()
        try:
            application_instance = application(scope=scope)
        except ValueError:
            # The scope cannot be handled
            # Usually this means a 404
            return None, None
        channel_queue = asyncio.Queue()
        consumer = application_instance(
            receive=channel_queue.get,
            send=lambda message: self.dispatch_downstream(channel_name=channel_name, message=message),
        )
        # Create the channel event loop, which handles the queue
        channel_event_loop = asyncio.ensure_future(
            self.close_channel_on_exception(consumer, channel_name),
            loop=asyncio.get_event_loop(),
        )
        return channel_queue, channel_event_loop

    async def close_channel_on_exception(self, consumer, channel_name: str):
        """
        Await the channel's consumer and return its result.

        If the consumer raises, the error is logged, a close frame is sent
        for the channel and the exception is re-raised.
        """
        log.info('Start consumer on channel %s', channel_name)
        try:
            result = await consumer
        except Exception as e:
            log.error('Exception in consumer: %s. Closing channel %s.', e, channel_name, exc_info=True, extra={
                'scope': self.scope
            })
            await self.channel_close(channel_name)
            # Bare raise keeps the original traceback intact.
            raise
        log.info('End consumer on channel %s', channel_name)
        return result

    def demultiplex_data(self, text_data: str) -> Tuple[Optional[str], Optional[str], str]:
        """
        Split a ``<action>,<channel_name>[,<payload>]`` frame into its parts.

        Returns ``(event_type, channel_name, payload)``. ``event_type`` is
        ``None`` for an unknown action, ``channel_name`` is ``None`` when the
        frame has no second field, and ``payload`` is everything after the
        second comma (``''`` when absent).
        """
        # maxsplit=2 keeps any commas inside the payload untouched.
        text_parts = text_data.split(',', 2)
        channel_action = text_parts[0]
        channel_name = text_parts[1] if len(text_parts) > 1 else None
        channel_payload = text_parts[2] if len(text_parts) > 2 else ''
        return self.ACTION_HANDLER_MAPPING.get(channel_action), channel_name, channel_payload

    @staticmethod
    def get_path_from_payload(channel_payload: str) -> Tuple[str, bytes]:
        """
        Extract the path and the re-encoded query string from a 'sub' payload.

        Returns ``('', b'')`` when the payload is empty.
        """
        # TODO: Does multiplex_client.js do it wrong?
        # multiplex_client.js uses 'escape()' on the path name which is latin-1
        path = ''
        query_string = b''  # self._raw_query_string | Passed by HTTP protocol
        if channel_payload:
            full_path = unquote(channel_payload, 'latin-1')
            parsed = urlparse(full_path)
            path = parsed.path
            query_string = urlencode(parse_qsl(parsed.query)).encode('utf-8')
        return path, query_string
class AsyncJsonMultiplexWebsocketConsumer(AsyncMultiplexWebsocketConsumer):
    """
    Multiplex consumer whose frames are JSON objects instead of
    comma-separated text: ``{"action": ..., "stream": ..., "payload": ...}``.
    """

    def demultiplex_data(self, text_data: str) -> tuple:
        """
        Decode a JSON frame into ``(event_type, channel_name, payload)``.

        Frames that are not valid JSON, are not a JSON object, or carry an
        ``action``/``stream`` value that cannot be used as a lookup key yield
        ``(None, None, None)`` instead of raising, so that a single malformed
        frame is not treated as a known action.
        """
        try:
            json_data = json.loads(text_data)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            log.debug('demultiplex_data: frame is not valid JSON')
            return None, None, None
        if not isinstance(json_data, dict):
            log.debug('demultiplex_data: frame is not a JSON object')
            return None, None, None

        action = json_data.get('action')
        stream = json_data.get('stream')
        # Both values are used as dictionary keys later on; lists and objects
        # are unhashable and would raise TypeError there.
        if not isinstance(action, str) or isinstance(stream, (dict, list)):
            log.debug('demultiplex_data: frame has an invalid action or stream')
            return None, None, None
        return self.ACTION_HANDLER_MAPPING.get(action), stream, json_data.get('payload')

    async def channel_accept(self, channel_name: str):
        """
        Accepts an incoming Channel request
        """
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING['websocket.accept'],
            'stream': channel_name,
            'payload': None,
        }))

    async def channel_send(self, channel_name, text_data=None, bytes_data=None, close=False):
        """
        Sends a reply back down the Channel

        ``text_data`` is parsed as JSON and embedded as the frame's payload;
        ``bytes_data`` and ``close`` are accepted but not used.
        """
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING['websocket.send'],
            'stream': channel_name,
            'payload': json.loads(text_data),
        }))

    async def channel_close(self, channel_name: str, code: Optional[int] = None):
        """
        Closes the Channel from the server end

        The close code (or ``None``) is sent as the frame's payload.
        """
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING['websocket.close'],
            'stream': channel_name,
            'payload': code,
        }))
/**
* Copyright (c) 2017, Marek Majkowski. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Modified by ronnievdc to implement the connecting event and catching more edge cases
*/
var WebSocketMultiplex = (function () {
    var hasOwn = Object.prototype.hasOwnProperty;

    // ****
    // Minimal event emitter: supports addEventListener() plus an optional
    // `on<type>` handler property, mirroring the shape of a WebSocket.
    var DumbEventTarget = function () {
        this._listeners = {};
    };
    // Make sure a listener list exists for the given event type.
    DumbEventTarget.prototype._ensure = function (type) {
        // Own-property check, so names inherited from Object.prototype are
        // not mistaken for an existing listener list.
        if (!hasOwn.call(this._listeners, type)) this._listeners[type] = [];
    };
    DumbEventTarget.prototype.addEventListener = function (type, listener) {
        this._ensure(type);
        this._listeners[type].push(listener);
    };
    // Call the `on<type>` handler (if set) and then every registered listener
    // with the arguments that follow `type`.
    DumbEventTarget.prototype.emit = function (type) {
        this._ensure(type);
        var args = Array.prototype.slice.call(arguments, 1);
        if (this['on' + type]) this['on' + type].apply(this, args);
        for (var i = 0; i < this._listeners[type].length; i++) {
            this._listeners[type][i].apply(this, args);
        }
    };

    // ****
    // Wraps one real WebSocket (or a compatible object) and routes its frames,
    // formatted as `<type>,<channel name>[,<payload>]`, to virtual channels.
    var WebSocketMultiplex = function (ws) {
        var that = this;
        this.ws = ws;
        this.channels = {};

        // Run `callback` for every registered channel. The keys are snapshotted
        // first because listeners may remove channels while we iterate.
        var forEachChannel = function (callback) {
            Object.keys(that.channels).forEach(function (key) {
                var sub = that.channels[key];
                if (sub) callback(sub);
            });
        };

        this.ws.addEventListener('message', function (e) {
            // Only text frames can carry the multiplex protocol; ignore
            // binary frames instead of throwing on `split`.
            if (typeof e.data !== 'string') {
                return;
            }
            var t = e.data.split(',');
            var type = t.shift(), name = t.shift(), payload = t.join(',');
            if (!hasOwn.call(that.channels, name)) {
                return;
            }
            var sub = that.channels[name];
            switch (type) {
                case 'open':
                    // Channel is accepted by the server
                    sub.emit('open', {});
                    break;
                case 'close':
                    // Channel is closed / rejected by the server
                    delete that.channels[name];
                    sub.emit('close', {});
                    break;
                case 'message':
                    // Channel receives a message from the server
                    sub.emit('message', {data: payload});
                    break;
            }
        });

        // Main websocket is accepted by the server
        this.ws.addEventListener('open', function (e) {
            forEachChannel(function (sub) {
                // All channels that are connecting / closed, try to reconnect
                if (sub.readyState === WebSocket.CONNECTING || sub.readyState === WebSocket.CLOSED) {
                    sub.open();
                }
            });
        });

        // Main Websocket connection to the server is lost
        this.ws.addEventListener('close', function (e) {
            forEachChannel(function (sub) {
                sub.emit('close', {});
            });
        });

        // Main websocket is connecting to the server
        this.ws.addEventListener('connecting', function (e) {
            forEachChannel(function (sub) {
                sub.emit('connecting', {});
            });
        });
    };

    // Create a new virtual channel for `url` and register it under a random name.
    WebSocketMultiplex.prototype.channel = function (url) {
        var channel_name;
        // Retry until the name is non-empty and not already in use: the random
        // base-36 string can be short, and two channels must never share a name.
        do {
            channel_name = Math.random().toString(36).slice(2, 10);
        } while (!channel_name || hasOwn.call(this.channels, channel_name));
        return this.channels[channel_name] = new Channel(this.ws, channel_name, escape(url), this.channels);
    };

    // One virtual channel. Exposes `readyState`, `send`, `close` and events,
    // similar to a WebSocket.
    var Channel = function (ws, channel_name, url, channels) {
        DumbEventTarget.call(this);
        var that = this;
        this.ws = ws;
        this.name = channel_name;
        this.url = url;
        this.channels = channels;
        this.readyState = WebSocket.CONNECTING;

        // If the main Websocket is open, send the open command to the server
        if (ws.readyState === WebSocket.OPEN) {
            setTimeout(this.open.bind(this), 0);
        }

        // Update the readyState on the events that change it
        this.addEventListener('open', function (e) {
            that.readyState = WebSocket.OPEN;
        });
        this.addEventListener('close', function (e) {
            that.readyState = WebSocket.CLOSED;
        });
        this.addEventListener('connecting', function (e) {
            that.readyState = WebSocket.CONNECTING;
        });
    };
    Channel.prototype = Object.create(DumbEventTarget.prototype);
    Channel.prototype.constructor = Channel;

    // Send an open request to the server for opening the channel
    Channel.prototype.open = function () {
        this.ws.send(['sub', this.name, this.url].join(','));
    };

    // Send a message through the channel
    Channel.prototype.send = function (data) {
        this.ws.send(['msg', this.name, data].join(','));
    };

    // Close the channel from the client side
    Channel.prototype.close = function (code, reason) {
        var that = this;
        // Only notify the server when the main socket can actually transmit;
        // send() on a socket that is still connecting throws, which would
        // skip the local cleanup below.
        if (this.ws.readyState === WebSocket.OPEN) {
            if (code) {
                this.ws.send(['uns', this.name, code].join(','));
            } else {
                this.ws.send(['uns', this.name].join(','));
            }
        }
        delete this.channels[this.name];
        this.readyState = WebSocket.CLOSING;
        // Emit asynchronously, like a real WebSocket does.
        setTimeout(function () {
            that.emit('close', {});
        }, 0);
    };

    return WebSocketMultiplex;
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment