Skip to content

Instantly share code, notes, and snippets.

@matthiask
Last active February 28, 2018 08:14
Show Gist options
  • Save matthiask/c48c7824394f5bfbf5a53fba5294c164 to your computer and use it in GitHub Desktop.
Save matthiask/c48c7824394f5bfbf5a53fba5294c164 to your computer and use it in GitHub Desktop.
Channels 2 HTTP experiments (long polling / server sent events)
import asyncio
from datetime import datetime
from channels.consumer import AsyncConsumer
# from channels.http import AsgiRequest
class AsyncHttpConsumer(AsyncConsumer):
async def __call__(self, receive, send):
self.send = send
body = []
while True:
message = await receive()
if message['type'] == 'http.disconnect':
return
else:
if 'body' in message:
body.append(message['body'])
if not message.get('more_body'):
# req = AsgiRequest(self.scope, b''.join(body))
await self.handle(b''.join(body))
return
async def send_headers(self, *, status=200, headers=[]):
await self.send({
'type': 'http.response.start',
'status': status,
'headers': headers,
})
async def send_body(self, body, *, more_body=False):
await self.send({
'type': 'http.response.body',
'body': body.encode('utf-8'),
'more_body': more_body,
})
async def send_response(self, status, body, **kwargs):
await self.send_headers(status=status, **kwargs)
await self.send_body(body)
class LongPollConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[('content-type', 'text/plain')])
for i in range(3):
await self.send_body(
'\n%s\n' % datetime.now(),
more_body=True,
)
await asyncio.sleep(1)
await self.send_body('')
class ServerSentEventsConsumer(AsyncHttpConsumer):
async def handle(self, body):
await self.send_headers(headers=[
('cache-control', 'no-cache'),
('transfer-encoding', 'chunked'),
('content-type', 'text/event-stream'),
])
while True:
await self.send_body(
'data: %s\n\n' % datetime.now(),
more_body=True,
)
await asyncio.sleep(1)
<body>
<ul>
<li>initial</li>
</ul>
<script>
/* Polyfill from https://raw.githubusercontent.com/remy/polyfills/master/EventSource.js */
;(function (global) {
if ("EventSource" in global) return;
var reTrim = /^(\s|\u00A0)+|(\s|\u00A0)+$/g;
var EventSource = function (url) {
var eventsource = this,
interval = 500, // polling interval
lastEventId = null,
cache = '';
if (!url || typeof url != 'string') {
throw new SyntaxError('Not enough arguments');
}
this.URL = url;
this.readyState = this.CONNECTING;
this._pollTimer = null;
this._xhr = null;
function pollAgain(interval) {
eventsource._pollTimer = setTimeout(function () {
poll.call(eventsource);
}, interval);
}
function poll() {
try { // force hiding of the error message... insane?
if (eventsource.readyState == eventsource.CLOSED) return;
// NOTE: IE7 and upwards support
var xhr = new XMLHttpRequest();
xhr.open('GET', eventsource.URL, true);
xhr.setRequestHeader('Accept', 'text/event-stream');
xhr.setRequestHeader('Cache-Control', 'no-cache');
// we must make use of this on the server side if we're working with Android - because they don't trigger
// readychange until the server connection is closed
xhr.setRequestHeader('X-Requested-With', 'XMLHttpRequest');
if (lastEventId != null) xhr.setRequestHeader('Last-Event-ID', lastEventId);
cache = '';
xhr.timeout = 50000;
xhr.onreadystatechange = function () {
if (this.readyState == 3 || (this.readyState == 4 && this.status == 200)) {
// on success
if (eventsource.readyState == eventsource.CONNECTING) {
eventsource.readyState = eventsource.OPEN;
eventsource.dispatchEvent('open', { type: 'open' });
}
var responseText = '';
try {
responseText = this.responseText || '';
} catch (e) {}
// process this.responseText
var parts = responseText.substr(cache.length).split("\n"),
eventType = 'message',
data = [],
i = 0,
line = '';
cache = responseText;
// TODO handle 'event' (for buffer name), retry
for (; i < parts.length; i++) {
line = parts[i].replace(reTrim, '');
if (line.indexOf('event') == 0) {
eventType = line.replace(/event:?\s*/, '');
} else if (line.indexOf('retry') == 0) {
retry = parseInt(line.replace(/retry:?\s*/, ''));
if(!isNaN(retry)) { interval = retry; }
} else if (line.indexOf('data') == 0) {
data.push(line.replace(/data:?\s*/, ''));
} else if (line.indexOf('id:') == 0) {
lastEventId = line.replace(/id:?\s*/, '');
} else if (line.indexOf('id') == 0) { // this resets the id
lastEventId = null;
} else if (line == '') {
if (data.length) {
var event = new MessageEvent(data.join('\n'), eventsource.url, lastEventId);
eventsource.dispatchEvent(eventType, event);
data = [];
eventType = 'message';
}
}
}
if (this.readyState == 4) pollAgain(interval);
// don't need to poll again, because we're long-loading
} else if (eventsource.readyState !== eventsource.CLOSED) {
if (this.readyState == 4) { // and some other status
// dispatch error
eventsource.readyState = eventsource.CONNECTING;
eventsource.dispatchEvent('error', { type: 'error' });
pollAgain(interval);
} else if (this.readyState == 0) { // likely aborted
pollAgain(interval);
} else {
}
}
};
xhr.send();
setTimeout(function () {
if (true || xhr.readyState == 3) xhr.abort();
}, xhr.timeout);
eventsource._xhr = xhr;
} catch (e) { // in an attempt to silence the errors
eventsource.dispatchEvent('error', { type: 'error', data: e.message }); // ???
}
};
poll(); // init now
};
EventSource.prototype = {
close: function () {
// closes the connection - disabling the polling
this.readyState = this.CLOSED;
clearInterval(this._pollTimer);
this._xhr.abort();
},
CONNECTING: 0,
OPEN: 1,
CLOSED: 2,
dispatchEvent: function (type, event) {
var handlers = this['_' + type + 'Handlers'];
if (handlers) {
for (var i = 0; i < handlers.length; i++) {
handlers[i].call(this, event);
}
}
if (this['on' + type]) {
this['on' + type].call(this, event);
}
},
addEventListener: function (type, handler) {
if (!this['_' + type + 'Handlers']) {
this['_' + type + 'Handlers'] = [];
}
this['_' + type + 'Handlers'].push(handler);
},
removeEventListener: function (type, handler) {
var handlers = this['_' + type + 'Handlers'];
if (!handlers) {
return;
}
for (var i = handlers.length - 1; i >= 0; --i) {
if (handlers[i] === handler) {
handlers.splice(i, 1);
break;
}
}
},
onerror: null,
onmessage: null,
onopen: null,
readyState: 0,
URL: ''
};
var MessageEvent = function (data, origin, lastEventId) {
this.data = data;
this.origin = origin;
this.lastEventId = lastEventId || '';
};
MessageEvent.prototype = {
data: null,
type: 'message',
lastEventId: '',
origin: ''
};
if ('module' in global) module.exports = EventSource;
global.EventSource = EventSource;
})(this);
</script>
<script>
var evtSource = new EventSource('/server-sent-events');
var ul = document.querySelector('ul');
evtSource.onmessage = function(e) {
console.log(e);
var el = document.createElement('li');
el.innerHTML = e.data;
ul.appendChild(el);
};
</script>
</body>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment