Skip to content

Instantly share code, notes, and snippets.

@Intelrunner
Last active February 27, 2024 20:46
Show Gist options
  • Save Intelrunner/d4eef85c6b8e49ac725bc8741e2ea01c to your computer and use it in GitHub Desktop.
Save Intelrunner/d4eef85c6b8e49ac725bc8741e2ea01c to your computer and use it in GitHub Desktop.
GCP Cloud Function that takes an Eventarc "InsertDataset" audit-log event for BigQuery and immediately changes that dataset's storage billing model to physical. Prints a simple log output.
import functions_framework
from google.cloud import bigquery
def extract_dataset_name(full_path):
    """
    Builds a 'project_id.dataset_id' reference from a BigQuery dataset resource path.

    Args:
    - full_path (str): The full resource path of the dataset,
      formatted as 'projects/{project_id}/datasets/{dataset_id}'.
      Leading and trailing slashes are ignored.

    Returns:
    - str: The dataset reference in the form '{project_id}.{dataset_id}'.

    Raises:
    - ValueError: If full_path is not a string or does not match the
      'projects/{project_id}/datasets/{dataset_id}' layout.
    """
    if not isinstance(full_path, str):
        raise ValueError(
            f"Expected a dataset resource path string, got {type(full_path).__name__}."
        )

    # Drop surrounding slashes so a trailing '/' does not yield an empty dataset id
    parts = full_path.strip('/').split('/')

    # Check the literal segment names instead of trusting positions blindly,
    # so a path for some other resource is rejected rather than misparsed
    well_formed = (
        len(parts) == 4
        and parts[0] == 'projects'
        and parts[2] == 'datasets'
        and parts[1] != ''
        and parts[3] != ''
    )
    if not well_formed:
        raise ValueError(
            "Expected 'projects/{project_id}/datasets/{dataset_id}', "
            f"got {full_path!r}."
        )

    project_name = parts[1]
    dataset_name = parts[3]
    return f"{project_name}.{dataset_name}"
def alter_dataset_storage_billing_model(dataset_name, billing_model):
    """
    Alter the storage billing model of a BigQuery dataset using the ALTER SCHEMA query.

    Parameters:
    - dataset_name (str): The full dataset name (including project ID) in the
      format `project_id.dataset_id`.
    - billing_model (str): The desired storage billing model, either 'LOGICAL'
      or 'PHYSICAL' (case-insensitive; it is upper-cased before use).

    Example usage:
    alter_dataset_storage_billing_model('your-project-id.your-dataset-name', 'PHYSICAL')

    Raises:
    - ValueError: If billing_model is not 'LOGICAL' or 'PHYSICAL', or if
      dataset_name is empty or contains a backtick or backslash.
    - Any error raised by the BigQuery client while running the query
      propagates to the caller unchanged.
    """
    # Both values are interpolated into DDL text (identifiers and option
    # values here are not passed as query parameters), so validate them
    # before building the statement.
    allowed_models = ('LOGICAL', 'PHYSICAL')
    normalized_model = str(billing_model).upper()
    if normalized_model not in allowed_models:
        raise ValueError(
            f"Unsupported storage billing model {billing_model!r}; "
            f"expected one of {allowed_models}."
        )

    # A backtick or backslash could terminate or escape the quoted identifier
    if (
        not isinstance(dataset_name, str)
        or not dataset_name
        or '`' in dataset_name
        or '\\' in dataset_name
    ):
        raise ValueError(f"Invalid dataset name {dataset_name!r}.")

    # Initialize the BigQuery client
    client = bigquery.Client()

    # Construct the ALTER SCHEMA query
    query = f"""
    ALTER SCHEMA `{dataset_name}`
    SET OPTIONS(
    storage_billing_model = '{normalized_model}'
    );
    """

    # Execute the query and block until the job finishes so failures surface here
    query_job = client.query(query)
    query_job.result()

    print(f"Dataset '{dataset_name}' updated to use the '{normalized_model}' billing model. ✨")
# CloudEvent function to be triggered by an Eventarc Cloud Audit Logging trigger
# Note: this is NOT designed for second-party (Cloud Audit Logs -> Pub/Sub) triggers!
@functions_framework.cloud_event
def hello_auditlog(cloudevent):
    """
    Switches the dataset named in an audit-log CloudEvent to PHYSICAL storage billing.

    Logs the CloudEvent's type and subject, then the method name, resource name
    and principal from the protoPayload (the Cloud Audit Logging entry). The
    protoPayload's resourceName is converted to a `project_id.dataset_id`
    reference and passed to alter_dataset_storage_billing_model.

    Parameters:
    - cloudevent: The CloudEvent object containing the event data.

    Returns:
    - None. If the event has no protoPayload or no resourceName, a message is
      printed and no dataset is altered.
    """
    print(f"Event type: {cloudevent['type']}")
    if 'subject' in cloudevent:
        print(f"Subject: {cloudevent['subject']}")

    # Without an audit-log payload there is no dataset to act on
    payload = (cloudevent.data or {}).get("protoPayload")
    if not payload:
        print("No protoPayload found in event; skipping billing model update.")
        return None

    resource_name = payload.get('resourceName')
    print(f"API method: {payload.get('methodName')}")
    print(f"Resource name: {resource_name}")
    print(f"Principal: {payload.get('authenticationInfo', dict()).get('principalEmail')}")

    if not resource_name:
        print("No resourceName found in protoPayload; skipping billing model update.")
        return None

    dataset_ref = extract_dataset_name(resource_name)
    # Finalizes the change of the dataset to Physical
    return alter_dataset_storage_billing_model(dataset_ref, 'PHYSICAL')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment