Created
July 4, 2012 20:53
-
-
Save bjouhier/3049484 to your computer and use it in GitHub Desktop.
Equivalence between the event-oriented and callback-oriented stream APIs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
// Baseline variant: serves wrappers.txt deflate-compressed using the native,
// event-oriented stream API directly (no wrapper objects involved).
var wrappers = require('./wrappers'); // not used by this variant
var http = require('http');
var zlib = require('zlib');
var util = require('util'); // not used any more: util.pump() was replaced by pipe()
var fs = require('fs');

http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  });
  var source = fs.createReadStream(__dirname + '/wrappers.txt');
  var deflate = zlib.createDeflate();

  // writeHead(200) has already been called, so a failure can no longer be
  // reported with a status code; tear the connection down instead.
  function abort(err) {
    console.error('Streaming failed: ' + err.message);
    response.destroy();
  }
  // pipe() does not install 'error' listeners; without these an I/O error
  // (e.g. a missing file) would be thrown as an uncaught exception.
  source.on('error', abort);
  deflate.on('error', abort);

  // util.pump() is deprecated and absent from current Node.js releases;
  // pipe() is its documented replacement and handles back-pressure.
  source.pipe(deflate).pipe(response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
// Round-trip variant: every stream is wrapped into the callback-oriented API
// and then wrapped back into the event-oriented API, to show that the two
// representations are equivalent.
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util'); // not used any more: util.pump() was replaced by eventPump()
var fs = require('fs');

http.createServer(function(request, response) {
  response.writeHead(200, {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  });
  var source = fs.createReadStream(__dirname + '/wrappers.txt');
  var deflate = zlib.createDeflate();
  source = new wrappers.EventReadWrapper(new wrappers.CallbackReadWrapper(source));
  response = new wrappers.EventWriteWrapper(new wrappers.CallbackWriteWrapper(response));
  deflate = new wrappers.EventWrapper(new wrappers.CallbackWrapper(deflate));
  eventPump(source, deflate);
  eventPump(deflate, response);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');

/**
 * Copies an event-oriented readable into an event-oriented writable.
 *
 * Stands in for util.pump(), which is deprecated and absent from current
 * Node.js releases. pipe() cannot be used here because the wrapper objects
 * only expose on/pause/resume (readable side) and on/write/end (writable side).
 *
 * @param {Object} readStream  emits 'data', 'end', 'error'; has pause() and resume()
 * @param {Object} writeStream emits 'drain', 'error'; has write(chunk) and end()
 */
function eventPump(readStream, writeStream) {
  readStream.on('data', function(chunk) {
    // write() returning false asks the producer to wait for 'drain'.
    if (writeStream.write(chunk) === false) readStream.pause();
  });
  writeStream.on('drain', function() {
    readStream.resume();
  });
  readStream.on('end', function() {
    writeStream.end();
  });
  // An 'error' event without a listener is thrown by EventEmitter, so both
  // ends get one; a failed source also terminates the destination.
  readStream.on('error', function(err) {
    console.error('Pump read failed: ' + err.message);
    writeStream.end();
  });
  writeStream.on('error', function(err) {
    console.error('Pump write failed: ' + err.message);
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
// Callback-oriented variant: the three streams are wrapped into the
// read(cb) / write(data, cb) API and copied with the pump() function
// declared below this script.
var wrappers = require('./wrappers');
var http = require('http');
var zlib = require('zlib');
var util = require('util');
var fs = require('fs');

http.createServer(function(request, response) {
  var headers = {
    'Content-Type': 'text/plain; charset=utf8',
    'Content-Encoding': 'deflate',
  };
  response.writeHead(200, headers);

  // Raw event-oriented streams.
  var file = fs.createReadStream(__dirname + '/wrappers.js');
  var compressor = zlib.createDeflate();

  // Callback-oriented views over them.
  var reader = new wrappers.CallbackReadWrapper(file);
  var writer = new wrappers.CallbackWriteWrapper(response);
  var transform = new wrappers.CallbackWrapper(compressor);

  // file -> deflate -> HTTP response
  pump(reader, transform);
  pump(transform, writer);
}).listen(1337);
console.log('Server running at http://127.0.0.1:1337/');
/**
 * Copies chunks from a callback-oriented reader to a callback-oriented writer.
 *
 * Each value returned by inStream.read is handed to outStream.write,
 * including the final null/undefined, which tells the writer that the input
 * is exhausted. The loop ends only on null/undefined: an empty chunk such as
 * '' is forwarded like any other chunk instead of silently stopping the copy
 * without ever signalling the end to the writer.
 *
 * @param {Object} inStream  object with a read(_) method
 * @param {Object} outStream object with a write(data, _) method
 * @param {*} _ passed through to read and write; presumably the streamline.js
 *              callback placeholder (TODO confirm)
 */
function pump(inStream, outStream, _) {
  var data;
  do {
    data = inStream.read(_);
    outStream.write(data, _);
  } while (data != null);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var EventEmitter = require('events').EventEmitter; | |
exports.CallbackReadWrapper = function(stream) { | |
var _chunks = []; | |
var _error; | |
var _done = false; | |
stream.on('error', function(err) { | |
_onData(err); | |
}); | |
stream.on('data', function(data) { | |
_onData(null, data); | |
}); | |
stream.on('end', function() { | |
_onData(null, null); | |
}); | |
function memoize(err, chunk) { | |
if (err) _error = err; | |
else if (chunk) { | |
_chunks.push(chunk); | |
stream.pause(); | |
} else _done = true; | |
}; | |
var _onData = memoize; | |
this.read = function(cb) { | |
if (_chunks.length > 0) { | |
var chunk = _chunks.splice(0, 1)[0]; | |
if (_chunks.length === 0) { | |
stream.resume(); | |
} | |
return cb(null, chunk); | |
} else if (_done) { | |
return cb(null, null); | |
} else if (_error) { | |
return cb(_error); | |
} else _onData = function(err, chunk) { | |
if (!err && !chunk) _done = true; | |
_onData = memoize; | |
cb(err, chunk); | |
}; | |
} | |
} | |
exports.CallbackWriteWrapper = function(stream) { | |
var _error; | |
var _onDrain; | |
stream.on('error', function(err) { | |
if (_onDrain) _onDrain(err); | |
else _error = err; | |
}); | |
stream.on('drain', function() { | |
_onDrain && _onDrain(); | |
}); | |
this.write = function(data, cb) { | |
if (_error) return cb(_error); | |
if (data != null) { | |
if (!stream.write(data)) { | |
_onDrain = function(err) { | |
_onDrain = null; | |
cb(err); | |
}; | |
} else { | |
process.nextTick(cb); | |
} | |
} else { | |
stream.end(); | |
} | |
} | |
} | |
exports.CallbackWrapper = function(stream) { | |
exports.CallbackReadWrapper.call(this, stream); | |
exports.CallbackWriteWrapper.call(this, stream); | |
} | |
exports.EventReadWrapper = function(stream) { | |
var self = this; | |
var q = [], | |
paused; | |
function doRead(err, data) { | |
if (err) self.emit('error', err); | |
else if (data != null) { | |
if (paused) { | |
q.push(data); | |
} else { | |
self.emit('data', data); | |
stream.read(doRead); | |
} | |
} else { | |
if (paused) { | |
q.push(null); | |
} else { | |
self.emit('end'); | |
} | |
} | |
} | |
self.pause = function() { | |
paused = true; | |
} | |
self.resume = function() { | |
var data; | |
while ((data = q.shift()) !== undefined) { | |
if (data != null) self.emit('data', data); | |
else self.emit('end'); | |
} | |
paused = false; | |
stream.read(doRead); | |
} | |
stream.read(doRead); | |
} | |
exports.EventReadWrapper.prototype = new EventEmitter(); | |
exports.EventWriteWrapper = function(stream) { | |
var self = this; | |
var chunks = []; | |
function written(err) { | |
if (err) self.emit('error', err); | |
else { | |
chunks.splice(0, 1); | |
if (chunks.length === 0) self.emit('drain'); | |
else stream.write(chunks[0], written); | |
} | |
} | |
this.write = function(data) { | |
chunks.push(data); | |
if (chunks.length === 1) stream.write(data, written); | |
return chunks.length === 0; | |
} | |
this.end = function(data) { | |
if (data != null) self.write(data); | |
self.write(null); | |
} | |
} | |
exports.EventWriteWrapper.prototype = new EventEmitter(); | |
exports.EventWrapper = function(stream) { | |
exports.EventReadWrapper.call(this, stream); | |
exports.EventWriteWrapper.call(this, stream); | |
} | |
exports.EventWrapper.prototype = new EventEmitter(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment