Last active
March 5, 2021 23:02
-
-
Save jaketf/d2dcb4f81ed2fe091e75262023cea4cc to your computer and use it in GitHub Desktop.
dry_run_queries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command Line utility for dry running BigQuery queries that reference
temporary external tables over data in GCS.
"""
import concurrent.futures | |
import argparse | |
import functools | |
import json | |
import logging | |
import os | |
import pathlib | |
import pprint | |
import sys | |
from google.cloud import bigquery | |
# If you cannot read from public buckets (e.g. due to VPC-SC restriction)
# replace this with a URI from a bucket you can read that expands to some files.
DUMMY_URI = ("gs://gcp-public-data-landsat/LC08/PRE/063/046/"
             "LC80630462016136LGN00/*")
# File name (within each config dir) holding the external table definition.
EXTERNAL_TABLE_DEF_FILENAME = 'external.json'
# Glob used to locate the query file within each config dir.
SQL_GLOB = '*.sql'
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.INFO)
class DryRunsFailedException(Exception):
    """Raised when one or more dry run queries did not succeed."""
class MissingSqlFileException(Exception):
    """Exception to indicate that a config directory contains no SQL file.

    (The original docstring was copy-pasted from DryRunsFailedException
    and described the wrong condition.)
    """
    pass
def read_file(path, deserialize_json=False):
    """Read a text file, optionally parsing its contents as JSON.

    Args:
        path: path of the file to read.
        deserialize_json: if True, parse the file contents as JSON and
            return the resulting object instead of the raw text.

    Returns:
        The file contents as a string, or the deserialized JSON object.
    """
    with open(path, 'r') as handle:
        contents = handle.read()
    return json.loads(contents) if deserialize_json else contents
def dry_run_config_dir(config_dir, bq_client=None):
    """Dry run the SQL query in ``config_dir`` and return any errors.

    The directory is expected to contain one or more ``*.sql`` files (the
    lexicographically first is used) and an ``external.json`` BigQuery
    external table definition, registered as table ``temp_ext``.

    Args:
        config_dir: pathlib.Path of the config directory to dry run.
        bq_client: google.cloud.bigquery.Client used to issue the dry run.

    Returns:
        The job's errors list if the dry run reported errors, else None.

    Raises:
        MissingSqlFileException: if ``config_dir`` contains no ``*.sql`` file.
    """
    try:
        # sorted() makes the choice deterministic when several .sql files
        # exist in the directory.
        sql_path = sorted(config_dir.glob(SQL_GLOB))[0]
    except IndexError:
        raise MissingSqlFileException(
            "No SQL file found in {}/{}".format(config_dir, SQL_GLOB))
    config_path = config_dir / EXTERNAL_TABLE_DEF_FILENAME
    query_text = read_file(sql_path)
    external_config = bigquery.ExternalConfig.from_api_repr(
        read_file(config_path, deserialize_json=True))
    if (
            not external_config.source_uris
            or external_config.source_uris == ["REPLACEME"]
    ):
        # Need a source URI that expands to some files, so use a public URI.
        # Alternatively, this could be refactored to actually point at some
        # GCS prefix in the data bucket dynamically if it's known that data
        # will exist there.
        external_config.source_uris = [DUMMY_URI]
    job_config = bigquery.QueryJobConfig()
    job_config.table_definitions = {'temp_ext': external_config}
    job_config.dry_run = True
    logging.info("dry running {}...".format(config_dir))
    try:
        job = bq_client.query(query_text, job_config=job_config)
        if job.errors:
            logging.error("dry running {} failed with {}".format(
                config_dir.name, job.errors))
            return job.errors
        logging.info("dry running {} was successful".format(config_dir))
        return None
    except Exception as err:  # pylint: disable=broad-except
        # BUGFIX: Python 3 exceptions have no .message attribute, so the
        # original ``err.message`` raised AttributeError here and masked
        # the real failure. Log the exception itself instead.
        logging.error(
            "dry running {} failed with: {}".format(config_dir, err))
        # Bare raise preserves the original traceback.
        raise
def main(args):
    """Main entry point for the dry run external CLI.

    Dry runs every config directory in parallel, collecting failures, and
    raises DryRunsFailedException summarizing all of them if any occurred.

    Args:
        args: argparse.Namespace with a ``config_dirs`` list attribute
            (whose first entry is the script name — see parse_args).

    Raises:
        DryRunsFailedException: if any directory's dry run failed.
    """
    bq_client = bigquery.Client(project=os.getenv('GOOGLE_CLOUD_PROJECT'))
    dry_run_with_client = functools.partial(dry_run_config_dir,
                                            bq_client=bq_client)
    # The script name itself is a positional arg (the full sys.argv is fed
    # to parse_args), so drop the first entry.
    config_dirs = args.config_dirs[1:]
    # Dry run directories in parallel collecting errors.
    exceptions = {}
    # max workers set based on BigQuery limit of 300 concurrent jobs per user.
    # https://cloud.google.com/bigquery/quotas#all_api_requests
    with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
        future_to_dir = {
            executor.submit(dry_run_with_client, config_dir): config_dir
            for config_dir in map(pathlib.Path, config_dirs)
        }
        for future in concurrent.futures.as_completed(future_to_dir):
            config_dir = str(future_to_dir[future])
            try:
                result = future.result()
            except Exception as err:  # pylint: disable=broad-except
                # BUGFIX: record str(err) rather than err.message — Python 3
                # exceptions have no .message attribute, so the original code
                # crashed with AttributeError instead of collecting the error.
                # (Also fixed the "diable" typo in the pylint pragma.)
                exceptions[config_dir] = str(err)
            else:
                if result:
                    exceptions[config_dir] = result
    if exceptions:
        raise DryRunsFailedException(pprint.pformat(exceptions))
    logging.info("all queries dry run successfully!")
def parse_args(args):
    """Argument parser for the dry run external CLI.

    Args:
        args: list of CLI arguments (typically the full sys.argv, including
            the program name, which main() then skips).

    Returns:
        argparse.Namespace with a ``config_dirs`` list attribute.
    """
    parser = argparse.ArgumentParser(
        # BUGFIX: the original implicit string concatenation was missing a
        # space and rendered as "list ofconfig directories".
        description="utility to dry run external queries based on a list of "
                    "config directories.")
    parser.add_argument('config_dirs',
                        # BUGFIX: help text was missing "to" ("files be dry
                        # run").
                        help='list of config directories containing '
                             'external.json table definitions and *.sql '
                             'files to be dry run',
                        metavar='config_dir', type=str,
                        nargs='*')
    return parser.parse_args(args)
# Pass the WHOLE argv (including the script name) to parse_args; the script
# name lands in config_dirs[0] and main() drops it with a [1:] slice.
if __name__ == "__main__":
    main(parse_args(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment