Skip to content

Instantly share code, notes, and snippets.

@hboylan
Last active January 6, 2023 18:38
Show Gist options
  • Save hboylan/68ad1bea3e603b33e338c39bbd8c72d3 to your computer and use it in GitHub Desktop.
Save hboylan/68ad1bea3e603b33e338c39bbd8c72d3 to your computer and use it in GitHub Desktop.
AWS Lambda function to read and write S3 files by line to perform efficient processing
const stream = require('stream')
const readline = require('readline')
const AWS = require('aws-sdk')
const S3 = new AWS.S3()
// Open the S3 object at Bucket/Key as a read stream and wrap it in a
// readline interface, so the caller receives the object one line at a time
// through 'line' events.
function createReadline(Bucket, Key) {
  // request for the object; nothing is downloaded until the stream is read
  const request = S3.getObject({ Bucket, Key })
  const input = request.createReadStream()
  // terminal: false -> treat the input as a plain stream, not a TTY
  const options = { input, terminal: false }
  return readline.createInterface(options)
}
// Start an S3 upload to Bucket/Key whose body is a PassThrough stream.
// Returns { writeStream, uploadPromise }: write data to writeStream, end it,
// then await uploadPromise for the upload result.
function createWriteStream(Bucket, Key) {
  // whatever is written to this stream is handed to the upload as its body
  const body = new stream.PassThrough()
  const params = { Bucket, Key, Body: body }
  const pendingUpload = S3.upload(params).promise()
  return { writeStream: body, uploadPromise: pendingUpload }
}
// Transform a single input line before it is written to the output.
// Placeholder: currently hands the line back untouched.
function processLine(line) {
  // do something
  const processed = line
  return processed
}
// event.inputBucket: source file bucket
// event.inputKey: source file key
// event.outputBucket: target file bucket
// event.outputKey: target file key
// event.limit: maximum number of lines to read
exports.handler = function execute(event, context, callback) {
console.log(JSON.stringify(event, null, 2))
var totalLineCount = 0
// create input stream from S3
const readStream = createReadline(event.inputBucket, event.inputKey)
// create output stream to S3
const { writeStream, uploadPromise } = createWriteStream(event.outputBucket, event.outputKey)
// read each line
readStream.on('line', line => {
// close stream on limit
if (event.limit && event.limit <= totalLineCount) {
return readStream.close()
}
// process line
else {
line = processLine(line)
writeStream.write(`${line}\n`)
}
totalLineCount++
})
// clean up on close
readStream.on('close', async () => {
// end write stream
writeStream.end()
// wait for upload
const uploadResponse = await uploadPromise
// return processing insights
callback(null, {
totalLineCount,
uploadResponse
})
})
}
@non-senses
Copy link

Line 80; shouldn't it be writeStream.close()?

@non-senses
Copy link

Line 80; shouldn't it be writeStream.close()?

Confirmed, it should indeed be .end(). My bad.

@vignesh-uj
Copy link

Will this work when the Lambda function's timeout is very short?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment