@mshakhomirov
Created November 28, 2021 15:27
...
else if (output === 's3Stream') {
  // Stream all rows into a single S3 object as newline-delimited JSON (NDJSON).
  // s3.upload() reads from the PassThrough while rows are being written into it.
  const uploadStream = ({ Bucket, Key }) => {
    const pass = new stream.PassThrough();
    return {
      writeStream: pass,
      promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
    };
  };
  const { writeStream, promise } = uploadStream({ Bucket: bucket, Key: s3key });
  pr(`saving to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);

  // stream() only wraps the query in a pipeable object; 'result' events are emitted either way.
  s1.stream({ highWaterMark: BATCH_SIZE })
    // eslint-disable-next-line func-names
    .pipe(through2.obj(function (row, enc, next) {
      // Serialize each row as one JSON line.
      this.push(`${JSON.stringify(row)}\n`);
      next();
    }))
    .pipe(writeStream)
    // 'close' fires on the PassThrough: every row has been handed to the uploader,
    // but the upload itself may still be in flight.
    .on('close', () => {
      console.log('all rows written to the upload stream');
    });

  // The upload is complete only once this promise resolves.
  promise.then(() => {
    console.log('upload completed successfully');
    resolve(`saved to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`);
  }).catch((err) => {
    // Note: the enclosing promise is never settled on failure, so a caller awaiting it
    // would wait indefinitely. Reject it here if a `reject` callback is in scope.
    console.log('upload failed.', err.message);
  });
}
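
The fragment above depends on `s3`, `s1`, `through2` and an enclosing promise that are not shown. As a minimal, self-contained sketch of the same pattern (object rows serialized to NDJSON and written into a `PassThrough` that a consumer reads concurrently), the following uses only Node's built-in `stream` module. `fakeUpload`, `toNdjson` and `streamRowsToUpload` are hypothetical names introduced for illustration; `fakeUpload` stands in for `s3.upload({ Body }).promise()` and simply collects what it receives. It assumes Node 15 or later for `stream/promises`.

```javascript
const { PassThrough, Readable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical stand-in for s3.upload({ Body }).promise():
// drains the stream and resolves with everything it received.
const fakeUpload = async (body) => {
  const chunks = [];
  for await (const chunk of body) chunks.push(Buffer.from(chunk));
  return Buffer.concat(chunks).toString('utf8');
};

// Object-mode transform: one row in, one JSON line out.
const toNdjson = () => new Transform({
  writableObjectMode: true,
  transform(row, _enc, next) {
    next(null, `${JSON.stringify(row)}\n`);
  },
});

// Pipes rows through the NDJSON transform into a PassThrough while the
// "upload" consumes the other end. Promise.all surfaces a failure from
// either side instead of leaving the caller waiting.
const streamRowsToUpload = async (rows) => {
  const pass = new PassThrough();
  const [, uploaded] = await Promise.all([
    pipeline(Readable.from(rows), toNdjson(), pass),
    fakeUpload(pass),
  ]);
  return uploaded;
};
```

Calling `streamRowsToUpload([{ id: 1 }, { id: 2 }])` resolves to the string `'{"id":1}\n{"id":2}\n'`. Using `pipeline` rather than chained `.pipe()` calls means an error in any stage destroys the others and rejects the returned promise.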