@sascha
Created July 16, 2024 14:30
Cloud Function to load Stripe Data Pipeline Parquet files from Google Cloud Storage into Google BigQuery
const functions = require('@google-cloud/functions-framework');
const { Storage } = require('@google-cloud/storage');
const { BigQuery } = require('@google-cloud/bigquery');
const path = require('path');
const PROJECT_ID = 'YOUR_GOOGLE_PROJECT_ID';
const BUCKET_NAME = 'YOUR_GCS_BUCKET_FOR_STRIPE';
const DATASET_LIVE = 'YOUR_BIGQUERY_DATASET_FOR_LIVE_DATA';
const DATASET_TEST = 'YOUR_BIGQUERY_DATASET_FOR_TEST_DATA';
const storage = new Storage({projectId: PROJECT_ID});
const bigquery = new BigQuery({projectId: PROJECT_ID});
// Returns the name of the latest live or test data directory
async function getLatestFolder(mode) {
  const [files] = await storage.bucket(BUCKET_NAME).getFiles();

  const folders = files
    // Only keep the folders for the specified mode
    .filter((file) => file.name.includes(mode))
    // Only keep folders that are done syncing
    .filter((file) => file.name.includes("coreapi_SUCCESS"))
    // Normalize the paths
    .map((file) => path.normalize(file.name))
    // Get the folder names only
    .map((filePath) => path.dirname(filePath))
    // Get the first part of the folder name (the date part)
    .map((dir) => dir.split("/")[0])
    // Sort the folders in descending order, newest first
    .sort((a, b) => b.localeCompare(a));

  return folders.length > 0 ? [folders[0], mode].join("/") : null;
}
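// For illustration: with the object layout the code above assumes
// (e.g. "2024-07-15/livemode/charges/part-0.parquet" plus a
// "2024-07-15/livemode/coreapi_SUCCESS" marker written once the sync
// completes), getLatestFolder("livemode") would resolve to
// "2024-07-15/livemode". The example object names are hypothetical.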
// Loads the Parquet files from the given folder into a BigQuery dataset
async function loadParquetToBigQuery(folder, datasetId) {
  console.log(`Loading data from ${folder} into ${datasetId}`);

  const [files] = await storage
    .bucket(BUCKET_NAME)
    .getFiles({ prefix: folder });

  const tableFiles = files
    // Ignore the 'coreapi_SUCCESS' marker file
    .filter((file) => !file.name.includes("coreapi_SUCCESS"))
    // Group the files by table name (the third path segment)
    .reduce((acc, file) => {
      const tableName = file.name.split("/")[2];
      if (!acc[tableName]) {
        acc[tableName] = [];
      }
      acc[tableName].push(storage.bucket(BUCKET_NAME).file(file.name));
      return acc;
    }, {});

  let errors = [];

  for (const [tableName, fileList] of Object.entries(tableFiles)) {
    const metadata = {
      sourceFormat: "PARQUET",
      location: "EU",
      writeDisposition: "WRITE_TRUNCATE",
    };

    const responses = await bigquery
      .dataset(datasetId)
      .table(tableName)
      .load(fileList, metadata);

    // Check this table's job status for errors; track them per table so
    // one failed table doesn't mark every following table as failed
    let tableErrors = [];
    responses.forEach((response) => {
      if (response.status?.errors)
        tableErrors = tableErrors.concat(response.status.errors);
    });
    errors = errors.concat(tableErrors);

    if (tableErrors.length > 0) {
      console.error(
        `Failed to load ${fileList.length} files into ${datasetId}.${tableName}`
      );
      console.error(tableErrors);
    } else {
      console.log(
        `Loaded ${fileList.length} files into ${datasetId}.${tableName}`
      );
    }
  }

  return errors;
}
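// For illustration: with the layout above, tableFiles groups the Parquet
// shards by table name, e.g. { charges: [File, File], customers: [File] },
// so each BigQuery table receives all of its shards in a single load job
// that replaces the previous contents (WRITE_TRUNCATE).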
// Entry point, runs when the cloud event is triggered
functions.cloudEvent('loadStripeData', async (cloudEvent) => {
  const latestLiveFolder = await getLatestFolder("livemode");
  const latestTestFolder = await getLatestFolder("testmode");

  if (latestLiveFolder) {
    const errors = await loadParquetToBigQuery(latestLiveFolder, DATASET_LIVE);
    if (errors.length === 0) {
      // Remove the loaded files from Google Cloud Storage
      await storage
        .bucket(BUCKET_NAME)
        .deleteFiles({ prefix: latestLiveFolder });
      console.log("Deleted live data files from Google Cloud Storage");
    }
  } else {
    console.log("No new live data found");
  }

  if (latestTestFolder) {
    const errors = await loadParquetToBigQuery(latestTestFolder, DATASET_TEST);
    if (errors.length === 0) {
      // Remove the loaded files from Google Cloud Storage
      await storage
        .bucket(BUCKET_NAME)
        .deleteFiles({ prefix: latestTestFolder });
      console.log("Deleted test data files from Google Cloud Storage");
    }
  } else {
    console.log("No new test data found");
  }
});
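Since the handler is registered as a CloudEvent function, it can be wired to, for example, a Pub/Sub-based Cloud Scheduler trigger. For a local smoke test, a minimal sketch along these lines should work, assuming the code above is saved as index.js and @google-cloud/functions-framework (which ships testing helpers) is installed; note that invoking it runs against the real bucket and datasets. The file name smoke-test.js is hypothetical.

// smoke-test.js (hypothetical): invoke the handler once, locally.
require('./index.js'); // registers the 'loadStripeData' handler
const { getFunction } = require('@google-cloud/functions-framework/testing');

const loadStripeData = getFunction('loadStripeData');
// The handler never reads the event payload, so a stub CloudEvent is enough.
loadStripeData({ id: 'local-test', type: 'local.invoke' })
  .then(() => console.log('Smoke test finished'))
  .catch((err) => console.error('Smoke test failed', err));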