@vsetka
Created October 19, 2017 11:58
Demonstrates how a file can be read as a stream (from the network), piped through gzip, and uploaded to an S3 bucket without having to store it locally or load it whole into memory.
const { Writable, Readable, PassThrough } = require("stream");
const { inherits } = require("util");
const AWS = require("aws-sdk");
const zlib = require("zlib");
const https = require("https");
// Note: a Gzip stream can only be used once, so on a reused (warm) Lambda container it should really be created per invocation
const gzip = zlib.createGzip();
const s3 = new AWS.S3();
// Change the bucket name to point to an S3 bucket write-able within the IAM role assigned to your Lambda
const bucketName = "bas-zip-test";
const outputFileName = "compressed.zip";
let createMultipartUpload = promisify("createMultipartUpload", s3);
let uploadPart = promisify("uploadPart", s3);
let completeMultipartUpload = promisify("completeMultipartUpload", s3);
function promisify(func, context) {
  return options => {
    return new Promise((resolve, reject) => {
      context[func](options, (err, data) => {
        if (err) {
          reject(err);
        } else {
          resolve(data);
        }
      });
    });
  };
}
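// Alternative sketch: aws-sdk v2 request objects also expose a built-in .promise() helper,
// so the hand-rolled promisify above could likely be replaced with calls such as
//   s3.createMultipartUpload(params).promise()
// which resolve with the same response data.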
function getPartUploadStream(UploadId, inputStream, streamOptions) {
  // S3 multipart uploads require every part except the last to be at least 5 MB,
  // so buffer up to that size before uploading a part
  const MAX_PART_SIZE = 1024 * 1024 * 5;
  let partBuffer = Buffer.alloc(0);
  let partCount = 0;
  let multipartUpload = {
    Parts: []
  };
  // We're creating our own implementation of the writable stream
  function PartPassthroughBuffer(streamOptions) {
    Writable.call(this, streamOptions);
  }
  // Inherit from the writable stream
  inherits(PartPassthroughBuffer, Writable);
  PartPassthroughBuffer.prototype._write = function(chunk, encoding, cb) {
    // If the new chunk data overflows the buffer, upload it
    if (chunk.length + partBuffer.length > MAX_PART_SIZE) {
      inputStream && inputStream.pause();
      uploadChunk(chunk)
        .then(data => inputStream && inputStream.resume())
        .catch(onError);
    } else {
      // Otherwise, add the new chunk to the buffer
      partBuffer = Buffer.concat([partBuffer, chunk]);
      console.log(`Buffer size: ${partBuffer.length}`);
    }
    cb();
  };
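  // Alternative sketch: instead of pausing/resuming the source stream manually, _write
  // could defer its callback until the part upload has finished, letting Node's own
  // backpressure machinery throttle the pipe, e.g. (roughly):
  //   if (chunk.length + partBuffer.length > MAX_PART_SIZE) {
  //     uploadChunk(chunk).then(() => cb()).catch(cb);
  //   } else {
  //     partBuffer = Buffer.concat([partBuffer, chunk]);
  //     cb();
  //   }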
  function uploadDataPart(Body, PartNumber) {
    console.log(`Uploading part ${PartNumber}`);
    // Uploads the data and associates it with a previously created multipart upload via the UploadId
    return uploadPart({
      Bucket: bucketName,
      Key: outputFileName,
      Body,
      PartNumber,
      UploadId
    }).then(data => {
      console.log(`Uploaded part ${PartNumber}, with size ${Body.length}`);
      // Add the chunk/part to the parts map which will be used to finalize the upload
      multipartUpload.Parts.push({
        ETag: data.ETag,
        PartNumber: Number(PartNumber)
      });
      return multipartUpload;
    });
  }
  function uploadChunk(chunk) {
    // Fill the remaining free space in the buffer with new data
    let fillSize = MAX_PART_SIZE - partBuffer.length;
    partBuffer = Buffer.concat([partBuffer, chunk.slice(0, fillSize)]);
    // Copy the buffer
    let body = Buffer.alloc(partBuffer.length);
    partBuffer.copy(body);
    // Keep whatever did not fit into this part as the start of the next one
    partBuffer = chunk.slice(fillSize);
    partCount++;
    // Upload the buffered data part
    return uploadDataPart(body, partCount);
  }
  function finalizeUpload() {
    console.log("All data has been read");
    // Pause the input stream until the upload is done (if the input stream has been passed in).
    // A better setup would be to queue the chunks for upload and process them from the queue
    // so that the stream's internal mechanisms for handling backpressure can be used:
    // https://nodejs.org/en/docs/guides/backpressuring-in-streams/
    inputStream && inputStream.pause();
    // Push an empty chunk which effectively flushes the buffer (uploads the remainder)
    uploadChunk(Buffer.alloc(0))
      .then(data => inputStream && inputStream.resume())
      .then(() =>
        // This will assemble the previously uploaded parts using information in MultipartUpload
        completeMultipartUpload({
          Bucket: bucketName,
          Key: outputFileName,
          MultipartUpload: multipartUpload,
          UploadId
        })
      )
      .catch(onError);
  }
  function onError(err) {
    bufferingStream.emit("error", err);
  }
  let bufferingStream = new PartPassthroughBuffer(streamOptions);
  // When the input stream is finished, we need to flush the buffer and finalize the upload
  bufferingStream.on("finish", finalizeUpload);
  return bufferingStream;
}
// Streams an annual report PDF from iso.org for testing purposes
function getInputStream() {
  return new Promise((resolve, reject) => {
    const req = https.request(
      {
        hostname: "www.iso.org",
        port: 443,
        protocol: "https:",
        path:
          "/files/live/sites/isoorg/files/archive/pdf/en/annual_report_2009.pdf",
        method: "GET",
        headers: {
          "Cache-Control": "no-cache"
        }
      },
      res => resolve(res)
    );
    req.on("error", reject);
    req.end();
  });
}
exports.handler = (event, context, callback) => {
  // Get a test input stream and initiate the multipart upload
  Promise.all([
    getInputStream(),
    createMultipartUpload({
      Bucket: bucketName,
      Key: outputFileName,
      ContentType: "application/tar+gzip"
    })
  ])
    .then(([inputStream, multipart]) => {
      inputStream
        .pipe(gzip)
        .pipe(getPartUploadStream(multipart.UploadId, inputStream))
        .on("error", error => {
          console.error(error);
          callback(error);
        })
        // Note: "finish" fires when the writable side ends, which can be slightly
        // before the final part upload and completeMultipartUpload have resolved
        .on("finish", () => {
          console.log("Done!");
          callback(null, true);
        });
    })
    .catch(err => {
      console.error(err);
      callback(err);
    });
};
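For comparison, the aws-sdk v2 managed uploader (s3.upload) accepts a readable stream as Body and takes care of the multipart bookkeeping itself, so the same gzip-and-upload pipeline can be sketched much more compactly. The hypothetical simpleHandler below reuses the s3 client, bucketName, outputFileName and getInputStream defined above; it is a rough sketch rather than a drop-in replacement for the manual approach demonstrated here.

// Rough sketch: same pipeline, but letting the SDK's managed uploader handle the parts
exports.simpleHandler = (event, context, callback) => {
  getInputStream()
    .then(inputStream => {
      const body = inputStream.pipe(zlib.createGzip());
      return s3
        .upload({ Bucket: bucketName, Key: outputFileName, Body: body })
        .promise();
    })
    .then(() => callback(null, true))
    .catch(err => callback(err));
};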
@madmod commented Oct 6, 2020

Wow this is awesome thanks!
