Skip to content

Instantly share code, notes, and snippets.

@BroaderImpact
Created February 18, 2023 21:33
Show Gist options
  • Save BroaderImpact/bbc394a33015d86ae337426f63cbbaba to your computer and use it in GitHub Desktop.
Save BroaderImpact/bbc394a33015d86ae337426f63cbbaba to your computer and use it in GitHub Desktop.
GCP Data Pipeline
# Import the required libraries
import subprocess
import os
# Set the GCS bucket and BigQuery dataset and table names
BUCKET_NAME = "my-bucket"
DATASET_NAME = "my-dataset"
TABLE_NAME = "my-table"
# Set the names for local files
LOCAL_INPUT_FILE = "data.csv"
LOCAL_OUTPUT_FILE = "output.csv"
# Set the GCS object and BigQuery table names
GCS_OBJECT_NAME = "data.csv"
BQ_TABLE_NAME = "{}.{}".format(DATASET_NAME, TABLE_NAME)
# Set the BigQuery SQL query to extract insights from the data
QUERY = """
SELECT column1, column2, COUNT(*)
FROM `my-dataset.my-table`
GROUP BY column1, column2
"""
# Upload the local file to GCS
subprocess.check_call(["gsutil", "cp", LOCAL_INPUT_FILE, "gs://{}/{}".format(BUCKET_NAME, GCS_OBJECT_NAME)])
# Load the data from GCS into a BigQuery table
subprocess.check_call(["bq", "load", "--autodetect", "--source_format=CSV", BQ_TABLE_NAME, "gs://{}/{}".format(BUCKET_NAME, GCS_OBJECT_NAME)])
# Query the BigQuery table to extract insights from the data
result = subprocess.check_output(["bq", "query", "--format=csv", "--headless", "--use_legacy_sql=false", QUERY])
# Write the query results to a local file
with open(LOCAL_OUTPUT_FILE, "w") as f:
f.write(result.decode("utf-8"))
# Download the output file from GCS to the local file system
subprocess.check_call(["gsutil", "cp", "gs://{}/output.csv".format(BUCKET_NAME), LOCAL_OUTPUT_FILE])
# Clean up the GCS bucket and BigQuery table
subprocess.check_call(["gsutil", "rm", "gs://{}/{}".format(BUCKET_NAME, GCS_OBJECT_NAME)])
subprocess.check_call(["bq", "rm", "--force", BQ_TABLE_NAME])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment