@AaronHarris
Last active April 10, 2021 04:29
Node's Pipeline Bug
import { pipeline } from 'stream/promises';
import { PassThrough, Transform } from 'stream';
import { createGunzip } from 'zlib';
import * as csv from 'csv-parser';
import * as S3 from 'aws-sdk/clients/s3';

const s3client = new S3({ maxRetries: 15, httpOptions: { timeout: 15 * 60_000 } });

async function main(bucketName: string, dataKey: string) {
  const s3Stream = s3client.getObject({ Bucket: bucketName, Key: dataKey }).createReadStream()
    .on('error', err => { console.error('S3 read error:', err); }); // errors from the S3 service
  const unzipStream = createGunzip()
    .on('error', err => { console.error('Unzip error:', err); }); // error unzipping a file chunk
  const csvStream = csv({}); // you get the idea
  const myTransformStream = new Transform({
    objectMode: true,
    transform(record, enc, cb) {
      // Makes the record consumable for DynamoDB
      cb(null, record);
    }
  });
  // Placeholder pass-through stage standing in for the DynamoDB writer
  // (assumption: the original gist defined this stream elsewhere).
  const dynamoDBWriteStream = new Transform({
    objectMode: true,
    transform(record, enc, cb) {
      // Write `record` to DynamoDB here, then pass it along.
      cb(null, record);
    }
  });
  try {
    await pipeline(
      s3Stream,
      unzipStream,
      csvStream,
      myTransformStream,
      dynamoDBWriteStream,
      new PassThrough({ objectMode: true })
    );
  } catch (error) {
    console.error(`[Exiting] Caught error processing pipeline for "${dataKey}":`, error);
    throw error;
  }
  console.log(`[Complete] Finished processing pipeline for "${dataKey}"`);
}

main('myBucket', 'myKey');
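The same promisified `pipeline` can be exercised without AWS at all, using in-memory streams. This is a minimal sketch (assumptions: Node >= 16 for `stream/promises`; `Readable.from` and a `PassThrough` sink stand in for the S3 source and DynamoDB writer above):

```typescript
import { pipeline } from 'stream/promises';
import { Readable, Transform, PassThrough } from 'stream';

// Runs a three-stage object-mode pipeline and collects the sink's output.
async function run(): Promise<string> {
  const source = Readable.from(['a', 'b', 'c']); // in-memory source
  const upper = new Transform({
    objectMode: true,
    transform(chunk, _enc, cb) {
      cb(null, String(chunk).toUpperCase()); // uppercase each record
    }
  });
  const sink = new PassThrough({ objectMode: true });
  const chunks: string[] = [];
  sink.on('data', (c) => chunks.push(c));
  // Resolves when the sink finishes; rejects if any stage errors.
  await pipeline(source, upper, sink);
  return chunks.join('');
}
```

If any stage calls `cb(err)` or emits `'error'`, the awaited `pipeline` promise rejects and the remaining streams are destroyed, which is what the `try`/`catch` in `main` above relies on.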