Created
November 28, 2021 15:27
-
-
Save mshakhomirov/5b935158985446b9b9bbd56edeff29eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
else if (output === 's3Stream') { // as one file, node.js streaming, save as new line delimited JSON. | |
const uploadStream = ({ Bucket, Key }) => { | |
const pass = new stream.PassThrough(); | |
return { | |
writeStream: pass, | |
promise: s3.upload({ Bucket, Key, Body: pass }).promise(), | |
}; | |
}; | |
const { writeStream, promise } = uploadStream({ Bucket: bucket, Key: s3key }); | |
pr(`saving to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`); | |
s1.stream({ highWaterMark: BATCH_SIZE }) // stream() is just to wrap into pipeable object, and not to enable 'result' events, they are emitted anyway. | |
// eslint-disable-next-line func-names | |
.pipe(through2.obj(function(row, enc, next) { | |
this.push(`${JSON.stringify(row)}\n`); | |
next(); | |
}, | |
)) | |
.pipe(writeStream) | |
.on('close', () => { | |
console.log('upload finished'); | |
}); | |
promise.then(() => { | |
console.log('upload completed successfully'); | |
resolve(`saved to aws s3 cp s3://${bucket}/${s3key} ./tmp/${s3key}`); | |
}).catch((err) => { | |
console.log('upload failed.', err.message); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment