Skip to content

Instantly share code, notes, and snippets.

@johndstein
Created October 7, 2019 15:11
Show Gist options
  • Save johndstein/22e34f7bef585debf03dd609a72a2090 to your computer and use it in GitHub Desktop.
Save johndstein/22e34f7bef585debf03dd609a72a2090 to your computer and use it in GitHub Desktop.
S3 JSON Lines reader (slow)
#!/usr/bin/env node
'use strict';
const { Readable } = require('stream');
const AWS = require('aws-sdk');
// Environment overrides (both optional):
//   AWS_PROFILE     - load credentials for this named profile via
//                     AWS.SharedIniFileCredentials.
//   AWS_S3_ENDPOINT - send S3 requests to this endpoint instead of the
//                     default; path-style URLs and SigV4 signing are forced.
const { AWS_PROFILE: awsProfile, AWS_S3_ENDPOINT: s3Endpoint } = process.env; // eslint-disable-line
if (awsProfile) {
  AWS.config.credentials = new AWS.SharedIniFileCredentials({ profile: awsProfile });
}
const s3opts = Object.assign(
  { apiVersion: '2006-03-01' },
  s3Endpoint
    ? { endpoint: s3Endpoint, s3ForcePathStyle: true, signatureVersion: 'v4' }
    : {}
);
// Single module-level S3 client shared by everything in this file.
const s3 = new AWS.S3(s3opts);
const gunzipA = require('util').promisify(require('zlib').gunzip);
/**
 * Object-mode Readable stream that emits the parsed JSON value of every line
 * of a list of gzip-compressed, newline-delimited JSON objects stored in S3.
 *
 * Objects are fetched one at a time, in the order given. An object that fails
 * to download, gunzip, or parse is recorded in `this.errors` and skipped; the
 * stream carries on with the remaining keys and ends only once every key has
 * been tried.
 */
class S3JsonlReader extends Readable {
  /**
   * @param {object} [options] - Readable options plus the fields below.
   *   `objectMode` is always forced to true; the caller's object is not modified.
   * @param {string[]} [options.keys] - Keys to read, in order. Each is either a
   *   plain key inside `options.bucket` or a full `s3://bucket/key` URI.
   *   The list is copied, so the caller's array is left untouched.
   * @param {string} [options.bucket] - Bucket used for plain (non-URI) keys.
   */
  constructor(options = {}) {
    super(Object.assign({}, options, { objectMode: true }));
    // Remaining keys; shrinks as objects are fetched.
    this.keys = Array.from(options.keys || []);
    this.bucket = options.bucket;
    // getObject params of the object currently (or most recently) fetched.
    this.bucketKey = null;
    // { message, bucketKey, stack } for every object that could not be read.
    this.errors = [];
    // Parsed events from the current object that have not been pushed yet.
    this.events = [];
  }

  /**
   * Readable hook: push buffered events, fetching the next non-empty object
   * when the buffer is empty, and end the stream once all keys are consumed.
   */
  _read() {
    if (this.events.length) {
      this.pushEvents();
      return;
    }
    this.getEventsFromS3()
      .then(() => {
        if (this.events.length) {
          this.pushEvents();
        } else {
          // getEventsFromS3 only returns with an empty buffer when no keys remain.
          this.push(null);
        }
      })
      // Per-object failures are recorded inside getEventsFromS3; anything that
      // still escapes is unexpected, so fail the stream rather than leave a
      // dangling rejection.
      .catch((error) => this.destroy(error));
  }

  /**
   * Push buffered events until the buffer is empty or push() returns false
   * (back-pressure), as the Readable implementer contract asks.
   */
  pushEvents() {
    while (this.events.length) {
      if (!this.push(this.events.shift())) {
        break;
      }
    }
  }

  /**
   * Resolve a key into S3 getObject params. An `s3://bucket/path/to/key` URI
   * supplies its own bucket; any other key is paired with `Bucket`.
   * @param {string} Key - plain key or s3:// URI
   * @param {string} Bucket - bucket to use for plain keys
   * @returns {{Bucket: string, Key: string}}
   */
  parseKey(Key, Bucket) {
    if (!Key.startsWith('s3://')) {
      return { Bucket, Key };
    }
    // 's3://b/x/y' splits into ['s3:', '', 'b', 'x', 'y'].
    const [, , uriBucket, ...path] = Key.split('/');
    return { Bucket: uriBucket, Key: path.join('/') };
  }

  /**
   * Take keys from the front of `this.keys` until one yields at least one
   * event (stored in `this.events`) or the keys run out. A download, gunzip,
   * or JSON.parse failure is appended to `this.errors` and that key is
   * skipped; such failures do not reject the returned promise.
   * @returns {Promise<void>}
   */
  async getEventsFromS3() {
    while (!this.events.length && this.keys.length) {
      try {
        this.bucketKey = this.parseKey(this.keys.shift(), this.bucket);
        const object = await s3.getObject(this.bucketKey).promise();
        const body = await gunzipA(object.Body);
        this.events = body
          .toString()
          .split('\n')
          // Skip blank lines and anything 6 characters or shorter once trimmed.
          // NOTE(review): this also drops tiny valid JSON such as `{}` or `true`.
          .filter((line) => line.trim().length > 6)
          .map((line) => JSON.parse(line));
      } catch (error) {
        this.errors.push({
          message: error.message,
          bucketKey: this.bucketKey,
          stack: error.stack
        });
      }
    }
  }
}
exports = module.exports = S3JsonlReader;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment