Skip to content

Instantly share code, notes, and snippets.

@cmaggiulli
Last active December 2, 2023 19:52
Show Gist options
  • Save cmaggiulli/37b052682d864e7faf4f4dfbe9459b50 to your computer and use it in GitHub Desktop.
A Groovy script that dumps all Google BigQuery data to JSON files on the local filesystem of the machine running the process
/**
* Export every table in every BigQuery dataset to Google Cloud Storage as
* newline-delimited JSON, then download each exported object to a local
* directory. Tables that fail to export are logged and skipped.
*
* @param credentialsPath The path to the JSON key file for Google Cloud authentication.
* @param gcsBucket The Google Cloud Storage bucket name.
* @param projectId The Google BigQuery project id.
* @param destinationPath The local directory path for the resulting JSON files
*                        (expected to end with a path separator).
*
* @author Chris Maggiulli (cmaggiulli@gmail.com)
*/
def exportAndDownloadFromBigQueryToGCS(credentialsPath, gcsBucket, projectId, destinationPath) {
    // Capture the start time locally. The original referenced a script-level
    // `startTime` declared with `def`, which is a local of the script's run()
    // method and is NOT visible inside this method (MissingPropertyException).
    def startTime = System.currentTimeMillis()
    println("Script started at: " + new Date())
    // Load service-account credentials from the JSON key file.
    GoogleCredentials credentials
    File credentialsFile = new File(credentialsPath)
    try (FileInputStream serviceAccountStream = new FileInputStream(credentialsFile)) {
        credentials = ServiceAccountCredentials.fromStream(serviceAccountStream)
    }
    // Instantiate the BigQuery client.
    BigQuery bigquery = BigQueryOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    // Instantiate the Cloud Storage client.
    Storage storage = StorageOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    def format = "NEWLINE_DELIMITED_JSON"
    // Iterate through datasets in BigQuery.
    for (Dataset dataset : bigquery.listDatasets().iterateAll()) {
        println("Processing Dataset: " + dataset.getDatasetId().getDataset())
        // Iterate through tables in the dataset.
        def tables = dataset.list()
        for (Table table : tables.iterateAll()) {
            try {
                def tableName = table.getTableId().getTable()
                println("Processing Table: " + tableName)
                // Destination object in GCS for this table's export.
                def gcsUrl = "gs://" + gcsBucket + "/" + tableName + ".json"
                // Export to GCS. Declare locals with `def` so they do not leak
                // into the script binding.
                def job = table.extract(format, gcsUrl)
                try {
                    def completedJob = job.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
                            RetryOption.totalTimeout(Duration.ofMinutes(3)))
                    // Check export status. Note: waitFor returns null when the
                    // job no longer exists / timed out, so guard before calling
                    // getStatus() (the original NPE'd in that branch).
                    if (completedJob != null && completedJob.getStatus().getError() == null) {
                        println("Table " + tableName + " successfully exported to GCS")
                    } else {
                        def error = (completedJob == null) ? "job no longer exists (timed out?)"
                                                          : completedJob.getStatus().getError()
                        println("Table " + tableName + " failed to export to GCS with error: " + error)
                        continue // Skip the download if export fails
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt() // restore the interrupt flag
                    println("Error waiting for job completion: " + e)
                    continue // Skip the download if export fails
                }
                // Download from GCS.
                def objectName = table.getTableId().getTable() + ".json"
                println("Downloading " + objectName + " from GCS to local directory")
                // Retrieve blob from GCS.
                def blob = storage.get(BlobId.of(gcsBucket, objectName))
                def newFilePath = Paths.get(destinationPath + objectName)
                // downloadTo(Path) creates/overwrites the file itself; the
                // original Files.createFile() call threw
                // FileAlreadyExistsException whenever the script was re-run.
                blob.downloadTo(newFilePath)
                println("Finished downloading " + objectName)
            } catch (e) {
                println("Failed to process table " + table.getTableId().getTable() + ": " + e)
            }
        }
    }
    println("Script completed at: " + new Date())
    println("Script runtime: " + (System.currentTimeMillis() - startTime) + " milliseconds")
}
// Example invocation: externalized configuration for the credentials key file,
// GCS bucket, BigQuery project, and local output directory.
def config = [
        credentialsPath: "./fake-credentials.json",
        gcsBucket      : "fake-bucket-name",
        projectId      : "fakeprojectid-131113",
        destinationPath: "./",
]
def startTime = System.currentTimeMillis()
exportAndDownloadFromBigQueryToGCS(config.credentialsPath, config.gcsBucket, config.projectId, config.destinationPath)
// Jenkins declarative pipeline that drives the BigQuery export/download
// function with externalized configuration.
pipeline {
    agent any
    // Configuration passed to the export function; override per environment.
    environment {
        CREDENTIALS_PATH = './fake-credentials.json'
        GCS_BUCKET = 'fake-bucket-name'
        PROJECT_ID = 'fakeprojectid-131113'
        DESTINATION_PATH = './'
    }
    stages {
        stage('Execute Script') {
            steps {
                script {
                    def startTime = System.currentTimeMillis()
                    echo "Script started at: ${new Date(startTime)}"
                    // Calling the externalized function
                    exportAndDownloadFromBigQueryToGCS(CREDENTIALS_PATH, GCS_BUCKET, PROJECT_ID, DESTINATION_PATH)
                    // Fix: the original echoed currentBuild.startTimeInMillis
                    // (the build START time) as the completion timestamp.
                    echo "Script completed at: ${new Date()}"
                    echo "Script runtime: ${System.currentTimeMillis() - startTime} milliseconds"
                }
            }
        }
    }
}
/**
 * Jenkinsfile variant: export every table in every BigQuery dataset to GCS as
 * newline-delimited JSON, then download each exported object to a local
 * directory. Tables that fail to export are logged and skipped.
 *
 * @param credentialsPath The path to the JSON key file for Google Cloud authentication.
 * @param gcsBucket The Google Cloud Storage bucket name.
 * @param projectId The Google BigQuery project id.
 * @param destinationPath The local directory path for the resulting JSON files.
 */
def exportAndDownloadFromBigQueryToGCS(credentialsPath, gcsBucket, projectId, destinationPath) {
    // Load service-account credentials from the JSON key file.
    GoogleCredentials credentials
    File credentialsFile = new File(credentialsPath)
    try (FileInputStream serviceAccountStream = new FileInputStream(credentialsFile)) {
        credentials = ServiceAccountCredentials.fromStream(serviceAccountStream)
    }
    // Instantiate the BigQuery client.
    BigQuery bigquery = BigQueryOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    // Instantiate the Cloud Storage client.
    Storage storage = StorageOptions.newBuilder()
            .setCredentials(credentials)
            .setProjectId(projectId)
            .build()
            .getService()
    def format = "NEWLINE_DELIMITED_JSON"
    for (Dataset dataset : bigquery.listDatasets().iterateAll()) {
        echo "Processing Dataset: ${dataset.getDatasetId().getDataset()}"
        def tables = dataset.list()
        for (Table table : tables.iterateAll()) {
            try {
                def tableName = table.getTableId().getTable()
                echo "Processing Table: ${tableName}"
                def gcsUrl = "gs://${gcsBucket}/${tableName}.json"
                // Declare locals with `def` so they don't leak into the binding.
                def job = table.extract(format, gcsUrl)
                try {
                    def completedJob = job.waitFor(RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
                            RetryOption.totalTimeout(Duration.ofMinutes(3)))
                    // waitFor returns null when the job no longer exists /
                    // timed out; guard before calling getStatus() (the
                    // original NPE'd in that branch).
                    if (completedJob != null && completedJob.getStatus().getError() == null) {
                        echo "Table ${tableName} successfully exported to GCS"
                    } else {
                        def error = (completedJob == null) ? "job no longer exists (timed out?)"
                                                          : completedJob.getStatus().getError()
                        echo "Table ${tableName} failed to export to GCS with error: ${error}"
                        continue
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt() // restore the interrupt flag
                    echo "Error waiting for job completion: ${e}"
                    continue
                }
                def objectName = "${tableName}.json"
                echo "Downloading ${objectName} from GCS to local directory"
                def blob = storage.get(BlobId.of(gcsBucket, objectName))
                def newFilePath = Paths.get("${destinationPath}/${objectName}")
                // downloadTo(Path) creates/overwrites the file itself; the
                // original Files.createFile() threw FileAlreadyExistsException
                // whenever the pipeline was re-run against the same directory.
                blob.downloadTo(newFilePath)
                echo "Finished downloading ${objectName}"
            } catch (e) {
                echo "Failed to process table ${table.getTableId().getTable()}: ${e}"
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment