Last active
August 29, 2015 14:03
-
-
Save lsegal/e6966a03bcbf077724d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//////////////////////////////////////////////////////////////////////////////// | |
// READABLE STREAM IMPLEMENTATION // | |
//////////////////////////////////////////////////////////////////////////////// | |
var https = require('https'); | |
var streams = require('stream'); | |
function createRequest(bucket, key, callback) { | |
return { | |
send: function() { | |
var options = { | |
host: bucket + '.s3.amazonaws.com', port: 443, method: 'GET', | |
headers: {}, path: '/' + key | |
}; | |
https.request(options, callback).end(); | |
} | |
}; | |
} | |
function createS3Stream(bucket, key) { | |
var req, sent = false, stream = new streams.Readable(); | |
stream._read = function() { stream.push(''); }; | |
stream.on('newListener', function(event) { | |
if (!sent && event === 'readable') { | |
sent = true; | |
process.nextTick(req.send); | |
} | |
}); | |
req = createRequest(bucket, key, function(resp) { | |
stream.emit('start', resp); // emit a custom event to track statusCode | |
if (resp.statusCode < 300) { | |
// Core _read implementation, see: | |
// http://nodejs.org/api/stream.html#stream_readable_read_size_1 | |
stream._read = function() { | |
// Taken from: | |
// http://nodejs.org/api/stream.html#stream_readable_read_size | |
var chunk; | |
while (null !== (chunk = resp.read())) { | |
console.log('---> %d bytes', chunk.length); | |
stream.push(chunk); | |
} | |
// Taken from: | |
// http://nodejs.org/api/stream.html#stream_example_simpleprotocol_v1_sub_optimal | |
// if the source doesn't have data, we don't have data yet. | |
if (chunk === null) stream.push(''); | |
}; | |
// proxy all events to stream | |
Object.keys(stream._events).forEach(function(event) { | |
resp.on(event, function(arg) { | |
console.log(event); | |
stream.emit(event, arg); | |
}); | |
}); | |
// emit EOF on end | |
resp.on('end', function() { | |
stream.push(null); | |
}); | |
} | |
}); | |
return stream; | |
} | |
//////////////////////////////////////////////////////////////////////////////// | |
// USAGE // | |
//////////////////////////////////////////////////////////////////////////////// | |
var fs = require('fs'); | |
var s3Path = 'test_out.json'; | |
var localFile = 'test_out.json'; | |
var count = 1; | |
function downloadOnce() { | |
doTheDownload(function(err) { | |
if (err) throw err; | |
console.log('downloaded', count++); | |
downloadOnce(); | |
}); | |
} | |
function doTheDownload(cb) { | |
function handleError(err) { | |
if (errorOccurred) return; | |
errorOccurred = true; | |
cb(err); | |
} | |
var stream = createS3Stream(process.env.S3_BUCKET, s3Path); | |
var outStream = fs.createWriteStream(localFile); | |
var errorOccurred = false; | |
var contentLength; | |
stream.on('error', handleError); | |
stream.on('start', function(resp) { | |
if (resp.statusCode < 300) { | |
contentLength = parseInt(resp.headers['content-length'], 10); | |
} else { | |
handleError(new Error('http status code ' + resp.statusCode)); | |
} | |
}); | |
outStream.on('error', handleError); | |
outStream.on('close', function() { | |
if (errorOccurred) return; | |
fs.stat(localFile, function(err, stat) { | |
if (err) return handleError(err); | |
if (stat.size !== contentLength) { | |
return handleError(new Error('size mismatch')); | |
} | |
cb(); | |
}); | |
}); | |
stream.pipe(outStream); | |
} | |
downloadOnce(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment