@amiantos
Last active October 30, 2024 20:12
Zip Multiple Files from S3 using AWS Lambda Function
// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
//     "bucket": "your-bucket",
//     "destination_key": "zips/test.zip",
//     "files": [
//         {
//             "uri": "...", (options: S3 file key or URL)
//             "filename": "...", (filename of file inside zip)
//             "type": "..." (options: [file, url])
//         }
//     ]
// }
// Saves zip file at "destination_key" location

"use strict";

const AWS = require("aws-sdk");
const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000 // Matching Lambda function timeout
  }
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");

// Returns a stream whose contents are uploaded to S3.
// resolve/reject are called only from the s3.upload callback,
// i.e. once the upload has actually finished or failed.
const streamTo = (bucket, key, resolve, reject) => {
  const passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
      if (err) {
        // Throwing inside this callback would escape the promise, so reject instead
        reject(err);
        return;
      }
      console.log("Zip uploaded");
      resolve();
    }
  ).on("httpUploadProgress", (progress) => {
    console.log(progress);
  });
  return passthrough;
};

// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
// The S3 read stream is only opened once something starts listening for data.
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  passThroughStream.on("newListener", event => {
    if (!streamCreated && event === "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);

      streamCreated = true;
    }
  });

  return passThroughStream;
};

exports.handler = async (event, context, callback) => {
  const bucket = event["bucket"];
  const destinationKey = event["destination_key"];
  const files = event["files"];

  await new Promise((resolve, reject) => {
    const zipStream = streamTo(bucket, destinationKey, resolve, reject);
    zipStream.on("error", reject);

    const archive = archiver("zip");
    // Reject rather than throw: an exception thrown inside an event
    // listener is not caught by the surrounding promise
    archive.on("error", reject);
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] === "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] === "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      }
    }
    archive.finalize();
  });

  callback(null, {
    statusCode: 200,
    body: { final_destination: destinationKey }
  });
};
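
The header comment of the gist documents the event format, and the handler quietly skips any file whose type is neither "file" nor "url". A minimal sketch of a check that could run before any streaming starts is shown here. `validateEvent` and its error messages are hypothetical and not part of the gist; the field names are taken from the header comment.

```javascript
// Returns a list of problems found in the event; an empty list means it looks valid.
const validateEvent = (event) => {
  const errors = [];
  const isNonEmptyString = (value) => typeof value === "string" && value !== "";

  if (!isNonEmptyString(event.bucket)) {
    errors.push("bucket must be a non-empty string");
  }
  if (!isNonEmptyString(event.destination_key)) {
    errors.push("destination_key must be a non-empty string");
  }
  if (!Array.isArray(event.files) || event.files.length === 0) {
    errors.push("files must be a non-empty array");
    return errors;
  }

  event.files.forEach((file, index) => {
    if (!isNonEmptyString(file.uri)) {
      errors.push(`files[${index}].uri must be a non-empty string`);
    }
    if (!isNonEmptyString(file.filename)) {
      errors.push(`files[${index}].filename must be a non-empty string`);
    }
    // The handler only knows these two types and skips anything else
    if (!["file", "url"].includes(file.type)) {
      errors.push(`files[${index}].type must be "file" or "url"`);
    }
  });
  return errors;
};

console.log(
  validateEvent({
    bucket: "your-bucket",
    destination_key: "zips/test.zip",
    files: [{ uri: "a.txt", filename: "a.txt", type: "ftp" }]
  })
);
// [ 'files[0].type must be "file" or "url"' ]
```

Returning a list instead of throwing on the first problem is a design choice for this sketch: the caller sees every issue with the payload at once.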
@WLS-JD

WLS-JD commented Oct 14, 2020

Hey I read your blog post and you mentioned that you couldn't get it to work in python. I'm curious what you tried as I've also been trying to write something like this in python and haven't been able to keep memory usage down using zipfile with a stream - guessing that was maybe the same problem you were having?

Thank you for posting this! Super helpful for keeping things serverless

@amiantos
Author

@WLS-JD I tried basically everything that comes up on google, so python-zipstream or zipstream-new, stuff in these SA answers... a couple other things. I only spent about a day on it, long enough to do some tests to see that memory usage kept growing as the zip did. The JS solution is the only thing that actually gave me low memory usage.

@WLS-JD

WLS-JD commented Oct 15, 2020

@amiantos thanks for the reply. When I have some free time I'm going to try out some more ideas with python. this node script helped me big time so huge thanks. I'll post back if I get anywhere with python

@MikeCraig418

Hi @amiantos, thank you for the blog write up and this source code! I'm running into an error that prevents me from uploading to Lambda. The error is Cannot find handler 'app.handler' in project.

It relates to this line:

exports.handler = async (event, context, callback) => {
// ...
// ...

The documentation at https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html says:
(screenshot of the documentation, image not preserved)

When I remove the callback, the lambda uploads but obviously doesn't run correctly.

When I add the callback in, my IDE wants me to remove the async functionality.

Any tips? Do you have a complete working example you can share? Is there a serverless template .yaml file that can be referenced too?

Thanks again! Your write up was definitely the most complete and informative.

@amiantos
Author

@949mac I'm pretty sure the file in the gist is 'fully working'. Compared to the gist, the file I'm using in production has some specialty code that relates to the service it's for, but otherwise you should be able to create a lambda function with the node runtime, put this file in as index.js, and it'll work. No need for a serverless yml template. I admit I'm no expert in node/js land; my wheelhouse is Python and Swift.

For the sake of trying to be helpful, this is the literal code that I am running in production. You'll notice the Sketch-related code has been removed from the gist. In lambda, I'm using the Node.js 12.x runtime and have the handler set to index.handler. Nothing special :)

// Lambda Zipper
// Accepts a bundle of data in the format...
// {
//     "bucket": "bucket-name",
//     "destination_key": "zips/test.zip",
//     "files": [
//         {
//             "uri": "...", (options: S3 file key or URL)
//             "filename": "...", (filename of file inside zip)
//             "type": "..." (options: [file, sketch, url])
//         }
//     ]
// }
// Saves zip file at "destination_key" location

"use strict";

const AWS = require("aws-sdk");
const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000
  }
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");
const yauzl = require("yauzl");

const streamTo = (bucket, key) => {
  var passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
      if (err) throw err;
    }
  );
  return passthrough;
};

const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);

      streamCreated = true;
    }
  });

  return passThroughStream;
};

const getSketchPreview = async (bucket, key) => {
  return new Promise((resolve, reject) => {
    s3.getObject({ Bucket: bucket, Key: key }, function(err, data) {
      if (err) throw err;
      yauzl.fromBuffer(data.Body, { lazyEntries: true }, function(
        err,
        zipfile
      ) {
        if (err) throw err;
        zipfile.readEntry();
        zipfile.on("entry", function(entry) {
          if (entry.fileName == "preview.png") {
            zipfile.openReadStream(entry, function(err, readStream) {
              if (err) throw err;
              resolve(readStream);
            });
          } else {
            zipfile.readEntry();
          }
        });
      });
    });
  });
};

exports.handler = async (event, context, callback) => {
  var bucket = event["bucket"];
  var destinationKey = event["destination_key"];
  var files = event["files"];

  await new Promise(async (resolve, reject) => {
    var zipStream = streamTo(bucket, destinationKey);
    zipStream.on("close", resolve);
    zipStream.on("end", resolve);
    zipStream.on("error", reject);

    var archive = archiver("zip");
    archive.on("error", err => {
      throw new Error(err);
    });
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] == "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      } else if (file["type"] == "sketch") {
        const sketchFile = await getSketchPreview(bucket, file["uri"]);
        archive.append(sketchFile, { name: file["filename"] });
      }
    }
    archive.finalize();
  }).catch(err => {
    throw new Error(err);
  });

  callback(null, {
    statusCode: 200,
    body: { final_destination: destinationKey }
  });
};

You also might want to look into automating your lambda deployments; it could help. I have a blog post here that was written before I wrote the zipper, so it lacks the node stuff. This is a new version of package.sh that features node integration:

#!/bin/bash

mkdir tmp411
[ -f /io/requirements.txt ] && python3 -m pip install -r /io/requirements.txt -t tmp411
rm -f /io/lambda.zip
cp -r /io/* tmp411
cd tmp411
test -f package.json && npm ci  # already inside tmp411 at this point
zip -r /io/lambda.zip *

cd /io
echo "Uploading zip to AWS..."
maximumsize=15000000
zipsize=$(wc -c <"lambda.zip")
if (( zipsize < maximumsize )); then
    aws lambda update-function-code --function-name $LAMBDA_FUNC --zip-file fileb://lambda.zip
    rm -f /io/lambda.zip 
else
    echo "Filesize too large, not uploading automatically..."
fi

@MikeCraig418

@amiantos - Thank you! I really appreciate the thorough response.

I ran into this article:

https://www.serverless.com/blog/common-node8-mistakes-in-lambda

It suggests removing the callback and replacing it with a simple return.

My IDE integration now works :). Admittedly, I do need to learn the ins-and-outs of deployment a bit more.
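
A minimal sketch of the change described above, assuming only what the linked article suggests: the async handler returns its result instead of calling callback. `zipFiles` is a hypothetical placeholder for the real zipping work and is not part of the gist.

```javascript
// Hypothetical stand-in for the real work (streaming files into a zip on S3).
const zipFiles = async (destinationKey) => destinationKey;

// No callback parameter: Lambda uses the value the async function resolves to.
const handler = async (event) => {
  const destinationKey = await zipFiles(event["destination_key"]);
  return {
    statusCode: 200,
    body: { final_destination: destinationKey }
  };
};

exports.handler = handler;

// Local smoke test, outside of Lambda
handler({ destination_key: "zips/test.zip" }).then((response) => {
  console.log(response);
});
```

A rejected promise (or a thrown error inside the async function) takes the place of callback(err), so error paths need no extra wiring.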

@amiantos
Author

Glad it worked out! 👍 Hopefully your comments will help someone else in the future.

@daniele-pelagatti

daniele-pelagatti commented Dec 20, 2020

@amiantos thanks for this, I'm using it to create backups and it works really well, except one detail:

The whole "upload" of the ZIP is not completed when the stream fires "close" or "end", it is only done when the s3.upload callback is invoked with no errors.

I ran into a weird problem where everything "worked" as expected, the logs indicated nothing wrong and no error was logged, except the zip was not present in the destination S3 when the lambda was done.

A few hours of digging revealed that the lambda exited right after the zip was created but before the upload had finished. This caused the problem above because of a peculiarity in how Lambda manages background tasks and async functions (read more here). TL;DR: if you invoke the callback before an async task has finished executing, that task is "frozen" and (maybe) it will resume on a subsequent invocation.

I'm not sure why your code works for you but the best explanation I can think of is: you await something else after the zip creation and before the lambda callback, that gives the upload time to finish (it usually takes a few seconds after your original resolve).
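
The ordering problem can be reproduced without AWS. The sketch below is only an illustration under stated assumptions: `fakeUpload` is a hypothetical stand-in for s3.upload that needs an extra 50 ms after the last byte arrives (imagine the final multipart request), and the event order is recorded in an array.

```javascript
const { PassThrough, Writable } = require("stream");

// Hypothetical stand-in for s3.upload: consumes the body, then needs 50 ms
// more before it reports completion through its callback.
const fakeUpload = (body, done) => {
  const sink = new Writable({
    write(chunk, encoding, next) {
      next();
    },
    final(next) {
      setTimeout(next, 50); // simulated "finishing the upload"
    }
  });
  body.pipe(sink).on("finish", () => done(null));
};

const runDemo = () =>
  new Promise((resolve) => {
    const order = [];
    const zipStream = new PassThrough();
    zipStream.on("end", () => order.push("stream end"));
    fakeUpload(zipStream, () => {
      order.push("upload callback");
      resolve(order);
    });
    zipStream.end("pretend zip bytes");
  });

runDemo().then((order) => console.log(order));
// [ 'stream end', 'upload callback' ]
```

The stream reports "end" as soon as its data has been handed over, which is before the consumer is done with it. A handler that resolves on "end" or "close" can therefore return while the upload is still in flight.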

So, long story short, I fixed my problem with the code below:

zip.js

'use strict'
const AWS = require('aws-sdk')
const s3 = new AWS.S3()
const archiver = require('archiver')
const stream = require('stream')

/**
 * Kudos to this person on GitHub for this getS3FileStream solution
 * https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
 *
 * @param {*} bucket
 * @param {*} key
 */
exports.getS3FileStream = (bucket, key) => {
  let streamCreated = false
  const passThroughStream = new stream.PassThrough()

  passThroughStream.on('newListener', event => {
    if (!streamCreated && event === 'data') {
      const s3Stream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream()

      s3Stream.on('error', err => passThroughStream.emit('error', err)).pipe(passThroughStream)

      streamCreated = true
    }
  })

  return passThroughStream
}

exports.createZipStream = ({ bucket, destinationKey, resolve, reject }) => {
  const zipStream = new stream.PassThrough()
  s3.upload({
    Bucket: bucket,
    Key: destinationKey,
    Body: zipStream,
    ContentType: 'application/zip'
    // ServerSideEncryption: 'AES256'
  }, (err, data) => {
    if (err) {
      console.error('Error while uploading zip')
      reject(err)
      return
    }
    console.log('Zip uploaded')
    resolve()
  })

  zipStream.on('error', reject)

  const archive = archiver('zip', { level: 0 })
  archive
    .on('error', (err) => {
      console.log('archive error', err)
      reject(err)
    })
    .on('entry', entry => {
      console.log('archive event: entry ', entry.name)
    })
    .on('progress', data => {
      console.log('archive event: progress', data)
    })
    // .on('data', data => {
    //   console.log(`archive event: data | processed total ${(archive.pointer() / (1024 ** 2)).toFixed(3)} MB | data length: ${data.length}`)
    // })
  archive.pipe(zipStream)
  return archive
}

index.js (handler, redacted, forgive me if this doesn't compile)

const { createZipStream, getS3FileStream } = require('./zip')
const DST_BUCKET_NAME = 'some-bucket-id'
/**
 *
 * @param {*} param0
 */
const zipData = async ({ destinationKey, bucket } = {}) => {
  // prepare list of files & data to be zipped
  // run in parallel
  const [files, data] = await Promise.all([
    fetchListOfFiles(),
    fetchData2()
  ])

  return new Promise((resolve, reject) => {
    const archive = createZipStream({
      bucket,
      destinationKey,
      reject,
      resolve: () => resolve({
        data: data.map(d => d.id),
        files: files.map(f => f.name)
      })
    })

    data.forEach(d => {
      archive.append(JSON.stringify(d, null, 2), { name: `data/${d.id}.json` })
    })

    files.forEach(file => {
      const s3File = getS3FileStream(bucket, file.name)
      archive.append(s3File, { name: file.name })
    })
    archive.finalize()
  })
}

/**
 *
 * @param {*} event
 * @param {*} context
 * @param {*} callback
 */
exports.handler = async (event, context, callback) => {
  const destinationKey = `backups/${new Date().toISOString()}.zip`

  // await prepareSomething()

  const zipContentDescriptor = await zipData({
    destinationKey,
    bucket: DST_BUCKET_NAME
  })
  
  // await emailUserWithResultsLink()
  // etc
 
  return {
    statusCode: 200,
    body: {
      destination: `s3://${DST_BUCKET_NAME}/${destinationKey}`,
      content: JSON.stringify(zipContentDescriptor, null, 2)
    }
  }
}

@amiantos
Author

@daniele-pelagatti huh! interesting 🤔 if the error was common enough for you, I'm surprised it hasn't shown up on my side. We've served up quite a few zips generated this way and haven't heard any complaints... I do feel like the description of the error that happens (file doesn't complete until the next task begins, basically) is familiar to me and I did run into it while building this, but it's been so long now I don't remember how I fixed it at the time.

@daniele-pelagatti

> @daniele-pelagatti huh! interesting 🤔 if the error was common enough for you, I'm surprised it hasn't shown up on my side. We've served up quite a few zips generated this way and haven't heard any complaints... I do feel like the description of the error that happens (file doesn't complete until the next task begins, basically) is familiar to me and I did run into it while building this, but it's been so long now I don't remember how I fixed it at the time.

It was pretty consistent, yes. But that could have been because of many contributing factors:

  • I create a zip with compression level 0 (I want fast compression in order to process as many files as possible before running into the 15-minute timeout for an async lambda invocation). This may have allowed archiver to finish the archive before the upload was complete.
  • I use an async return value while you invoke the lambda callback: maybe this contributes somehow?
  • the problem was consistent only above a certain threshold of files compressed, not sure why.
  • I use node 12 runtime in my lambda, maybe an older runtime didn't trigger the problem?
  • the zip creation/upload was the last thing I did in my handler: like I mentioned, if you give it a few seconds after the archive process is done, the upload will complete and the zip will be created in the s3. This wasn't my case so the lambda would not finish the s3.upload in time.

Nevertheless, the whole purpose of this lambda is to upload something to S3 (coincidentally a streaming zip), so I find it fitting to declare the process "done" only when s3.upload is done.
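
The idea of treating the upload callback as the only "done" signal can be sketched as a promise that settles from that callback alone. This is a hedged illustration, not the code from this thread: `fakeUpload` is a hypothetical stand-in that mimics the `(params, callback)` shape of s3.upload, and `createUploadStream` is a name made up for this example.

```javascript
const { PassThrough, Writable } = require("stream");

// Hypothetical stand-in with the same (params, callback) shape as s3.upload.
const fakeUpload = ({ Body }, callback) => {
  let bytes = 0;
  const sink = new Writable({
    write(chunk, encoding, next) {
      bytes += chunk.length;
      next();
    },
    final(next) {
      setTimeout(next, 20); // the upload finishes a little after the last byte
    }
  });
  sink.on("finish", () => callback(null, { bytes }));
  sink.on("error", callback);
  Body.pipe(sink);
};

// Returns the stream to write into and a promise settled only by the upload callback.
const createUploadStream = (upload) => {
  const body = new PassThrough();
  const done = new Promise((resolve, reject) => {
    upload({ Body: body }, (err, data) => (err ? reject(err) : resolve(data)));
  });
  return { body, done };
};

const handler = async () => {
  const { body, done } = createUploadStream(fakeUpload);
  body.write("part one, ");
  body.end("part two");
  const result = await done; // the handler cannot return before the upload reports back
  return { statusCode: 200, body: result };
};

handler().then((response) => console.log(response));
// { statusCode: 200, body: { bytes: 18 } }
```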

@jonafd1

jonafd1 commented Jan 13, 2021

> @amiantos - Thank you! I really appreciate the thorough response.
>
> I ran into this article:
>
> https://www.serverless.com/blog/common-node8-mistakes-in-lambda
>
> It suggests removing the callback and replacing it with a simple return.
>
> My IDE integration now works :). Admittedly, I do need to learn the ins-and-outs of deployment a bit more.

Hello @949mac. May I ask if you made any other code changes besides changing the callback to a return? I tried it on my end without changing anything else, but it did not work.

@hamsteven

> @amiantos thanks for the reply. When I have some free time I'm going to try out some more ideas with python. this node script helped me big time so huge thanks. I'll post back if I get anywhere with python

Did you ever get anywhere doing this with Python? I'm currently attempting it.

@amiantos
Author

> Did you ever get anywhere doing this with Python? I'm currently attempting it.

Pinging @WLS-JD just in case it helps them see your question.

@RyanClementsHax

RyanClementsHax commented Jun 17, 2021

> > @daniele-pelagatti huh! interesting 🤔 if the error was common enough for you, I'm surprised it hasn't shown up on my side. We've served up quite a few zips generated this way and haven't heard any complaints... I do feel like the description of the error that happens (file doesn't complete until the next task begins, basically) is familiar to me and I did run into it while building this, but it's been so long now I don't remember how I fixed it at the time.
>
> It was pretty consistent, yes. But that could have been because of many contributing factors:
>
>   • I create a zip with compression level 0 (I want fast compression in order to process as many files as possible before running into the 15-minute timeout for an async lambda invocation). This may have allowed archiver to finish the archive before the upload was complete.
>   • I use an async return value while you invoke the lambda callback: maybe this contributes somehow?
>   • the problem was consistent only above a certain threshold of files compressed, not sure why.
>   • I use node 12 runtime in my lambda, maybe an older runtime didn't trigger the problem?
>   • the zip creation/upload was the last thing I did in my handler: like I mentioned, if you give it a few seconds after the archive process is done, the upload will complete and the zip will be created in the s3. This wasn't my case so the lambda would not finish the s3.upload in time.
>
> Nevertheless, the whole purpose of this lambda is to upload something to S3 (coincidentally a streaming zip), so I find it fitting to declare the process "done" only when s3.upload is done.

Hello! Excellent work @amiantos for creating this and thanks to all that have contributed. I can't describe how many headaches this solved for me and my team. I figured I would contribute what I have found regarding the above issue.

I have also recreated this issue locally (haven't tried deploying the lambda yet). The lag between when the handler resolves and when the final part uploads seems to scale with the size of the final zip. Making sure that the callback passed into s3.upload is the thing that resolves the promise, rather than the zipStream close and end events, fixed the issue for me and got everything synced. Below is the original gist with the fix.

// Lambda S3 Zipper
// http://amiantos.net/zip-multiple-files-on-aws-s3/
//
// Accepts a bundle of data in the format...
// {
//     "bucket": "your-bucket",
//     "destination_key": "zips/test.zip",
//     "files": [
//         {
//             "uri": "...", (options: S3 file key or URL)
//             "filename": "...", (filename of file inside zip)
//             "type": "..." (options: [file, url])
//         }
//     ]
// }
// Saves zip file at "destination_key" location

"use strict";

const AWS = require("aws-sdk");
const awsOptions = {
  region: "us-east-1",
  httpOptions: {
    timeout: 300000 // Matching Lambda function timeout
  }
};
const s3 = new AWS.S3(awsOptions);
const archiver = require("archiver");
const stream = require("stream");
const request = require("request");

// take in a resolve function ----v
const streamTo = (bucket, key, resolve) => {
  var passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
      ServerSideEncryption: "AES256"
    },
    (err, data) => {
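      if (err) {
        console.error('Error while uploading zip')
        // `reject` is not in scope in this function, so surface the failure on the stream;
        // the handler's zipStream.on("error", reject) listener then rejects the promise
        passthrough.emit('error', err)
        return
      }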
      console.log('Zip uploaded')
      // at this point, we know that the zip has for sure finished uploading and it is safe to resolve the promise
      resolve()
    }
    // add this if you want to see the progress of the upload
    // this is how I found out that the zip wasn't done uploading for sure before the lambda finished
    ).on("httpUploadProgress", progress => {
      console.log(progress)
    });
  return passthrough;
};

// Kudos to this person on GitHub for this getStream solution
// https://github.com/aws/aws-sdk-js/issues/2087#issuecomment-474722151
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();

  passThroughStream.on("newListener", event => {
    if (!streamCreated && event == "data") {
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", err => passThroughStream.emit("error", err))
        .pipe(passThroughStream);

      streamCreated = true;
    }
  });

  return passThroughStream;
};

exports.handler = async (event, context, callback) => {
  var bucket = event["bucket"];
  var destinationKey = event["destination_key"];
  var files = event["files"];

  await new Promise(async (resolve, reject) => {
    // pass the resolve function here -----------------v
    var zipStream = streamTo(bucket, destinationKey, resolve);
    // make sure these don't resolve the promise
    // zipStream.on("close", resolve);
    // zipStream.on("end", resolve);
    zipStream.on("error", reject);

    var archive = archiver("zip");
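    // reject instead of throwing: an exception thrown inside an event listener
    // is not caught by the surrounding promise and would crash the process
    archive.on("error", reject);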
    archive.pipe(zipStream);

    for (const file of files) {
      if (file["type"] == "file") {
        archive.append(getStream(bucket, file["uri"]), {
          name: file["filename"]
        });
      } else if (file["type"] == "url") {
        archive.append(request(file["uri"]), { name: file["filename"] });
      }
    }
    archive.finalize();
  }).catch(err => {
    throw new Error(err);
  });

  callback(null, {
    statusCode: 200,
    body: { final_destination: destinationKey }
  });
};

@amiantos
Author

@RyanClementsHax thanks! while we weren't running into this issue in production (how? why?) I went ahead and implemented your changes on our side since they make sense to me, and I also updated the gist itself with your changes. Thanks again for the extra legwork on diagnosing this issue.

@RyanClementsHax

> @RyanClementsHax thanks! while we weren't running into this issue in production (how? why?) I went ahead and implemented your changes on our side since they make sense to me, and I also updated the gist itself with your changes. Thanks again for the extra legwork on diagnosing this issue.

no problem!

@DDynamic

@amiantos with this script, do you think there is an easy way to introduce concurrent stream processing? It appears that the read streams are created and processed sequentially. I'm testing this out with zipping over 10,000 small files (~50 KB each).

@amiantos
Author

@DDynamic I'm no expert but I assume you can't add multiple files to a zip file at the same time. I did a little googling, looks like this assumption is correct, the zip algorithm needs to process one stream at a time.
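
The constraint described above, a single output that takes its inputs one at a time, can be illustrated with plain Node streams. This is only a sketch of the sequential pattern and makes no claim about how archiver is implemented internally; `appendSequentially` and `collect` are hypothetical helpers.

```javascript
const { Readable, PassThrough } = require("stream");

// Writes every source into `output`, finishing one source before starting the next.
const appendSequentially = async (sources, output) => {
  for (const source of sources) {
    for await (const chunk of source) {
      // Respect backpressure: wait for "drain" when the output buffer is full
      if (!output.write(chunk)) {
        await new Promise((resolve) => output.once("drain", resolve));
      }
    }
  }
  output.end();
};

// Reads a stream to the end and returns its contents as a string.
const collect = async (readable) => {
  const chunks = [];
  for await (const chunk of readable) chunks.push(Buffer.from(chunk));
  return Buffer.concat(chunks).toString();
};

const demo = async () => {
  const output = new PassThrough();
  const sources = [Readable.from(["a1", "a2"]), Readable.from(["b1"])];
  const [, text] = await Promise.all([
    appendSequentially(sources, output),
    collect(output)
  ]);
  return text;
};

demo().then((text) => console.log(text)); // a1a2b1
```

Because the bytes of one entry must be contiguous in the output, the second source is not read until the first has been fully written.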

@RyanClementsHax

RyanClementsHax commented Jul 6, 2021

The project I am working on has hit a few edge cases and points of learning that I figured I would donate back to the community.

Performance

If anyone is wondering how well this performs on lambda, my team benchmarked it at roughly 1 GB/min with a small memory footprint, but only when the lambda is given as much CPU power as possible. For us, zipping performance scaled linearly with how much CPU we gave it.
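
Combining the benchmark figure above with the 15-minute Lambda timeout mentioned earlier in the thread gives a back-of-the-envelope ceiling per invocation. The helpers below are hypothetical and assume throughput stays constant for the whole run, which is an idealization.

```javascript
// Rough upper bound on how much data a single invocation can zip.
const maxZipSizeGb = (throughputGbPerMin, timeoutMin) =>
  throughputGbPerMin * timeoutMin;

// True when a job of `totalGb` should finish before the timeout.
const fitsInOneInvocation = (totalGb, throughputGbPerMin, timeoutMin) =>
  totalGb <= maxZipSizeGb(throughputGbPerMin, timeoutMin);

console.log(maxZipSizeGb(1, 15)); // 15
console.log(fitsInOneInvocation(20, 1, 15)); // false
```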

Error handling

If a lambda fails, the infrastructure (I'm pretty sure) takes care of the cleanup and of the isolation between handler invocations. My team couldn't use lambda because we needed to zip files larger than we could zip within the timeout lambda provides. We ended up building a small service that runs perpetually on ECS and listens for zip requests coming in via SQS. This forced us to think harder about how errors were handled, because we didn't have the lambda infrastructure to help us out.

When we injected failures, such as references to objects that didn't exist, we found two kinds of problems: unhandled promise rejections, which will cause node to crash in future node versions, and uncaught exceptions, which already cause node to crash. Both were mostly caused by streams not throwing their errors within the call stack, where a promise rejection or a classic try/catch could catch them. The solution we found was to proactively handle every error coming from a stream and make sure it could be caught by the promise patterns we wanted to use. If your use case allows you to stay on lambda, you don't need to worry about this too much, but doing it would still give you a better ability to catch, handle, and log errors. Example implementation below.
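
Separately from the full implementation later in this comment, the general idea can be sketched with Node's built-in stream module alone: `pipeline` from `stream/promises` turns an error emitted by any stream in the chain into a promise rejection that try/catch can see. `failingSource` is a hypothetical stand-in for a download of an object that doesn't exist; this is not the approach used in the code below, just a small illustration of the same principle.

```javascript
const { Readable, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// Hypothetical source that fails as soon as it is read, like a missing S3 object.
const failingSource = () =>
  new Readable({
    read() {
      this.destroy(new Error("NoSuchKey (simulated)"));
    }
  });

// Sink that throws the data away
const discard = () =>
  new Writable({
    write(chunk, encoding, next) {
      next();
    }
  });

const copySafely = async () => {
  try {
    // pipeline rejects if any stream in the chain emits "error"
    await pipeline(failingSource(), discard());
    return "ok";
  } catch (err) {
    return `caught: ${err.message}`;
  }
};

copySafely().then((result) => console.log(result));
// caught: NoSuchKey (simulated)
```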

AWS SDK V3

The original version of this zipper lambda used v2 of the SDK, which is slowly becoming out of date, so my team decided to rip off the band-aid and upgrade to v3. The new major version brought some API changes, but otherwise it works the same. It also brought better TypeScript support, which attracted my team. Example implementation below.

Typescript

The original version didn't use TypeScript, but my team really likes it and the static type checks it gives us, which were especially helpful for type checking the object the lambda handles. Note that GetObjectCommand references some browser types, so you need to add dom to the lib array in your tsconfig.json for TypeScript to type-check this command properly. Example implementation below.

Example implementation

As mentioned before, my team's use case forced us off of lambda, but I tried to re-lambda-ify the code to make it consistent with this discussion. Sorry if it doesn't compile, but the main concepts I mention in this post are all in it. Also as part of modifying the code to fit our team, we broke the code out into separate contained classes so we could more easily unit test it. Additionally, we changed the object shape slightly to fit our use case (removed the type key on the file object, renamed uri to key, and camelCased the fields). I also added a few comments explaining some of the complexities discussed previously. I'm open to any feedback y'all got for me! And a big thanks to @amiantos, and everyone else who contributed. My team couldn't have done this without your work! Also if anyone has any questions on how my team implemented that SQS consumer, feel free to reach out to me.

// handler.ts
import { S3Client } from '@aws-sdk/client-s3'
import { S3Service } from './s3Service'
import archiver from 'archiver'
import type { Archiver } from 'archiver'

const s3 = new S3Client({})
const s3Service = new S3Service(s3)

interface File {
    fileName: string
    key: string
}

interface ZipEvent {
    bucket: string
    destinationKey: string
    files: File[]
}

const finalizeArchiveSafely = (archive: Archiver): Promise<void> => {
    return new Promise((resolve, reject) => {
        // if we dont reject on error, the archive.finalize() promise will resolve normally
        // and the error will go unchecked causing the application to crash
        archive.on('error', reject)
        archive.finalize().then(resolve).catch(reject)
    })
}

export default async (event: ZipEvent) => {
    const archive = archiver('zip')

    try {
        for (const file of event.files) {
            const downloadStream = s3Service.createLazyDownloadStreamFrom(event.bucket, file.key)
            archive.append(
                downloadStream,
                {
                    name: file.fileName
                }
            )
        }

        // for whatever reason, if we try to await the promises individually, we get problems with trying to handle the errors produced by them
        // for example, if we tried to await them individually and injected some error like told it to zip an object that didn't exist, both statements would throw
        // one would be caught by the try catch and the other would be considered an unhandled promise rejection (bizarre, I know, but I kid you not)
        // using Promise.all was the only solution that seemed to solve the issue for us
        await Promise.all([
            finalizeArchiveSafely(archive),
            s3Service.uploadTo(event.bucket, event.destinationKey, archive)
        ])
    // with the robustness added, all errors will be caught by this catch block
    // so no need to worry about unhandled promises or unhandled exceptions
    } catch (e) {
        // this makes sure that the archive stops archiving if there is an error
        archive.abort()
        throw e
    }
}
// s3Service.ts
import { PassThrough } from 'stream'
import { GetObjectCommand } from '@aws-sdk/client-s3'
import { Upload } from '@aws-sdk/lib-storage'

import type { S3Client } from '@aws-sdk/client-s3'
import type { Readable } from 'stream'

export class S3Service {
    constructor(private s3: S3Client) {}

    // we need to lazy load the streams because archiver only works on one at a time
    // if we create a stream to an object in s3, the connection will time out when no traffic is going over it
    // this will be the case if multiple streams are opened at the same time and one of them is for a very large object
    public createLazyDownloadStreamFrom(bucket: string, key: string): Readable {
        let streamCreated = false
        // create a dummy stream to pass on
        const stream = new PassThrough()
        // only when someone first connects to this stream do we fetch the object and feed the data through the dummy stream
        stream.on('newListener', async event => {
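            if (!streamCreated && event == 'data') {
                // set the flag before awaiting so a second 'data' listener added in the meantime can't start another download
                streamCreated = true
                await this.initDownloadStream(bucket, key, stream)
            }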
        })

        return stream
    }

    public async uploadTo(
        bucket: string,
        key: string,
        stream: Readable
    ): Promise<void> {
        const upload = new Upload({
            client: this.s3,
            params: {
                Bucket: bucket,
                Key: key,
                // we pipe to a passthrough to handle the case that the stream isn't initialized yet
                Body: stream.pipe(new PassThrough()),
                ContentType: 'application/zip'
            }
        })
        await upload.done()
    }


    // if we don't pipe the errors into the stream, we will get unhandled exception errors in the console
    // and they won't be caught by any try/catch or .catch constructs that call createLazyDownloadStreamFrom
    // this is because initDownloadStream is called in the callback function of .on('newListener')
    // and thus isn't in the "call stack" of the call to createLazyDownloadStreamFrom
    // for example, it is totally reasonable that the s3 object asked for doesn't exist
    // in which case s3.send(new GetObjectCommand(/* */)) throws
    private async initDownloadStream(
        bucket: string,
        key: string,
        stream: PassThrough
    ) {
        try {
            const { Body: body } = await this.s3.send(
                new GetObjectCommand({ Bucket: bucket, Key: key })
            )
            // we need to type narrow here since Body can be one of many things
            if (!body) {
                stream.emit(
                    'error',
                    new Error(
                        `got an undefined body from s3 when getting object ${bucket}/${key}`
                    )
                )
            } else if (!('on' in body)) {
                stream.emit(
                    'error',
                    new Error(
                        `got a ReadableStream<any> (a stream used by browser fetch) or Blob from s3 when getting object ${bucket}/${key} instead of Readable`
                    )
                )
            } else {
                body.on('error', err => stream.emit('error', err)).pipe(stream)
            }
        } catch (e) {
            stream.emit('error', e)
        }
    }
}
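
For anyone who wants to unit test the lazy-stream idea without touching S3, here is a minimal sketch using only Node's built-in `stream` module. `createLazyStream` and `openSource` are hypothetical names that are not part of the code above; `openSource` stands in for whatever actually fetches the object. It is an illustration of the pattern under those assumptions, not a drop-in replacement.

```typescript
import { PassThrough, Readable } from 'stream'

// Hypothetical helper: `openSource` stands in for the S3 GetObject call.
// Nothing is opened until a consumer attaches the first 'data' listener.
export const createLazyStream = (openSource: () => Readable): Readable => {
    let opened = false
    const lazy = new PassThrough()

    lazy.on('newListener', (event: string | symbol) => {
        if (opened || event !== 'data') return
        // flip the flag first so later 'data' listeners cannot open the source again
        opened = true
        try {
            openSource()
                // forward source errors so whoever consumes `lazy` can see them
                .on('error', err => lazy.destroy(err))
                .pipe(lazy)
        } catch (err) {
            lazy.destroy(err as Error)
        }
    })

    return lazy
}
```

Because `newListener` fires synchronously, a test can count how many times the factory ran right after attaching listeners, with no network involved.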

@damianobertuna

Hi,

We tried the solution suggested here, but we are running into the following problem.
Suppose we want to zip these files:

{
"files": [
    {
      "fileName": "File1_1GB.bin",
      "key": "File1_1GB.bin"
    },
    {
      "fileName": "File2_1GB.bin",
      "key": "File2_1GB.bin"
    },
    {
      "fileName": "File3_1GB.bin",
      "key": "File3_1GB.bin"
    },
    {
      "fileName": "File4_1GB.bin",
      "key": "File4_1GB.bin"
    },
    {
      "fileName": "File5_1GB.bin",
      "key": "File5_1GB.bin"
    }
],
  "bucketRegion": "REGION_NAME",
  "originBucketName": "BUCKET_NAME",
  "destBucketName": "DESTBUCKET",
  "zipName": "ZippedFiles.zip"
}

The resulting ZippedFiles.zip correctly contains 5 files, but some of them are not the right size:

  • File1 1GB;
  • File2 1GB;
  • File3 1GB;
  • File4 34KB;
  • File5 34KB;

Our Lambda is configured with a 15-minute timeout and 10 GB of memory.

What could be the problem?

Thanks in advance.

Regards.

@RyanClementsHax

RyanClementsHax commented Jul 30, 2021

Hello @damianobertuna! Which version of the solution are you using? Have you added extra logging to catch errors? I saw this kind of problem whenever a stream did not finish writing, which was normally caused by the server hitting an unexpected error. Also, how long did your Lambda run for?
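
One way to get that extra logging is to count the bytes each object actually delivers and compare the total with the size S3 reports. The sketch below uses only Node's built-in `stream` module; `ByteCounter` and `isTruncated` are hypothetical helpers that do not appear anywhere in this gist, and the expected size is assumed to come from somewhere like the object's `ContentLength`.

```typescript
import { Transform } from 'stream'
import type { TransformCallback } from 'stream'

// Hypothetical helper: passes data through untouched while counting bytes,
// so a truncated entry (e.g. 34KB instead of 1GB) shows up in the logs.
export class ByteCounter extends Transform {
    public bytesSeen = 0

    constructor(private readonly label: string) {
        super()
    }

    _transform(chunk: Buffer, _encoding: BufferEncoding, callback: TransformCallback): void {
        this.bytesSeen += chunk.length
        callback(null, chunk)
    }

    _flush(callback: TransformCallback): void {
        // runs once the upstream source has ended
        console.log(`${this.label}: stream ended after ${this.bytesSeen} bytes`)
        callback()
    }
}

// true when fewer bytes arrived than the object is supposed to contain
export const isTruncated = (counter: ByteCounter, expectedBytes: number): boolean =>
    counter.bytesSeen < expectedBytes
```

If you try this with the lazy download stream, it probably belongs between the S3 body and the PassThrough inside `initDownloadStream` (something like `body.pipe(counter).pipe(stream)`). Piping the lazy stream itself through the counter attaches a `data` listener, which would start the download immediately.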

@Trigno97

🏆

@dakebusi

@damianobertuna I'm facing a similar issue, where not all files are correctly zipped.

Did you manage to fix it?

@mikehadlow

Just wanted to leave a shoutout to @amiantos for the original gist and to @RyanClementsHax for the TypeScript solution. Works like a dream and saved me a ton of time. Thank you both!

@Limesior

Thank you so much! I had an issue where my Lambda would die on larger or multiple files without throwing any errors when using Archiver and SDK v3, and this was the fix. Great work!

@amiantos
Author

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

@julianpoma

I was struggling with this today. Glad to know that I am not alone <3

@Sahar-SE

Sahar-SE commented Oct 9, 2023

Hi, I need some help, please. I have created a Lambda function that is triggered by an S3 bucket, zips the uploaded files, and stores them in a destination bucket. But when I upload files, nothing appears in the destination bucket. I have searched a lot but couldn't find a solution. I configured all the permissions and IAM roles, but it still doesn't work.

@pnicholls

pnicholls commented Apr 23, 2024

zipkit.io is another way to solve this problem.

@marc-reed

Really happy to see how this gist has grown and evolved over the years. Kudos to everyone who has weighed in, offered more code and advice!

I am going to join in and sing praises for @amiantos - you saved my butt today!
I had to write a Lambda to zip thousands of files. The task would just go dark - no zip, no errors. Your V3 script did the trick - well done!
