Skip to content

Instantly share code, notes, and snippets.

@rgpower
Created October 18, 2020 01:07
Show Gist options
  • Save rgpower/674907cda6c611ba5bff8de84573459c to your computer and use it in GitHub Desktop.
Save rgpower/674907cda6c611ba5bff8de84573459c to your computer and use it in GitHub Desktop.
Firehose S3 to BigQuery Import, with NDJSON per-row processing
"use strict";
// Runtime configuration, read once from the process environment at module load.
const GCP_PROJECT = process.env["GCP_PROJECT"];
const BQ_DATASET = process.env["BQ_DATASET"];
const BQ_TABLE = process.env["BQ_TABLE"];
const S3_BUCKET = process.env["S3_BUCKET"];
const S3_PREFIX = process.env["S3_PREFIX"];
const GCP_CREDENTIALS = process.env["GCP_CREDENTIALS"];

// Fail fast at load time when any required setting is missing or empty.
// BQ_DATASET and BQ_TABLE are validated alongside the others because the
// handler passes them straight to the BigQuery client; without this check a
// missing value would only surface later, in the middle of an import.
for (const [name, value] of [
  ["GCP_PROJECT", GCP_PROJECT],
  ["BQ_DATASET", BQ_DATASET],
  ["BQ_TABLE", BQ_TABLE],
  ["S3_BUCKET", S3_BUCKET],
  ["S3_PREFIX", S3_PREFIX],
  ["GCP_CREDENTIALS", GCP_CREDENTIALS]
]) {
  if (!value) {
    throw new Error(`${name} envar not defined`);
  }
}
const { PassThrough, Transform } = require("stream");
const AWS = require("aws-sdk");
const { BigQuery } = require("@google-cloud/bigquery");
const zlib = require("zlib");
// S3 client created with no explicit options.
const S3 = new AWS.S3();

// BigQuery client options: the project id comes from GCP_PROJECT and the
// credentials object is obtained by parsing GCP_CREDENTIALS as JSON.
const bigQueryOptions = {
  projectId: GCP_PROJECT,
  credentials: JSON.parse(GCP_CREDENTIALS)
};
const BQ = new BigQuery(bigQueryOptions);
exports.handler = (event, context, callback) => {
let rec = event.Records.shift();
let bucketName = rec.s3.bucket.name;
let objectKey = rec.s3.object.key;
console.log(`bucketName: ${bucketName}`);
console.log(`objectKey: ${objectKey}`);
if (S3_BUCKET !== bucketName) return;
if (!objectKey.startsWith(S3_PREFIX)) return;
const ds = BQ.dataset(BQ_DATASET);
const projectActivityTable = ds.table(BQ_TABLE);
const dataStream = new PassThrough();
dataStream
.pipe(buildTransform())
.pipe(projectActivityTable.createWriteStream("json"))
.on("complete", job => {
const result = {
selfLink: job["metadata"]["selfLink"],
statistics: job["metadata"]["statistics"]
};
callback(null, result);
console.log(JSON.stringify(result));
})
.on("error", error => callback(error));
S3.getObject({ Bucket: bucketName, Key: objectKey })
.createReadStream()
.pipe(zlib.createUnzip())
.pipe(dataStream);
};
/**
 * Serialises one parsed record to a single-line JSON string, removing
 * `account.addons` when present.
 *
 * The `in` operator throws a TypeError on null and on primitives, so both
 * the record and its `account` value are checked to be non-null objects
 * before probing for keys. Anything else is serialised unchanged.
 *
 * Note: the `addons` key is deleted from the passed-in object itself.
 *
 * @param {*} item - A value produced by JSON.parse.
 * @returns {string} The JSON text for the (possibly trimmed) record.
 */
function formatItem(item) {
  // could return '' here to signal that the caller should drop this item
  const isObject = value => value !== null && typeof value === 'object';
  const account = isObject(item) ? item['account'] : undefined;
  if (isObject(account) && 'addons' in account) {
    delete account['addons'];
  }
  return JSON.stringify(item);
}
/**
 * Builds a Transform stream that turns a UTF-8 byte stream of
 * newline-delimited JSON into formatted rows, one per line.
 *
 * Framing is done on raw bytes: everything up to the last newline in the
 * buffered data is decoded and processed, and the bytes after it are held
 * back until more data arrives. The newline byte (0x0A) never occurs inside a
 * multi-byte UTF-8 sequence, so a character split across two chunks is always
 * decoded whole.
 *
 * Each complete line is parsed as JSON and passed to formatItem(). Blank
 * lines and rows for which formatItem() returns an empty string are dropped.
 * A line that cannot be parsed or formatted is logged and skipped, so one bad
 * line cannot corrupt the rows that follow it. A final line with no trailing
 * newline is emitted when the stream ends.
 *
 * @returns {Transform} Stream whose output is newline-terminated JSON rows.
 */
function buildTransform() {
  const NEWLINE = 0x0a;
  // Bytes received after the most recent newline (an incomplete line).
  let pending = Buffer.alloc(0);

  // Converts a block of complete lines into an output Buffer, or returns
  // undefined when no row survives.
  const formatLines = text => {
    const rows = [];
    text.split("\n").forEach(rawLine => {
      if (!rawLine.trim()) return;
      try {
        const row = formatItem(JSON.parse(rawLine));
        if (row) rows.push(row);
      } catch (err) {
        console.warn(`Skipping NDJSON line that could not be processed: ${err.message}`);
      }
    });
    if (rows.length === 0) return undefined;
    return Buffer.from(rows.join("\n") + "\n", "utf-8");
  };

  return new Transform({
    transform: (data, encoding, cb) => {
      const chunk = Buffer.isBuffer(data) ? data : Buffer.from(data, encoding);
      const buffered = pending.length > 0 ? Buffer.concat([pending, chunk]) : chunk;
      const lastNewline = buffered.lastIndexOf(NEWLINE);
      if (lastNewline === -1) {
        // No complete line yet; keep accumulating.
        pending = buffered;
        cb();
        return;
      }
      pending = buffered.subarray(lastNewline + 1);
      const payload = formatLines(buffered.toString("utf-8", 0, lastNewline));
      if (payload) {
        cb(null, payload);
      } else {
        cb();
      }
    },
    flush: cb => {
      // Emit whatever is left when the input ends without a final newline.
      const rest = pending;
      pending = Buffer.alloc(0);
      const payload = rest.length > 0 ? formatLines(rest.toString("utf-8")) : undefined;
      if (payload) {
        cb(null, payload);
      } else {
        cb();
      }
    }
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment