Created
July 16, 2024 14:30
-
-
Save sascha/36ea12cb1282b7ca7ca53babb4192d83 to your computer and use it in GitHub Desktop.
Cloud Function to load Stripe Data Pipeline Parquet files from Google Cloud Storage into Google BigQuery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Google Cloud client libraries plus Node's built-in path utilities.
const functions = require('@google-cloud/functions-framework');
const { Storage } = require('@google-cloud/storage');
const { BigQuery } = require('@google-cloud/bigquery');
const path = require('path');

// Deployment-specific configuration — replace the placeholders before deploying.
const PROJECT_ID = 'YOUR_GOOGLE_PROJECT_ID';
const BUCKET_NAME = 'YOUR_GCS_BUCKET_FOR_STRIPE';
const DATASET_LIVE = 'YOUR_BIGQUERY_DATASET_FOR_LIVE_DATA';
const DATASET_TEST = 'YOUR_BIGQUERY_DATASET_FOR_TEST_DATA';

// Shared API clients, created once per function instance and reused
// across invocations.
const storage = new Storage({projectId: PROJECT_ID});
const bigquery = new BigQuery({projectId: PROJECT_ID});
/**
 * Returns the "<date>/<mode>" prefix of the most recent completed Stripe
 * export for the given mode, or null when no completed export exists.
 *
 * An export folder only counts once Stripe has written its
 * "coreapi_SUCCESS" marker file, i.e. the sync is finished.
 *
 * @param {string} mode - "livemode" or "testmode".
 * @returns {Promise<string|null>} Latest folder prefix, or null.
 */
async function getLatestFolder(mode) {
  const [files] = await storage.bucket(BUCKET_NAME).getFiles();

  // Single pass: track the lexicographically greatest date segment among
  // finished exports of the requested mode (date prefixes sort naturally).
  let latest = null;
  for (const file of files) {
    if (!file.name.includes(mode)) continue; // wrong mode
    if (!file.name.includes("coreapi_SUCCESS")) continue; // still syncing
    const datePart = path.dirname(path.normalize(file.name)).split("/")[0];
    if (latest === null || datePart.localeCompare(latest) > 0) {
      latest = datePart;
    }
  }

  return latest === null ? null : `${latest}/${mode}`;
}
/**
 * Loads every Parquet file found under `folder` in the Stripe bucket into
 * the BigQuery dataset `datasetId`, issuing one load job per table
 * sub-folder. Expected object layout: "<date>/<mode>/<tableName>/<part>".
 *
 * @param {string} folder - GCS prefix ("<date>/<mode>") to load from.
 * @param {string} datasetId - Target BigQuery dataset.
 * @returns {Promise<Array>} All BigQuery job errors accumulated across
 *   every table; empty when every load succeeded.
 */
async function loadParquetToBigQuery(folder, datasetId) {
  console.log(`Loading data from ${folder} into ${datasetId}`);

  const [files] = await storage
    .bucket(BUCKET_NAME)
    .getFiles({ prefix: folder });

  // Group File objects by table name (third path segment), skipping the
  // coreapi_SUCCESS sync-marker file.
  const tableFiles = files
    .filter((file) => !file.name.includes("coreapi_SUCCESS"))
    .reduce((acc, file) => {
      const tableName = file.name.split("/")[2];
      if (!acc[tableName]) {
        acc[tableName] = [];
      }
      acc[tableName].push(storage.bucket(BUCKET_NAME).file(file.name));
      return acc;
    }, {});

  let errors = [];
  for (const [tableName, fileList] of Object.entries(tableFiles)) {
    const metadata = {
      sourceFormat: "PARQUET",
      location: "EU",
      // Each sync replaces the table's previous contents entirely.
      writeDisposition: "WRITE_TRUNCATE",
    };

    const responses = await bigquery
      .dataset(datasetId)
      .table(tableName)
      .load(fileList, metadata);

    // BUG FIX: collect this table's errors separately. The original folded
    // everything into the cumulative `errors` before checking, so once any
    // table failed, every subsequent table was logged as failed too.
    let tableErrors = [];
    responses.forEach((response) => {
      if (response.status?.errors) {
        tableErrors = tableErrors.concat(response.status.errors);
      }
    });

    if (tableErrors.length > 0) {
      console.error(
        `Failed to load ${fileList.length} files into ${datasetId}.${tableName}`
      );
      console.error(tableErrors);
      errors = errors.concat(tableErrors);
    } else {
      console.log(
        `Loaded ${fileList.length} files into ${datasetId}.${tableName}`
      );
    }
  }

  return errors;
}
/**
 * Processes one mode end-to-end: find the newest completed export, load it
 * into BigQuery, and delete the source files only when every load
 * succeeded (keeping them in place allows a retry after a failure).
 *
 * Extracted to remove the duplicated live/test branches of the original
 * handler; also uses strict equality.
 *
 * @param {string} mode - "livemode" or "testmode".
 * @param {string} datasetId - Target BigQuery dataset for this mode.
 * @param {string} label - Human-readable mode name used in log messages.
 */
async function syncMode(mode, datasetId, label) {
  const latestFolder = await getLatestFolder(mode);
  if (!latestFolder) {
    console.log(`No new ${label} data found`);
    return;
  }
  const errors = await loadParquetToBigQuery(latestFolder, datasetId);
  if (errors.length === 0) {
    // Remove files from Google Cloud Storage
    await storage
      .bucket(BUCKET_NAME)
      .deleteFiles({ prefix: latestFolder });
    console.log("Deleted files from Google Cloud Storage");
  }
}

// Entry point: run when the cloud event is triggered.
functions.cloudEvent('loadStripeData', async (cloudEvent) => {
  await syncMode("livemode", DATASET_LIVE, "live");
  await syncMode("testmode", DATASET_TEST, "test");
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment