Skip to content

Instantly share code, notes, and snippets.

@pabloko
Last active October 9, 2019 23:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pabloko/505b3a0741c7ee19405e5b5eb0d54f0c to your computer and use it in GitHub Desktop.
Save pabloko/505b3a0741c7ee19405e5b5eb0d54f0c to your computer and use it in GitHub Desktop.
(JS) Half duplex XHR socket interface

(JS) Half duplex XHR socket interface

Advice: if you're looking to use this on production, better take a look at https://github.com/eBay/jsonpipe

Keeping the clients updated in real time apps is quite challenging sometimes, websockets or webrtc are great for this task, but usually what the developer want is to send most of the data well timed to the client, then recive a few commands from the client, so having full duplex socket in this scenario is quite wasteful, and may pose a security risk, at allowing the user to continuosly send arbitrary data. Polled ajax request, in the other hand cant be in any way correctly timed, so this function acts as an intermediante point between ajax requests and websockets.

This function needs a request that is kept alive then chunked data is being sent. It can be binary data, strings, or serializable json objects. You will have to place chunked headers Transfer-Encoding: chunked and ping regularly (5s should be fine) sending some data in order to maintain the connection open (routers will drop connections where data not flow as stalled or timed out).

Note that using ondata/onstring could result in merged data from different packets sent consecutively. onjson incorporates some basic pattern matching to split them and call its callback for each json item recieved. You may use some delimiter if you need to accurately identify different chunks, for example onjson will find "}{" as delimiter, theres also implemented an h264 NALu video parser for Broadway.js decoding, h264 frames always start with magic header <0,0,0,1> so its delimited accordingly. You can also implement your own protocol extending ondata.

It has different optional callback methods as described below:

var pipe = xhrpipe({
url: "http://url.com/endpoint", //mandatory fetch url
options: {}, //optional fetch options
onstart: function() {}, //optional
onend: function() {}, //optional
onerror: function(err) {}, //optional
ondata: function(uint8arr) {}, //optional
onstring: function(data_string) {}, //optional
onjson: function(json_obj) {}, //optional recomended
onh264: function(h264_data) {} //raw h264 NAL units to feed broadway.js decoder
})
var xhrpipe = function(pdata) {
if (pdata.url==null) return 0;
var vmem = null;
var getVideoNALu = function(element, index, array) { return (index > 0 && array[index+0]==0 && array[index+1]==0 && array[index+2]==0 && array[index+3]==1); }
return fetch(pdata.url,pdata.options!=null?pdata.options:{}).then((response) => {
const reader = response.body.getReader();
const stream = new ReadableStream({
start(controller) {
if (pdata.onstart!=null)pdata.onstart();
function push() {
reader.read().then(({ done, value }) => {
if (done) {
if (pdata.onend!=null)pdata.onend();
controller.close();
return;
} else {
if (pdata.ondata!=null)pdata.ondata(value);
if (pdata.onstring!=null)pdata.onstring(new TextDecoder("utf-8").decode(value));
if (pdata.onjson!=null){
let spl = new TextDecoder("utf-8").decode(value).split('}{')
spl.forEach(function(el,id) {
if (spl.length-1 != id) spl[id] += "}";
if (id>0) spl[id] = "{"+spl[id];
try { pdata.onjson(JSON.parse(spl[id])); } catch(e) {console.error('json parse error',e)}
})
}
if (pdata.onh264!=null) {
if (vmem == null)
vmem = value
else {
vmem = Uint8Array.from([...vmem, ...value])
var ksiz = vmem.findIndex(getVideoNALu);
while (ksiz>-1) {
pdata.onh264(vmem.slice(0, ksiz))
vmem = vmem.slice(ksiz, vmem.length)
ksiz = vmem.findIndex(getVideoNALu);
}
}
}
}
controller.enqueue(value);
push();
});
};
push();
}
});
return new Response(stream, { headers: { "Content-Type": "text/plain" } });
}).catch((a)=>{
if (pdata.onerror!=null)pdata.onerror(a);
});
}
//... express app initialization
app.get('/events', function (req, res) {
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Access-Control-Allow-Origin','*');
var i = 0;
var irval = setInterval(function(){
res.write("test "+(++i));
res.flush()
},1000)
setTimeout(function(){
clearInterval(irval)
res.end();
},10100)
});
var events = require('events');
var express = require('express');
var app = express();
var server = require('http').Server(app);
app.get('/', function (req, res) {
res.sendFile(__dirname + '/index.html');
})
var Emitters = {}
var getEmitter = function(id){
if(!Emitters[id])
Emitters[id] = new events.EventEmitter().setMaxListeners(0);
return Emitters[id];
}
app.get('/events', function (req, res) {
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Access-Control-Allow-Origin','*');
req.Emitter = getEmitter('events')
let contentWriter;
req.Emitter.on('data',contentWriter=function(buffer){
res.write(buffer)
res.flush()
})
res.on('close', function () {
req.Emitter.removeListener('data',contentWriter)
})
});
setInterval(()=>{
getEmitter('events').emit('data', JSON.stringify({action:'ping'}));
},5000)
server.listen(80);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment