Created February 18, 2023 21:33
Save BroaderImpact/bbc394a33015d86ae337426f63cbbaba to your computer and use it in GitHub Desktop.
GCP Data Pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# GCS -> BigQuery pipeline driven through the `gsutil` and `bq` command-line
# tools: upload a local CSV to Cloud Storage, load it into a scratch BigQuery
# table, run an aggregate query, save the result locally, then delete the
# staged object and the table again.

# Import the required libraries
import os
import subprocess

# Set the GCS bucket and BigQuery dataset and table names.
# BigQuery dataset IDs may contain only letters, digits and underscores, so
# the placeholders use underscores (a hyphenated dataset ID is rejected).
BUCKET_NAME = "my-bucket"
DATASET_NAME = "my_dataset"
TABLE_NAME = "my_table"

# Set the names for local files
LOCAL_INPUT_FILE = "data.csv"
LOCAL_OUTPUT_FILE = "output.csv"

# Set the GCS object, its URI, and the fully qualified BigQuery table name
GCS_OBJECT_NAME = "data.csv"
GCS_URI = "gs://{}/{}".format(BUCKET_NAME, GCS_OBJECT_NAME)
BQ_TABLE_NAME = "{}.{}".format(DATASET_NAME, TABLE_NAME)

# `bq query` prints only 100 rows unless --max_rows is given; raise the cap so
# the saved result is not silently truncated.
MAX_RESULT_ROWS = 1000000

# Standard SQL query that extracts insights from the loaded table. The table
# reference is derived from BQ_TABLE_NAME so it always names the table that
# `bq load` populated.
QUERY = """
SELECT column1, column2, COUNT(*)
FROM `{}`
GROUP BY column1, column2
""".format(BQ_TABLE_NAME)

# gsutil / bq invocations, one argument per list element (no shell involved).
UPLOAD_CMD = ["gsutil", "cp", LOCAL_INPUT_FILE, GCS_URI]
# --replace overwrites rows left behind by an earlier failed run instead of
# appending to them, which would double-count in QUERY.
LOAD_CMD = [
    "bq", "load", "--autodetect", "--replace", "--source_format=CSV",
    BQ_TABLE_NAME, GCS_URI,
]
QUERY_CMD = [
    "bq", "query", "--format=csv", "--headless", "--use_legacy_sql=false",
    "--max_rows={}".format(MAX_RESULT_ROWS), QUERY,
]
# The table is treated as scratch space: it is dropped once the query is done.
CLEANUP_CMDS = [
    ["gsutil", "rm", GCS_URI],
    ["bq", "rm", "--force", BQ_TABLE_NAME],
]


def _cleanup(check):
    """Run every command in CLEANUP_CMDS, continuing past individual failures.

    Args:
        check: when true, raise ``subprocess.CalledProcessError`` for the first
            command that exited non-zero, after all commands have been tried.
    """
    first_failure = None
    for cmd in CLEANUP_CMDS:
        returncode = subprocess.call(cmd)
        if returncode != 0 and first_failure is None:
            first_failure = subprocess.CalledProcessError(returncode, cmd)
    if check and first_failure is not None:
        raise first_failure


def main():
    """Upload, load, query and save, then remove the GCS object and BQ table.

    The query result is written (as the raw CSV bytes printed by `bq query`)
    to LOCAL_OUTPUT_FILE. Cloud resources are cleaned up even if the load,
    query or write step fails.

    Raises:
        FileNotFoundError: if LOCAL_INPUT_FILE does not exist.
        subprocess.CalledProcessError: if a gsutil/bq command exits non-zero.
    """
    if not os.path.isfile(LOCAL_INPUT_FILE):
        raise FileNotFoundError(
            "input file not found: {}".format(LOCAL_INPUT_FILE))

    # Upload the local file to GCS; nothing needs cleaning up if this fails.
    subprocess.check_call(UPLOAD_CMD)
    try:
        # Load the data from GCS into a BigQuery table
        subprocess.check_call(LOAD_CMD)
        # Query the BigQuery table; stdout is the CSV result
        result = subprocess.check_output(QUERY_CMD)
        # Store the bytes as-is: no re-encoding or newline translation.
        with open(LOCAL_OUTPUT_FILE, "wb") as f:
            f.write(result)
    except BaseException:
        # Best-effort cleanup; the original error is the one worth reporting.
        _cleanup(check=False)
        raise
    # The result file above is final: it is not replaced by any GCS download
    # (no step of this pipeline ever uploads an output object to the bucket).
    _cleanup(check=True)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.