Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chris-ramon/6f0de886723bf200076e8bf673fcb18e to your computer and use it in GitHub Desktop.
Save chris-ramon/6f0de886723bf200076e8bf673fcb18e to your computer and use it in GitHub Desktop.
AWS Lambda function to read and write S3 files by line to perform efficient processing
const stream = require('stream')
const readline = require('readline')
const AWS = require('aws-sdk')
const S3 = new AWS.S3()
// read S3 file by line
function createReadline(Bucket, Key) {
// s3 read stream
const input = S3
.getObject({
Bucket,
Key
})
.createReadStream()
// node readline with stream
return readline
.createInterface({
input,
terminal: false
})
}
// write S3 file
function createWriteStream(Bucket, Key) {
const writeStream = new stream.PassThrough()
const uploadPromise = S3
.upload({
Bucket,
Key,
Body: writeStream
})
.promise()
return { writeStream, uploadPromise }
}
// perform processing on line
function processLine(line) {
// do something
return line
}
// event.inputBucket: source file bucket
// event.inputKey: source file key
// event.outputBucket: target file bucket
// event.outputKey: target file key
// event.limit: maximum number of lines to read
exports.handler = function execute(event, context, callback) {
console.log(JSON.stringify(event, null, 2))
var totalLineCount = 0
// create input stream from S3
const readStream = createReadline(event.inputBucket, event.inputKey)
// create output stream to S3
const { writeStream, uploadPromise } = createWriteStream(event.outputBucket, event.outputKey)
// read each line
readStream.on('line', line => {
// close stream on limit
if (event.limit && event.limit <= totalLineCount) {
return readStream.close()
}
// process line
else {
line = processLine(line)
writeStream.write(`${line}\n`)
}
totalLineCount++
})
// clean up on close
readStream.on('close', async () => {
// end write stream
writeStream.end()
// wait for upload
const uploadResponse = await uploadPromise
// return processing insights
callback(null, {
totalLineCount,
uploadResponse
})
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment