Created
August 28, 2019 22:07
-
-
Save jaketf/dc7c0e4506fec1f0048e37e643775b02 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Command line utility to create/update views on a BigQuery table.
This utility pulls the JSON schema of a given table, iterates through its
fields, filters out the blacklisted fields, and creates a string with all the
required columns for a view.
Input as command line args:
- Arg 1: projectname.datasetname.tablename (fully qualified table name)
- Arg 2: blacklisted fields: a string of comma-separated blacklisted fields
- Arg 3: bq command path
- Arg 4: view project name.view dataset.view name
Output:
This tool filters out the blacklisted fields from the source table and creates a
view with the remaining fields in the destination project and destination
dataset with the given view name.
***This tool is idempotent: it creates a new view if a view does not exist
with the given view name.
If the view exists, it drops the view and recreates it with the latest column fields.
""" | |
import json
import os
import shlex
import subprocess
import sys
def view_builder(col_str, table_fqn, view_fqn, bq_path):
    """Create or replace a BigQuery view over selected columns of a table.

    The view is defined with a single ``CREATE OR REPLACE VIEW`` statement
    executed through the ``bq query`` command, so running this again with
    the same arguments simply redefines the view.

    Parameters:
        col_str: String of comma separated column expressions to select
            in the view.
        table_fqn: Fully qualified name of the source table,
            i.e. ProjectName.DatasetName.TableName
        view_fqn: Fully qualified name of the view,
            i.e. ProjectName.DatasetName.ViewName
        bq_path: The bq command to run. It is split with shell-like
            quoting rules, so it may carry leading arguments.

    Raises:
        ValueError: if view_fqn has fewer than three dot separated parts.
        RuntimeError: if the bq command cannot be started or exits with a
            non-zero status.
    """
    view_parts = view_fqn.split('.')
    if len(view_parts) < 3:
        raise ValueError(
            "view_fqn must look like project.dataset.view, got: "
            + repr(view_fqn))
    # The first part of the view name is used as the bq project id.
    view_prj_name = view_parts[0]

    new_view_query = "CREATE OR REPLACE VIEW `{}` AS SELECT {} FROM `{}`".format(
        view_fqn, col_str, table_fqn)

    # The command is an argument list run without a shell: the SQL text
    # reaches bq as one argument verbatim, so quotes, backticks or ';' in
    # the column list or names can never be interpreted as shell syntax.
    make_view_command = shlex.split(bq_path) + [
        "query", "--use_legacy_sql=false",
        "--project_id=" + view_prj_name,
        new_view_query,
    ]

    sys.stdout.write("Creating BQ View:")
    sys.stdout.write(" ".join(make_view_command))
    try:
        subprocess.check_output(make_view_command)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.output)
    except OSError as err:
        # Without a shell a missing/unrunnable bq raises OSError; surface it
        # as the same exception type callers already get for bq failures.
        raise RuntimeError(str(err))
def pull_table_schema(src_proj, src_ds, src_table, bq_path):
    """Fetch the description of a BigQuery table as parsed JSON.

    Runs ``bq show --format=prettyjson project:dataset.table`` and decodes
    what the command prints.

    Parameters:
        src_proj: Source project name.
        src_ds: Source dataset name.
        src_table: Source table name.
        bq_path: The bq command to run. It is split with shell-like
            quoting rules, so it may carry leading arguments.

    Returns:
        The decoded JSON document printed by the bq command.

    Raises:
        RuntimeError: if the bq command cannot be started or exits with a
            non-zero status.
        ValueError: if the command output is not valid JSON.
    """
    source_table_name = src_proj + ":" + src_ds + "." + src_table
    # Argument list without a shell: the table identifier is passed to bq
    # verbatim and cannot be interpreted as shell syntax.
    show_command = shlex.split(bq_path) + [
        "show", "--format=prettyjson", source_table_name]
    try:
        schema_json = subprocess.check_output(show_command)
    except subprocess.CalledProcessError as err:
        raise RuntimeError(err.output)
    except OSError as err:
        # Without a shell a missing/unrunnable bq raises OSError; surface it
        # as the same exception type callers already get for bq failures.
        raise RuntimeError(str(err))
    # json.loads raises if bq printed anything other than a JSON document.
    return json.loads(schema_json)
def view_columns_builder(source_table_schema, blacklist_str):
    """Build the SELECT list of a view that hides blacklisted fields.

    Top level columns whose name is blacklisted are left out. For RECORD
    columns the direct sub-fields are filtered by name and the record is
    rebuilt from the remaining ones (as an ARRAY of STRUCTs when the column
    is REPEATED, as a STRUCT otherwise), wrapped in a null check so null
    records stay null. Only one level of nesting is inspected.

    Parameters:
        source_table_schema: dict with the source table description; the
            column definitions are read from ["schema"]["fields"].
        blacklist_str: A string with comma separated blacklisted field
            names. Whitespace around each name is ignored.

    Returns:
        A comma separated string of column expressions which can be used
        in view creation.
    """
    # A set gives O(1) membership tests; stripping lets "a, b" match "b".
    blacklist = {name.strip() for name in (blacklist_str or "").split(",")}

    view_columns = []
    for column in source_table_schema["schema"]["fields"]:
        parent_col = column["name"]

        if column["type"] != "RECORD":
            # Append only surviving columns, so a blacklisted column never
            # leaves a stray separator (",,") behind in the SELECT list.
            if parent_col not in blacklist:
                view_columns.append(parent_col)
            continue

        # NOTE(review): a RECORD whose own name is blacklisted is still kept
        # (only its sub-fields are filtered) -- confirm this is intended.
        kept_fields = [
            parent_col + "." + field["name"]
            for field in column.get("fields", [])
            if field["name"] not in blacklist
        ]
        if not kept_fields:
            # Every sub-field is blacklisted: an empty STRUCT/SELECT would
            # not be a usable expression, so leave the record out.
            continue
        selected = ",".join(kept_fields)

        # "mode" may be absent from a field definition; treat that as
        # NULLABLE instead of failing with KeyError.
        if column.get("mode", "NULLABLE") == "REPEATED":
            rebuilt = ("ARRAY(SELECT AS STRUCT " + selected
                       + " FROM UNNEST(" + parent_col + ") AS "
                       + parent_col + ")")
        else:
            rebuilt = "STRUCT(" + selected + ")"

        # Handle structs that are null.
        view_columns.append(
            "IF(%s is null, null, " % parent_col + rebuilt + ") as "
            + parent_col)

    return ",".join(view_columns)
def _env_or_arg(env_name, position):
    """Return environment variable env_name, else sys.argv[position], else ''."""
    value = os.environ.get(env_name)
    if value is not None:
        return value
    # Index a single argument; a slice such as sys.argv[1:] would hand back
    # a list where a string is needed.
    if len(sys.argv) > position:
        return sys.argv[position]
    return ""


def main():
    """Read the settings, build the filtered column list and create the view.

    Each setting is taken from an environment variable when it is set and
    from the positional command line argument otherwise:
        TABLE_FQN / arg 1: fully qualified source table name
        BLACKLIST_FIELDS / arg 2: comma separated blacklisted fields
        BQ_PATH / arg 3: bq command path
        VIEW_FQN / arg 4: fully qualified view name

    Exits with status 1 when a required setting is missing or when a fully
    qualified name is not made of three non-empty dot separated parts.
    """
    bq_command_path = _env_or_arg('BQ_PATH', 3)
    source_table_fqn = _env_or_arg('TABLE_FQN', 1)
    blacklist_fields_string = _env_or_arg('BLACKLIST_FIELDS', 2)
    destination_view_fqn = _env_or_arg('VIEW_FQN', 4)

    required_args = [bq_command_path, source_table_fqn, destination_view_fqn]
    if not all(required_args):
        print("required variable not set:\nbq_command_path: {}\nsource_table_fqn: {}\ndestination_view_fqn: {}".format(
            bq_command_path, source_table_fqn, destination_view_fqn))
        sys.exit(1)

    # Fail early with a readable message instead of an IndexError later on.
    for label, fqn in (("source_table_fqn", source_table_fqn),
                       ("destination_view_fqn", destination_view_fqn)):
        parts = fqn.split('.')
        if len(parts) != 3 or not all(parts):
            print("{} must look like project.dataset.name, got: {}".format(
                label, fqn))
            sys.exit(1)

    source_project_name, source_dataset_name, source_table_name = \
        source_table_fqn.split('.')

    source_schema = pull_table_schema(source_project_name, source_dataset_name,
                                      source_table_name, bq_command_path)
    view_columns = view_columns_builder(source_schema, blacklist_fields_string)
    view_builder(view_columns, source_table_fqn, destination_view_fqn,
                 bq_command_path)
# Run the tool only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment