Skip to content

Instantly share code, notes, and snippets.

@bjouhier
Created July 4, 2012 20:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bjouhier/3049484 to your computer and use it in GitHub Desktop.
Save bjouhier/3049484 to your computer and use it in GitHub Desktop.
Equivalence between the event-oriented and callback-oriented stream APIs
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
// Baseline variant: serves wrappers.txt, deflate-compressed, to every request
// using only the native event-oriented streams (no wrappers involved).
http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  });
  var source = fs.createReadStream(__dirname + '/wrappers.txt');
  var deflate = zlib.createDeflate();
  // util.pump() is deprecated in favour of readable.pipe(writable) and is not
  // available on current Node.js releases, so the two stages are piped instead.
  pipeWithCleanup(source, deflate);
  pipeWithCleanup(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');

/**
 * Pipes `from` into `to` and reproduces the error handling util.pump() used to
 * provide: pipe() alone does not propagate errors, and an 'error' event
 * without a listener would take the whole server down.
 *
 * @param {object} from - readable stream
 * @param {object} to - writable stream
 */
function pipeWithCleanup(from, to) {
  from.on('error', function(err) {
    console.error(err);
    // Nothing more will arrive; let the consumer finish.
    to.end();
  });
  to.on('error', function(err) {
    console.error(err);
    // Nobody is left to consume the data; stop producing it.
    if (typeof from.destroy === 'function') from.destroy();
  });
  from.pipe(to);
}
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
// Round-trip variant: every stream is wrapped into the callback-oriented API
// and then wrapped back into the event-oriented API, to show that the two
// forms are interchangeable.
http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  });
  var source = fs.createReadStream(__dirname + '/wrappers.txt');
  var deflate = zlib.createDeflate();
  source = new wrappers.EventReadWrapper(new wrappers.CallbackReadWrapper(source));
  response = new wrappers.EventWriteWrapper(new wrappers.CallbackWriteWrapper(response));
  deflate = new wrappers.EventWrapper(new wrappers.CallbackWrapper(deflate));
  // util.pump() is deprecated and not available on current Node.js releases.
  // The wrappers only expose the event API (data/end/drain, pause/resume,
  // write/end) and have no pipe(), so an equivalent event-based pump is used.
  pumpEvents(source, deflate);
  pumpEvents(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');

/**
 * Event-based pump with the same wiring util.pump() had: forward 'data' to
 * write(), pause the reader when write() returns false, resume it on 'drain',
 * and end the writer when the reader ends or fails.
 *
 * @param {object} readable - emits 'data', 'end', 'error'; has pause()/resume()
 * @param {object} writable - has write()/end(); emits 'drain', 'error'
 */
function pumpEvents(readable, writable) {
  readable.on('data', function(chunk) {
    if (writable.write(chunk) === false) readable.pause();
  });
  writable.on('drain', function() {
    readable.resume();
  });
  readable.on('end', function() {
    writable.end();
  });
  readable.on('error', function(err) {
    console.error(err);
    writable.end();
  });
  writable.on('error', function(err) {
    console.error(err);
    // The wrappers do not define destroy(); only native streams do.
    if (typeof readable.destroy === 'function') readable.destroy();
  });
}
"use strict";
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');
// Callback-oriented variant: each native stream is wrapped into the
// read(cb) / write(data, cb) API and the data is moved by the pump()
// function declared further down in this file.
http.createServer(function(request, response) {
  var headers = {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  };
  response.writeHead(200, headers);

  var rawFile = fs.createReadStream(__dirname + '/wrappers.js');
  var rawDeflate = zlib.createDeflate();

  // Same wrapping order as before: file reader, HTTP response, then deflate.
  var input = new wrappers.CallbackReadWrapper(rawFile);
  var output = new wrappers.CallbackWriteWrapper(response);
  var compressor = new wrappers.CallbackWrapper(rawDeflate);

  pump(input, compressor);
  pump(compressor, output);
}).listen(1337);

console.log('Server running at http://127.0.0.1:1337/');
/**
 * Copies inStream to outStream chunk by chunk.
 *
 * Every value returned by inStream.read() is handed to outStream.write(),
 * including the final falsy one, after which the loop stops.
 *
 * @param {object} inStream - object exposing read(_)
 * @param {object} outStream - object exposing write(data, _)
 * @param {*} _ - passed through unchanged to read() and write()
 */
function pump(inStream, outStream, _) {
  var chunk;
  while (true) {
    chunk = inStream.read(_);
    outStream.write(chunk, _);
    // The falsy chunk has already been forwarded above; now stop.
    if (!chunk) break;
  }
}
var EventEmitter = require('events').EventEmitter;
exports.CallbackReadWrapper = function(stream) {
var _chunks = [];
var _error;
var _done = false;
stream.on('error', function(err) {
_onData(err);
});
stream.on('data', function(data) {
_onData(null, data);
});
stream.on('end', function() {
_onData(null, null);
});
function memoize(err, chunk) {
if (err) _error = err;
else if (chunk) {
_chunks.push(chunk);
stream.pause();
} else _done = true;
};
var _onData = memoize;
this.read = function(cb) {
if (_chunks.length > 0) {
var chunk = _chunks.splice(0, 1)[0];
if (_chunks.length === 0) {
stream.resume();
}
return cb(null, chunk);
} else if (_done) {
return cb(null, null);
} else if (_error) {
return cb(_error);
} else _onData = function(err, chunk) {
if (!err && !chunk) _done = true;
_onData = memoize;
cb(err, chunk);
};
}
}
exports.CallbackWriteWrapper = function(stream) {
var _error;
var _onDrain;
stream.on('error', function(err) {
if (_onDrain) _onDrain(err);
else _error = err;
});
stream.on('drain', function() {
_onDrain && _onDrain();
});
this.write = function(data, cb) {
if (_error) return cb(_error);
if (data != null) {
if (!stream.write(data)) {
_onDrain = function(err) {
_onDrain = null;
cb(err);
};
} else {
process.nextTick(cb);
}
} else {
stream.end();
}
}
}
exports.CallbackWrapper = function(stream) {
exports.CallbackReadWrapper.call(this, stream);
exports.CallbackWriteWrapper.call(this, stream);
}
exports.EventReadWrapper = function(stream) {
var self = this;
var q = [],
paused;
function doRead(err, data) {
if (err) self.emit('error', err);
else if (data != null) {
if (paused) {
q.push(data);
} else {
self.emit('data', data);
stream.read(doRead);
}
} else {
if (paused) {
q.push(null);
} else {
self.emit('end');
}
}
}
self.pause = function() {
paused = true;
}
self.resume = function() {
var data;
while ((data = q.shift()) !== undefined) {
if (data != null) self.emit('data', data);
else self.emit('end');
}
paused = false;
stream.read(doRead);
}
stream.read(doRead);
}
exports.EventReadWrapper.prototype = new EventEmitter();
exports.EventWriteWrapper = function(stream) {
var self = this;
var chunks = [];
function written(err) {
if (err) self.emit('error', err);
else {
chunks.splice(0, 1);
if (chunks.length === 0) self.emit('drain');
else stream.write(chunks[0], written);
}
}
this.write = function(data) {
chunks.push(data);
if (chunks.length === 1) stream.write(data, written);
return chunks.length === 0;
}
this.end = function(data) {
if (data != null) self.write(data);
self.write(null);
}
}
exports.EventWriteWrapper.prototype = new EventEmitter();
exports.EventWrapper = function(stream) {
exports.EventReadWrapper.call(this, stream);
exports.EventWriteWrapper.call(this, stream);
}
exports.EventWrapper.prototype = new EventEmitter();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment