Created
October 18, 2020 01:07
-
-
Save rgpower/674907cda6c611ba5bff8de84573459c to your computer and use it in GitHub Desktop.
Firehose S3 to BigQuery Import, with NDJSON per-row processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const GCP_PROJECT = process.env["GCP_PROJECT"]; | |
const BQ_DATASET = process.env["BQ_DATASET"]; | |
const BQ_TABLE = process.env["BQ_TABLE"]; | |
const S3_BUCKET = process.env["S3_BUCKET"]; | |
const S3_PREFIX = process.env["S3_PREFIX"]; | |
const GCP_CREDENTIALS = process.env["GCP_CREDENTIALS"]; | |
if (!GCP_PROJECT) { | |
throw new Error("GCP_PROJECT envar not defined"); | |
} | |
if (!S3_BUCKET) { | |
throw new Error("S3_BUCKET envar not defined"); | |
} | |
if (!S3_PREFIX) { | |
throw new Error("S3_PREFIX envar not defined"); | |
} | |
if (!GCP_CREDENTIALS) { | |
throw new Error("GCP_CREDENTIALS envar not defined"); | |
} | |
const { PassThrough, Transform } = require("stream"); | |
const AWS = require("aws-sdk"); | |
const { BigQuery } = require("@google-cloud/bigquery"); | |
const zlib = require("zlib"); | |
const S3 = new AWS.S3(); | |
const BQ = new BigQuery({ | |
projectId: GCP_PROJECT, | |
credentials: JSON.parse(GCP_CREDENTIALS) | |
}); | |
exports.handler = (event, context, callback) => { | |
let rec = event.Records.shift(); | |
let bucketName = rec.s3.bucket.name; | |
let objectKey = rec.s3.object.key; | |
console.log(`bucketName: ${bucketName}`); | |
console.log(`objectKey: ${objectKey}`); | |
if (S3_BUCKET !== bucketName) return; | |
if (!objectKey.startsWith(S3_PREFIX)) return; | |
const ds = BQ.dataset(BQ_DATASET); | |
const projectActivityTable = ds.table(BQ_TABLE); | |
const dataStream = new PassThrough(); | |
dataStream | |
.pipe(buildTransform()) | |
.pipe(projectActivityTable.createWriteStream("json")) | |
.on("complete", job => { | |
const result = { | |
selfLink: job["metadata"]["selfLink"], | |
statistics: job["metadata"]["statistics"] | |
}; | |
callback(null, result); | |
console.log(JSON.stringify(result)); | |
}) | |
.on("error", error => callback(error)); | |
S3.getObject({ Bucket: bucketName, Key: objectKey }) | |
.createReadStream() | |
.pipe(zlib.createUnzip()) | |
.pipe(dataStream); | |
}; | |
/**
 * Serialize one parsed NDJSON row for the BigQuery load stream, first
 * stripping the bulky `account.addons` field (mutates the caller's
 * object, which is discarded after this call).
 *
 * @param {object} item - one parsed row.
 * @returns {string} the row as a JSON string; returning '' would cause
 *   the row to be filtered out upstream.
 */
function formatItem(item) {
  const hasAddons = 'account' in item && 'addons' in item['account'];
  if (hasAddons) {
    delete item['account']['addons'];
  }
  return JSON.stringify(item);
}
/**
 * Build a Transform stream that re-chunks arbitrary byte chunks into
 * newline-delimited JSON rows, passing each complete row through
 * formatItem() and dropping rows that fail to parse.
 *
 * Fixes over the naive split-and-parse approach:
 *  - only the final (possibly partial) segment of each chunk is carried
 *    over to the next chunk, so a malformed middle row can no longer
 *    corrupt the carry buffer, and a partial row that happens to parse
 *    (e.g. a number split mid-chunk) is never emitted early;
 *  - a flush() handler emits a trailing row that lacks a terminating
 *    newline instead of silently losing it.
 *
 * @returns {Transform} the NDJSON re-chunking stream.
 */
function buildTransform() {
  let line = '';
  return new Transform({
    transform(data, encoding, cb) {
      const str = line + data.toString('utf-8');
      const segs = str.split("\n");
      // The last segment is an incomplete line (or '' when the chunk
      // ended on a newline); buffer it for the next chunk or flush.
      line = segs.pop();
      const parsed = [];
      segs.forEach(seg => {
        if (seg === '') {
          return; // blank line between rows — nothing to parse
        }
        try {
          parsed.push(formatItem(JSON.parse(seg)));
        } catch (err) {
          // A complete but malformed row: drop it, but leave a trace.
          console.error(`dropping malformed NDJSON row: ${err.message}`);
        }
      });
      const payload = parsed.join("\n");
      if (payload) {
        cb(null, Buffer.from(payload + "\n", 'utf-8'));
      } else {
        cb();
      }
    },
    flush(cb) {
      // Emit a final row that had no terminating newline.
      if (line === '') {
        cb();
        return;
      }
      try {
        const row = formatItem(JSON.parse(line));
        cb(null, Buffer.from(row + "\n", 'utf-8'));
      } catch (err) {
        console.error(`dropping trailing partial row: ${err.message}`);
        cb();
      }
    }
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment