Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
equivalence between event-oriented and callback-oriented stream API
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
http.createServer(function(request, response) {
response.writeHead(200, {
'Content-Type': 'text/plain; charset=utf8',
'Content-Encoding': 'deflate',
});
var source = fs.createReadStream(__dirname + '/wrappers.txt');
var deflate = zlib.createDeflate();
util.pump(source, deflate);
util.pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
http.createServer(function(request, response) {
response.writeHead(200, {
'Content-Type': 'text/plain; charset=utf8',
'Content-Encoding': 'deflate',
});
var source = fs.createReadStream(__dirname + '/wrappers.txt');
var deflate = zlib.createDeflate();
source = new wrappers.EventReadWrapper(new wrappers.CallbackReadWrapper(source));
response = new wrappers.EventWriteWrapper(new wrappers.CallbackWriteWrapper(response));
deflate = new wrappers.EventWrapper(new wrappers.CallbackWrapper(deflate));
util.pump(source, deflate);
util.pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
http.createServer(function(request, response) {
response.writeHead(200, {
'Content-Type': 'text/plain; charset=utf8',
'Content-Encoding': 'deflate',
});
var source = fs.createReadStream(__dirname + '/wrappers.js');
var deflate = zlib.createDeflate();
source = new wrappers.CallbackReadWrapper(source);
response = new wrappers.CallbackWriteWrapper(response);
deflate = new wrappers.CallbackWrapper(deflate);
pump(source, deflate);
pump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');
function pump(inStream, outStream, _) {
var data;
do {
data = inStream.read(_);
outStream.write(data, _);
} while (data);
}
var EventEmitter = require('events').EventEmitter;
exports.CallbackReadWrapper = function(stream) {
var _chunks = [];
var _error;
var _done = false;
stream.on('error', function(err) {
_onData(err);
});
stream.on('data', function(data) {
_onData(null, data);
});
stream.on('end', function() {
_onData(null, null);
});
function memoize(err, chunk) {
if (err) _error = err;
else if (chunk) {
_chunks.push(chunk);
stream.pause();
} else _done = true;
};
var _onData = memoize;
this.read = function(cb) {
if (_chunks.length > 0) {
var chunk = _chunks.splice(0, 1)[0];
if (_chunks.length === 0) {
stream.resume();
}
return cb(null, chunk);
} else if (_done) {
return cb(null, null);
} else if (_error) {
return cb(_error);
} else _onData = function(err, chunk) {
if (!err && !chunk) _done = true;
_onData = memoize;
cb(err, chunk);
};
}
}
exports.CallbackWriteWrapper = function(stream) {
var _error;
var _onDrain;
stream.on('error', function(err) {
if (_onDrain) _onDrain(err);
else _error = err;
});
stream.on('drain', function() {
_onDrain && _onDrain();
});
this.write = function(data, cb) {
if (_error) return cb(_error);
if (data != null) {
if (!stream.write(data)) {
_onDrain = function(err) {
_onDrain = null;
cb(err);
};
} else {
process.nextTick(cb);
}
} else {
stream.end();
}
}
}
exports.CallbackWrapper = function(stream) {
exports.CallbackReadWrapper.call(this, stream);
exports.CallbackWriteWrapper.call(this, stream);
}
exports.EventReadWrapper = function(stream) {
var self = this;
var q = [],
paused;
function doRead(err, data) {
if (err) self.emit('error', err);
else if (data != null) {
if (paused) {
q.push(data);
} else {
self.emit('data', data);
stream.read(doRead);
}
} else {
if (paused) {
q.push(null);
} else {
self.emit('end');
}
}
}
self.pause = function() {
paused = true;
}
self.resume = function() {
var data;
while ((data = q.shift()) !== undefined) {
if (data != null) self.emit('data', data);
else self.emit('end');
}
paused = false;
stream.read(doRead);
}
stream.read(doRead);
}
exports.EventReadWrapper.prototype = new EventEmitter();
exports.EventWriteWrapper = function(stream) {
var self = this;
var chunks = [];
function written(err) {
if (err) self.emit('error', err);
else {
chunks.splice(0, 1);
if (chunks.length === 0) self.emit('drain');
else stream.write(chunks[0], written);
}
}
this.write = function(data) {
chunks.push(data);
if (chunks.length === 1) stream.write(data, written);
return chunks.length === 0;
}
this.end = function(data) {
if (data != null) self.write(data);
self.write(null);
}
}
exports.EventWriteWrapper.prototype = new EventEmitter();
exports.EventWrapper = function(stream) {
exports.EventReadWrapper.call(this, stream);
exports.EventWriteWrapper.call(this, stream);
}
exports.EventWrapper.prototype = new EventEmitter();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.