Last active
January 15, 2022 02:00
-
-
Save ronnievdc/dd65a1ba1fef86ba73e37faeaaf06de2 to your computer and use it in GitHub Desktop.
Django Channels Multiplex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import asyncio | |
import json | |
import logging | |
from asyncio.futures import Future | |
from copy import deepcopy | |
from typing import Optional | |
from urllib.parse import unquote, urlparse, urlencode, parse_qsl | |
from channels.generic.websocket import AsyncWebsocketConsumer | |
from channels.routing import get_default_application | |
log = logging.getLogger(__name__) | |
class AsyncMultiplexWebsocketConsumer(AsyncWebsocketConsumer):
    """
    Multiplexes several virtual WebSocket "channels" over one real WebSocket.

    Text frames on the real socket are comma separated:

    * client -> server: ``sub,<name>,<url>``, ``msg,<name>,<data>``, ``uns,<name>[,<code>]``
    * server -> client: ``open,<name>``, ``message,<name>,<data>``, ``close,<name>[,<code>]``

    Every subscribed channel is served by its own instance of the default ASGI
    application. That instance reads its events from an ``asyncio.Queue`` kept in
    ``channel_queues`` and runs inside a future kept in ``channel_futures``
    (both keyed by channel name).
    """

    # Client -> server action names, mapped to the ASGI event they emulate
    ACTION_HANDLER_MAPPING = {
        'sub': 'websocket.connect',  # Client requests a new Channel (e.g. ws = new Websocket())
        'msg': 'websocket.receive',  # Client sends a message to the server(e.g. ws.send())
        'uns': 'websocket.disconnect',  # Client closes the channel (e.g. ws.close())
    }
    # ASGI events sent by a channel application, mapped to the frame type sent to the client
    SEND_TYPE_MAPPING = {
        'websocket.accept': 'open',  # Channel is accepted by the server (e.g. ws.addEventListener('open', cb))
        'websocket.send': 'message',  # Server sends a message to the client (e.g. ws.addEventListener('message', cb))
        'websocket.close': 'close',  # Channel is closed by the server (e.g. ws.addEventListener('close', cb))
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.channel_queues = {}  # channel name -> asyncio.Queue feeding the channel application
        self.channel_futures = {}  # channel name -> future running the channel application

    async def receive(self, text_data: str = None, bytes_data: bytes = None):
        """
        Called with a decoded Multiplex WebSocket frame.

        Frames carrying a known action are routed to the matching
        ``channel_websocket_*`` handler; everything else goes to the parent class.
        """
        if text_data:
            channel_action, channel_name, channel_payload = self.demultiplex_data(text_data)
            handlers = {
                'websocket.connect': self.channel_websocket_connect,
                'websocket.disconnect': self.channel_websocket_disconnect,
                'websocket.receive': self.channel_websocket_receive,
            }
            handler = handlers.get(channel_action)
            if handler is not None:
                if channel_name is None:
                    # A frame such as "sub" names no channel; there is nothing to route it to.
                    log.debug('receive: %s frame without a channel name is ignored', channel_action)
                    return
                await handler(channel_name, channel_payload)
                return
        await super(AsyncMultiplexWebsocketConsumer, self).receive(text_data=text_data, bytes_data=bytes_data)

    async def disconnect(self, code):
        """
        Called when the Multiplex WebSocket connection is closed.

        Every channel application still registered is told that its socket is gone.
        """
        await super(AsyncMultiplexWebsocketConsumer, self).disconnect(code=code)
        # Iterate over a snapshot so the registry may change while we await.
        for channel_queue in list(self.channel_queues.values()):
            await channel_queue.put({
                'type': 'websocket.disconnect',
                'code': code,
            })

    async def channel_websocket_connect(self, channel_name: str, channel_payload: str):
        """
        Called when a Websocket Channel connection is opened.

        Starts an application instance for the path in ``channel_payload`` and
        feeds it a ``websocket.connect`` event. When the path cannot be routed,
        the channel is closed towards the client instead.
        """
        if channel_name in self.channel_queues:
            log.debug('channel_websocket_connect: channel_name %s is already connected', channel_name)
            return

        parent_scope = self.scope
        path, query_string = self.get_path_from_payload(channel_payload)
        # Copy only the values that are forwarded instead of deep-copying the
        # whole scope. ``client``, ``server`` and ``subprotocols`` are optional
        # in the ASGI WebSocket scope, so they are read with ``.get``.
        channel_queue, channel_event_loop = await self.start_channel_application(channel_name, {
            'type': parent_scope['type'],
            'path': path,
            'raw_path': path.encode('utf-8'),
            'headers': deepcopy(parent_scope.get('headers', [])),
            'query_string': query_string,
            'client': deepcopy(parent_scope.get('client')),
            'server': deepcopy(parent_scope.get('server')),
            'subprotocols': deepcopy(parent_scope.get('subprotocols', [])),
        })
        if channel_queue is None or channel_event_loop is None:
            await self.channel_close(channel_name=channel_name)
            return

        self.channel_queues[channel_name] = channel_queue
        self.channel_futures[channel_name] = channel_event_loop
        await channel_queue.put({
            'type': 'websocket.connect',
        })

    async def channel_connect(self, channel_name: str):
        """
        Called when the channel application accepts the connection.
        """
        await self.channel_accept(channel_name)

    async def channel_accept(self, channel_name: str):
        """
        Accepts an incoming Channel request
        """
        await self.send(text_data=','.join([self.SEND_TYPE_MAPPING.get('websocket.accept'), channel_name]))

    async def channel_websocket_receive(self, channel_name: str, message: str):
        """
        Called when a WebSocket Channel frame is received.
        """
        channel_queue = self.channel_queues.get(channel_name)
        if channel_queue is None:
            log.debug('channel_websocket_receive: channel_name %s is not connected', channel_name)
            return
        await channel_queue.put({
            'type': 'websocket.receive',
            'text': message,
        })

    async def channel_send(self, channel_name, text_data=None, bytes_data=None, close=False):
        """
        Sends a reply back down the Channel

        Only text can be carried by the comma separated protocol; a send without
        ``text_data`` is logged and skipped. A truthy ``close`` closes the
        channel afterwards and is forwarded to ``channel_close`` as the code.
        """
        if text_data is not None:
            await self.send(text_data=','.join([self.SEND_TYPE_MAPPING.get('websocket.send'), channel_name, text_data]))
        else:
            log.warning('channel_send: channel_name %s has no text data to send, frame skipped', channel_name)
        if close:
            await self.channel_close(channel_name, close)

    async def channel_close(self, channel_name: str, code: Optional[int] = None):
        """
        Closes the Channel from the server end

        ``code`` is appended to the frame unless it is ``None`` or ``True``.
        """
        frame_parts = [self.SEND_TYPE_MAPPING.get('websocket.close'), channel_name]
        if code is not None and code is not True:
            # str.join() only accepts strings, the close code is an int.
            frame_parts.append(str(code))
        await self.send(text_data=','.join(frame_parts))

    async def channel_websocket_disconnect(self, channel_name: str, channel_payload: str):
        """
        Called when a WebSocket Channel connection is closed.

        ``channel_payload`` is the close code sent by the client; 1005 is used
        when it is missing or not a number.
        """
        try:
            code = int(channel_payload)
        except (TypeError, ValueError):
            # TypeError covers a payload of None (e.g. from a JSON frame).
            code = 1005
        if not await self._release_channel(channel_name, code):
            log.debug('channel_websocket_disconnect: channel_name %s is not connected', channel_name)

    async def _release_channel(self, channel_name: str, code: int) -> bool:
        """
        Unregisters a channel and sends ``websocket.disconnect`` to its application.

        Returns ``False`` when the channel was not registered.
        """
        channel_queue = self.channel_queues.pop(channel_name, None)
        self.channel_futures.pop(channel_name, None)
        if channel_queue is None:
            return False
        await channel_queue.put({
            'type': 'websocket.disconnect',
            'code': code,
        })
        return True

    async def dispatch_downstream(self, channel_name: str, message: dict):
        """
        The channel sends a message to the client, add the channel name to it
        """
        message_type = message['type']
        if message_type == 'websocket.accept':
            await self.channel_connect(channel_name)
        elif message_type == 'websocket.close':
            code = message.get('code')
            await self.channel_close(channel_name, code)
            # Play the part of the server: once the application closed the
            # socket it has to receive a disconnect event, otherwise it keeps
            # waiting (and stays registered) until the real socket goes away.
            await self._release_channel(channel_name, 1000 if code is None else code)
        elif message_type == 'websocket.send':
            await self.channel_send(channel_name, text_data=message.get('text'), bytes_data=message.get('bytes'))

    async def start_channel_application(self, channel_name: str, scope: dict) -> (Optional[asyncio.Queue], Optional[Future]):
        """
        Starts an instance of the default ASGI application for one channel.

        Returns the queue the application receives its events from and the
        future the application runs in, or ``(None, None)`` when instantiating
        the application for ``scope`` raises ``ValueError``.
        """
        # Gets the running settings.ASGI_APPLICATION.
        # Usually an instance of ProtocolTypeRouter
        application = get_default_application()
        try:
            application_instance = application(scope=scope)
        except ValueError:
            # The scope cannot be handled
            # Usually this means a 404
            return None, None

        channel_queue = asyncio.Queue()

        async def send_downstream(message):
            await self.dispatch_downstream(channel_name=channel_name, message=message)

        consumer = application_instance(
            receive=channel_queue.get,
            send=send_downstream,
        )
        # Create the channel event loop, which handles the queue
        channel_event_loop = asyncio.ensure_future(
            self.close_channel_on_exception(consumer, channel_name, channel_queue),
        )
        return channel_queue, channel_event_loop

    async def close_channel_on_exception(self, consumer, channel_name: str, channel_queue: Optional[asyncio.Queue] = None):
        """
        Awaits the channel application and closes the channel when it crashes.

        When ``channel_queue`` is given, the channel is unregistered as soon as
        the application ends, provided it is still registered with that queue.
        """
        log.info('Start consumer on channel %s', channel_name)
        try:
            result = await consumer
        except Exception as e:
            log.exception('Exception in consumer: %s. Closing channel %s.', e, channel_name, extra={
                'scope': self.scope
            })
            await self.channel_close(channel_name)
            raise
        finally:
            # The identity check keeps a newer registration under the same name intact.
            if channel_queue is not None and self.channel_queues.get(channel_name) is channel_queue:
                del self.channel_queues[channel_name]
                self.channel_futures.pop(channel_name, None)
        log.info('End consumer on channel %s', channel_name)
        return result

    def demultiplex_data(self, text_data: str) -> (str, str, str):
        """
        Splits a frame into ``(ASGI event type, channel name, payload)``.

        The event type is ``None`` for an unknown action, the channel name is
        ``None`` when the frame has no second field, and the payload is
        everything after the second comma (``''`` when absent).
        """
        text_parts = text_data.split(',', 2)
        channel_action = text_parts[0]
        channel_name = text_parts[1] if len(text_parts) > 1 else None
        channel_payload = text_parts[2] if len(text_parts) > 2 else ''
        return self.ACTION_HANDLER_MAPPING.get(channel_action), channel_name, channel_payload

    @staticmethod
    def get_path_from_payload(channel_payload: str):
        """
        Extracts ``(path, query_string)`` from the URL sent with a ``sub`` frame.

        ``path`` is a ``str`` and ``query_string`` the re-encoded query as
        ``bytes``; an empty payload yields ``('', b'')``.
        """
        # TODO: Does multiplex_client.js do it wrong?
        # multiplex_client.js uses 'escape()' on the path name which is latin-1
        if not channel_payload:
            return '', b''
        url_parts = urlparse(unquote(channel_payload, 'latin-1'))
        query_string = urlencode(parse_qsl(url_parts.query)).encode('utf-8')
        return url_parts.path, query_string
class AsyncJsonMultiplexWebsocketConsumer(AsyncMultiplexWebsocketConsumer):
    """
    Multiplex consumer that frames every message as a JSON object.

    Frames in both directions have the keys ``action``, ``stream`` (the channel
    name) and ``payload``.
    """

    def demultiplex_data(self, text_data: str) -> (str, str, str):
        """
        Splits a JSON frame into ``(ASGI event type, channel name, payload)``.

        A frame that is not valid JSON, or not a JSON object, yields
        ``(None, None, None)`` instead of raising.
        """
        try:
            json_data = json.loads(text_data)
        except ValueError:  # json.JSONDecodeError is a ValueError
            log.debug('demultiplex_data: frame is not valid JSON')
            return None, None, None
        if not isinstance(json_data, dict):
            log.debug('demultiplex_data: frame is not a JSON object')
            return None, None, None
        action = json_data.get('action')
        # A list or object as action is unhashable and cannot be looked up.
        channel_action = self.ACTION_HANDLER_MAPPING.get(action) if isinstance(action, str) else None
        return channel_action, json_data.get('stream'), json_data.get('payload')

    async def channel_accept(self, channel_name: str):
        """
        Accepts an incoming Channel request
        """
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING.get('websocket.accept'),
            'stream': channel_name,
            'payload': None,
        }))

    async def channel_send(self, channel_name, text_data=None, bytes_data=None, close=False):
        """
        Sends a reply back down the Channel

        ``text_data`` is embedded as decoded JSON when it parses, and otherwise
        passed through unchanged as the payload.
        """
        try:
            payload = json.loads(text_data)
        except (TypeError, ValueError):
            # Not JSON (or no text at all): do not crash the sending
            # application, hand the raw value to the client.
            payload = text_data
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING.get('websocket.send'),
            'stream': channel_name,
            'payload': payload,
        }))

    async def channel_close(self, channel_name: str, code: Optional[int] = None):
        """
        Closes the Channel from the server end
        """
        await self.send(text_data=json.dumps({
            'action': self.SEND_TYPE_MAPPING.get('websocket.close'),
            'stream': channel_name,
            'payload': code,
        }))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (c) 2017, Marek Majkowski. All rights reserved. | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. | |
Modified by ronnievdc to implement the connecting event and catch more edge cases | |
*/ | |
/**
 * Client side of the multiplex protocol: runs several virtual WebSocket-like
 * channels over one real WebSocket.
 *
 * Usage: `var mux = new WebSocketMultiplex(ws); var channel = mux.channel(url);`
 * A channel offers `addEventListener`, `on<type>` handlers, `send`, `close`
 * and `readyState`, and emits `connecting`, `open`, `message` and `close`.
 *
 * Frames are comma separated text:
 *   client -> server: `sub,<name>,<url>`, `msg,<name>,<data>`, `uns,<name>[,<code>]`
 *   server -> client: `open,<name>`, `message,<name>,<data>`, `close,<name>[,<code>]`
 */
var WebSocketMultiplex = (function () {

    // Own-property check that ignores anything inherited from Object.prototype
    var hasOwn = function (obj, key) {
        return Object.prototype.hasOwnProperty.call(obj, key);
    };

    // Calls callback(channel) for every registered channel
    var forEachChannel = function (channels, callback) {
        for (var key in channels) {
            if (hasOwn(channels, key)) {
                callback(channels[key]);
            }
        }
    };

    // ****

    // Minimal event emitter: listeners per event type plus an optional on<type> handler
    var DumbEventTarget = function () {
        this._listeners = {};
    };
    DumbEventTarget.prototype._ensure = function (type) {
        if (!(type in this._listeners)) this._listeners[type] = [];
    };
    DumbEventTarget.prototype.addEventListener = function (type, listener) {
        this._ensure(type);
        this._listeners[type].push(listener);
    };
    // emit(type, ...args): calls this['on' + type] first, then every listener for type
    DumbEventTarget.prototype.emit = function (type) {
        this._ensure(type);
        var args = Array.prototype.slice.call(arguments, 1);
        if (this['on' + type]) this['on' + type].apply(this, args);
        for (var i = 0; i < this._listeners[type].length; i++) {
            this._listeners[type][i].apply(this, args);
        }
    };

    // ****

    /**
     * @param ws The real WebSocket (or a compatible object) that carries all channels.
     */
    var WebSocketMultiplex = function (ws) {
        var that = this;
        this.ws = ws;
        this.channels = {};

        this.ws.addEventListener('message', function (e) {
            // The protocol is text only; a binary frame cannot be split
            if (typeof e.data !== 'string') {
                return;
            }
            var t = e.data.split(',');
            var type = t.shift(), name = t.shift(), payload = t.join(',');
            if (name === undefined || !hasOwn(that.channels, name)) {
                return;
            }
            var sub = that.channels[name];

            switch (type) {
                case 'open':
                    // Channel is accepted by the server
                    sub.emit('open', {});
                    break;
                case 'close':
                    // Channel is closed / rejected by the server
                    delete that.channels[name];
                    // Hand the close code to the listeners when the server sent one
                    var code = parseInt(payload, 10);
                    sub.emit('close', isNaN(code) ? {} : {code: code});
                    break;
                case 'message':
                    // Channel receives a message from the server
                    sub.emit('message', {data: payload});
                    break;
            }
        });

        // Main websocket is accepted by the server
        this.ws.addEventListener('open', function (e) {
            forEachChannel(that.channels, function (sub) {
                // All channels that are connecting / closed, try to reconnect
                if (sub.readyState === WebSocket.CONNECTING || sub.readyState === WebSocket.CLOSED) {
                    sub.open();
                }
            });
        });

        // Main Websocket connection to the server is lost
        this.ws.addEventListener('close', function (e) {
            forEachChannel(that.channels, function (sub) {
                sub.emit('close', {});
            });
        });

        // Main websocket is connecting to the server
        this.ws.addEventListener('connecting', function (e) {
            forEachChannel(that.channels, function (sub) {
                sub.emit('connecting', {});
            });
        });
    };

    /**
     * Creates and registers a channel for `url` under a fresh random name.
     */
    WebSocketMultiplex.prototype.channel = function (url) {
        var channel_name;
        // Math.random() can yield a very short string, so retry until the
        // name is non-empty and not yet taken.
        do {
            channel_name = Math.random().toString(36).substring(2, 10);
        } while (!channel_name || hasOwn(this.channels, channel_name));
        return this.channels[channel_name] = new Channel(this.ws, channel_name, escape(url), this.channels);
    };

    /**
     * One virtual WebSocket.
     *
     * @param ws           The real WebSocket the frames are sent over.
     * @param channel_name Name that identifies the channel in every frame.
     * @param url          Escaped url that is sent with the `sub` frame.
     * @param channels     Registry of the multiplexer, keyed by channel name.
     */
    var Channel = function (ws, channel_name, url, channels) {
        DumbEventTarget.call(this);
        var that = this;
        this.ws = ws;
        this.name = channel_name;
        this.url = url;
        this.channels = channels;
        this.readyState = WebSocket.CONNECTING;

        // If the main Websocket is open, send the open command to the server
        if (ws.readyState === WebSocket.OPEN) {
            setTimeout(function () {
                // Skip when the channel was closed before this timer fired
                if (that.channels[that.name] === that) {
                    that.open();
                }
            }, 0);
        }

        // Update the readyState on the events that change it
        this.addEventListener('open', function (e) {
            that.readyState = WebSocket.OPEN;
        });
        this.addEventListener('close', function (e) {
            that.readyState = WebSocket.CLOSED;
        });
        this.addEventListener('connecting', function (e) {
            that.readyState = WebSocket.CONNECTING;
        });
    };
    Channel.prototype = Object.create(DumbEventTarget.prototype);
    Channel.prototype.constructor = Channel;

    // Send an open request to the server for opening the channel
    Channel.prototype.open = function () {
        this.ws.send(['sub', this.name, this.url].join(','));
    };

    // Send a message through the channel
    Channel.prototype.send = function (data) {
        this.ws.send(['msg', this.name, data].join(','));
    };

    // Close the channel from the client side
    Channel.prototype.close = function (code, reason) {
        var that = this;
        // Only tell the server when the real socket can carry the frame;
        // send() throws while the real socket is still connecting.
        if (this.ws.readyState === WebSocket.OPEN) {
            if (code) {
                this.ws.send(['uns', this.name, code].join(','));
            } else {
                this.ws.send(['uns', this.name].join(','));
            }
        }
        delete this.channels[this.name];
        this.readyState = WebSocket.CLOSING;
        setTimeout(function () {
            that.emit('close', {});
        }, 0);
    };

    return WebSocketMultiplex;
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment