Skip to content

Instantly share code, notes, and snippets.

@lsegal
Last active August 29, 2015 14:03
Show Gist options
  • Save lsegal/e6966a03bcbf077724d1 to your computer and use it in GitHub Desktop.
Save lsegal/e6966a03bcbf077724d1 to your computer and use it in GitHub Desktop.
////////////////////////////////////////////////////////////////////////////////
// READABLE STREAM IMPLEMENTATION //
////////////////////////////////////////////////////////////////////////////////
var https = require('https');
var streams = require('stream');
function createRequest(bucket, key, callback) {
return {
send: function() {
var options = {
host: bucket + '.s3.amazonaws.com', port: 443, method: 'GET',
headers: {}, path: '/' + key
};
https.request(options, callback).end();
}
};
}
function createS3Stream(bucket, key) {
var req, sent = false, stream = new streams.Readable();
stream._read = function() { stream.push(''); };
stream.on('newListener', function(event) {
if (!sent && event === 'readable') {
sent = true;
process.nextTick(req.send);
}
});
req = createRequest(bucket, key, function(resp) {
stream.emit('start', resp); // emit a custom event to track statusCode
if (resp.statusCode < 300) {
// Core _read implementation, see:
// http://nodejs.org/api/stream.html#stream_readable_read_size_1
stream._read = function() {
// Taken from:
// http://nodejs.org/api/stream.html#stream_readable_read_size
var chunk;
while (null !== (chunk = resp.read())) {
console.log('---> %d bytes', chunk.length);
stream.push(chunk);
}
// Taken from:
// http://nodejs.org/api/stream.html#stream_example_simpleprotocol_v1_sub_optimal
// if the source doesn't have data, we don't have data yet.
if (chunk === null) stream.push('');
};
// proxy all events to stream
Object.keys(stream._events).forEach(function(event) {
resp.on(event, function(arg) {
console.log(event);
stream.emit(event, arg);
});
});
// emit EOF on end
resp.on('end', function() {
stream.push(null);
});
}
});
return stream;
}
////////////////////////////////////////////////////////////////////////////////
// USAGE //
////////////////////////////////////////////////////////////////////////////////
var fs = require('fs');
var s3Path = 'test_out.json';
var localFile = 'test_out.json';
var count = 1;
function downloadOnce() {
doTheDownload(function(err) {
if (err) throw err;
console.log('downloaded', count++);
downloadOnce();
});
}
function doTheDownload(cb) {
function handleError(err) {
if (errorOccurred) return;
errorOccurred = true;
cb(err);
}
var stream = createS3Stream(process.env.S3_BUCKET, s3Path);
var outStream = fs.createWriteStream(localFile);
var errorOccurred = false;
var contentLength;
stream.on('error', handleError);
stream.on('start', function(resp) {
if (resp.statusCode < 300) {
contentLength = parseInt(resp.headers['content-length'], 10);
} else {
handleError(new Error('http status code ' + resp.statusCode));
}
});
outStream.on('error', handleError);
outStream.on('close', function() {
if (errorOccurred) return;
fs.stat(localFile, function(err, stat) {
if (err) return handleError(err);
if (stat.size !== contentLength) {
return handleError(new Error('size mismatch'));
}
cb();
});
});
stream.pipe(outStream);
}
downloadOnce();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment