Terraform requires BQ table details to be in JSON format. This script extracts the schema, partitioning, and clustering details for every table in a dataset and writes them out as JSON so they can be used in Terraform BQ object creation.
import google.auth.transport.requests
from google.oauth2 import service_account
import requests
import json
import os
## set variables
project_id = ''
dataset = ''
bq_endpoint = 'https://bigquery.googleapis.com/bigquery/v2'
ddl_output_path = ''
## get an access token for the service account
def get_token():
    SCOPES = ['https://www.googleapis.com/auth/cloud-platform']
    SERVICE_ACCOUNT_FILE = ''
    credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    auth_req = google.auth.transport.requests.Request()
    credentials.refresh(auth_req)
    return credentials.token
## make an authenticated API call and return the parsed JSON response
def make_request(api):
    headers = {"Authorization": "Bearer " + get_token()}
    response = requests.get(api, headers=headers)
    return response.json()
## extract the schema field list from the tables.get response
def get_schema(response):
    return response['schema']['fields']
def get_partitioning(response):
    ## a table has either time-based or range partitioning set, or neither
    if response.get('timePartitioning') is not None:
        return response['timePartitioning']
    if response.get('rangePartitioning') is not None:
        return response['rangePartitioning']
    return None
def get_clustering(response):
    ## clustering is optional; return the clustered field list if set
    if response.get('clustering') is not None:
        return response['clustering']['fields']
    return None
## get the table list for the input dataset
## note: tables.list is paginated, so very large datasets would need pageToken handling
def get_table_list(dataset):
    resource = "/projects/" + project_id + "/datasets/" + dataset + "/tables"
    api = bq_endpoint + resource
    table_response = make_request(api)
    table_list = []
    for table in table_response['tables']:
        table_list.append(table['tableReference']['tableId'])
    return table_list
## write the JSON payload to <directory>/<file_name>; empty payloads are skipped
def write_files(file_name, file_input, directory):
    if file_input:
        with open(os.path.join(directory, file_name), "w") as f:
            json.dump(file_input, f)
## call the API to get the DDL details for each table in a dataset, then write them to the local directory
def generate_table_json(dataset, table_list):
    for table in table_list:
        resource = "/projects/" + project_id + "/datasets/" + dataset + "/tables/" + table
        api = bq_endpoint + resource
        ## make the API request
        response = make_request(api)
        ## pull out the schema, partitioning, and clustering details
        schema = get_schema(response)
        partitioning = get_partitioning(response)
        clustering = get_clustering(response)
        ## make a directory for each table to hold its schema, partitioning, and clustering files
        table_directory = os.path.join(ddl_output_path, table)
        os.makedirs(table_directory, exist_ok=True)
        ## write one file per attribute; unset attributes are skipped
        write_files(table + "_schema.json", schema, table_directory)
        write_files(table + "_partitioning.json", partitioning, table_directory)
        write_files(table + "_clustering.json", clustering, table_directory)

## generate the JSON files; the dataset name is passed to both functions
generate_table_json('tpch', get_table_list('tpch'))
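On the Terraform side, the emitted files can be wired into a google_bigquery_table resource. The schema file is already the JSON field array the provider's schema argument expects; the partitioning file carries the API's camelCase keys (type, field, expirationMs), so those values have to be mapped onto Terraform's snake_case arguments. A minimal sketch, assuming a table named orders in the tpch dataset and ddl_output_path pointing at a ddl/ directory inside the module (the resource and local names are illustrative):

locals {
  orders_partitioning = jsondecode(file("${path.module}/ddl/orders/orders_partitioning.json"))
}

resource "google_bigquery_table" "orders" {
  dataset_id = "tpch"
  table_id   = "orders"

  # the schema file already holds the JSON field array the provider expects
  schema = file("${path.module}/ddl/orders/orders_schema.json")

  # map the API's camelCase timePartitioning keys onto snake_case arguments
  time_partitioning {
    type  = local.orders_partitioning.type
    field = try(local.orders_partitioning.field, null)
  }

  # the clustering file holds a plain JSON array of column names
  clustering = jsondecode(file("${path.module}/ddl/orders/orders_clustering.json"))
}

Tables that are not partitioned or clustered never get those files written, so the corresponding arguments can be dropped or guarded with fileexists().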