Skip to content

Instantly share code, notes, and snippets.

@21paradox
Last active August 19, 2016 01:29
Show Gist options
  • Save 21paradox/a228ca66c729d7d8e9baf41c115fbd9d to your computer and use it in GitHub Desktop.
Save 21paradox/a228ca66c729d7d8e9baf41c115fbd9d to your computer and use it in GitHub Desktop.
websocket-stream with reconnect core
var websocket_stream = require('websocket-stream');
var inject = require('reconnect-core');
var PassThrough = require('stream').PassThrough;
var Duplex = require('stream').Duplex;
// Reconnect factory: reconnect-core calls this creator to open each new
// connection, and every connection is a websocket-stream for the given URL.
const reconnect = inject((_url) => websocket_stream(_url));
/**
 * Duplex wrapper around the reconnecting websocket.
 *
 * Writable side: every chunk is forwarded to `cacheStream`, which buffers it
 * until a websocket connection is piped from it.
 * Readable side: filled from outside this constructor via `wrapStream.push()`
 * in the reconnect callback, so `read()` has nothing to pull.
 */
const wrapStream = new Duplex({
  objectMode: true,
  read(size) {
    // Intentionally empty: incoming data is pushed when the websocket emits it.
  },
  write(chunk, encoding, callback) {
    // Use cacheStream's writable side instead of push(): the callback is then
    // only invoked once cacheStream has accepted the chunk, so writers of
    // wrapStream see backpressure instead of growing an unbounded buffer.
    cacheStream.write(chunk, encoding, callback);
  },
});
// Data path: wrapStream -> cacheStream -> websocket-stream.
// cacheStream is the pass-through stage sitting between the wrapper and each
// websocket connection.
const cacheStream = new PassThrough({
  // In object mode this limit counts queued objects rather than bytes
  // (see the Node.js stream documentation).
  highWaterMark: 16384,
  objectMode: true,
});
// Debug tap: log every chunk that leaves cacheStream.
cacheStream.on('data', (data) => {
  console.log(data.toString(), 'pp');
});
// Attaching a 'data' listener switches a readable stream into flowing mode.
// Without an explicit pause, chunks written before the first websocket
// connects would be drained into the logger above and lost. The later
// cacheStream.pipe(ws_stream) call resumes the flow once a connection exists.
cacheStream.pause();
/**
 * Reconnecting client, created with an empty options object.
 *
 * The callback runs for each websocket stream handed over by reconnect-core
 * and wires that stream into the two long-lived streams:
 *   - ws_stream 'data'  -> wrapStream.push()   (incoming messages)
 *   - cacheStream       -> ws_stream (pipe)    (outgoing messages)
 * When the websocket stream ends, both links are torn down and cacheStream is
 * paused so outgoing chunks stay buffered until the next connection.
 */
const re = reconnect({}, (ws_stream) => {
  // ws_stream readable -> wrapStream readable
  const forwardIncoming = (dataRaw) => {
    wrapStream.push(dataRaw);
  };
  ws_stream.on('data', forwardIncoming);

  // cacheStream readable -> ws_stream writable
  cacheStream.pipe(ws_stream);

  // NOTE(review): cleanup is tied to 'end' only; confirm that websocket-stream
  // always emits 'end' on disconnect (as opposed to only 'close'/'error').
  ws_stream.once('end', () => {
    cacheStream.pause();
    // Incoming data is forwarded with a 'data' listener, not a pipe, so the
    // listener itself must be removed (unpipe(wrapStream) would be a no-op).
    ws_stream.removeListener('data', forwardIncoming);
    cacheStream.unpipe(ws_stream);
  });
});
// Lifecycle logging for the reconnecting client: one console line per
// successful connect, per reconnect attempt, and per reported error.
re
  .on('connect', () => {
    console.log('connect');
  })
  .on('reconnect', () => {
    console.log('try reconnect');
  })
  .on('error', (err) => {
    console.log(err);
  });
// Start the first connection attempt. The URL looks like a placeholder —
// replace it with the real websocket endpoint.
re.connect('ws://xxx.xx.xx/websocketxx');
// The module exports cacheStream, i.e. the outgoing side only.
// NOTE(review): incoming websocket data is pushed into wrapStream, which is
// not exported — confirm whether wrapStream was the intended export.
module.exports = cacheStream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment