Created
March 29, 2024 13:32
-
-
Save jster1357/8f90a129f337fb1d014a8bf60f87dafb to your computer and use it in GitHub Desktop.
Terraform requires BQ table details to be in JSON format. This code extracts the schema, partitioning, and clustering details in JSON format so it can be used in Terraform BQ object creation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import google.auth.transport.requests | |
from google.oauth2 import id_token | |
from google.oauth2 import service_account | |
from google.auth.transport.requests import AuthorizedSession | |
import google.auth | |
import requests | |
import json | |
import os | |
##set variables | |
project_id='' | |
dataset='' | |
bq_endpoint='https://bigquery.googleapis.com/bigquery/v2' | |
ddl_output_path='' | |
###get access token | |
def get_token(): | |
SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] | |
SERVICE_ACCOUNT_FILE = '' | |
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES) | |
auth_req = google.auth.transport.requests.Request() | |
credentials.refresh(auth_req) | |
token = credentials.token | |
return token | |
##make api call | |
def make_request(api): | |
headers = {"Authorization": "Bearer " + get_token()} | |
bq_endpoint='https://bigquery.googleapis.com/bigquery/v2' | |
response = requests.get(api,headers=headers,verify=False) | |
response=json.loads(response.text) | |
return response | |
##format DDL | |
def get_schema(response): | |
format_schema={} | |
format_schema=response['schema']['fields'] | |
return format_schema | |
def get_partitioning(response): | |
format_partitioning={} | |
##check if timePartitioning is set | |
if response.get('timePartitioning') is not None: | |
format_partitioning=response['timePartitioning'] | |
return format_partitioning | |
##check if rangePartitioning is set | |
if response.get('rangePartitioning') is not None: | |
format_partitioning=response['rangePartitioning'] | |
return format_partitioning | |
return None | |
def get_clustering(response): | |
format_clustering={} | |
##check if clustering is set | |
if response.get('clustering') is not None: | |
format_clustering=response['clustering']['fields'] | |
return format_clustering | |
return None | |
##get table list from input dataset | |
def get_table_list(dataset): | |
resource = "/projects/" + project_id + "/datasets/" + dataset + "/tables" | |
api=str(bq_endpoint+resource) | |
table_response=make_request(api) | |
table_list=[] | |
for table in table_response['tables']: | |
t=table['tableReference']['tableId'] | |
table_list.append(t) | |
return table_list | |
def write_files(file_name,file_input,directory): | |
if file_input: | |
f = open(directory +'/' + file_name,"a") | |
json.dump(file_input,f) | |
f.close() | |
##call api to get ddl for a given table in a given dataset, write to local directory | |
def generate_table_json(table_list): | |
for table in table_list: | |
resource= "/projects/" + project_id + "/datasets/" + dataset + "/tables/" + table | |
api=str(bq_endpoint+resource) | |
##make API request | |
response=make_request(api) | |
###get schema | |
schema = get_schema(response) | |
##get partitioning | |
partitioning = get_partitioning(response) | |
##get clustering | |
clustering = get_clustering(response) | |
##make diretory for each table to hold schema, partitioning, clustering | |
table_directory = ddl_output_path + table | |
os.mkdir(table_directory,) | |
##format filenames | |
table_schema_file=table + "_schema.json" | |
partitioning_file=table + "_partitioning.json" | |
clustering_file=table + "_clustering.json" | |
###write the files | |
write_files(table_schema_file,schema,table_directory) | |
write_files(partitioning_file,partitioning,table_directory) | |
write_files(clustering_file,clustering,table_directory) | |
##call to get the ddl's. dataset is passed into the get_table_list function | |
generate_table_json(get_table_list('tpch')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment