dry_run_queries
#!/usr/bin/env python3
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Command Line utility for dry running BigQuery queries that reference
temporary external tables over data in GCS.
"""
import argparse
import concurrent.futures
import functools
import json
import logging
import os
import pathlib
import pprint
import sys

from google.cloud import bigquery
# If you cannot read from public buckets (e.g. due to VPC-SC restriction)
# replace this with a URI from a bucket you can read that expands to some files.
DUMMY_URI = ("gs://gcp-public-data-landsat/LC08/PRE/063/046/"
             "LC80630462016136LGN00/*")
EXTERNAL_TABLE_DEF_FILENAME = 'external.json'
SQL_GLOB = '*.sql'
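# For reference, a minimal external.json might look like the following
# (a hypothetical CSV example; any ExternalConfig API representation works):
#   {
#     "sourceFormat": "CSV",
#     "sourceUris": ["REPLACEME"],
#     "autodetect": true
#   }
# and a matching *.sql file would reference the temporary table by the name
# registered below, e.g. SELECT COUNT(*) FROM temp_ext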
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.INFO)


class DryRunsFailedException(Exception):
    """Exception to indicate that dry runs did not succeed."""


class MissingSqlFileException(Exception):
    """Exception to indicate that a config directory contains no SQL file."""


def read_file(path, deserialize_json=False):
    """Utility for reading a file, optionally deserializing its JSON contents."""
    with open(path, 'r') as rfile:
        if deserialize_json:
            return json.load(rfile)
        return rfile.read()


def dry_run_config_dir(config_dir, bq_client=None):
    """Dry runs the query in a config directory and returns any errors."""
    try:
        sql_path = sorted(config_dir.glob(SQL_GLOB))[0]
    except IndexError:
        raise MissingSqlFileException(
            "No SQL file found in {}/{}".format(config_dir, SQL_GLOB))
    config_path = config_dir / EXTERNAL_TABLE_DEF_FILENAME
    query_text = read_file(sql_path)
    external_config = bigquery.ExternalConfig.from_api_repr(
        read_file(config_path, deserialize_json=True))
    if (not external_config.source_uris
            or external_config.source_uris == ["REPLACEME"]):
        # Need a source URI that expands to some files, so use a public URI.
        # Alternatively, this could be refactored to point at a GCS prefix in
        # the data bucket dynamically, if it's known that data will exist
        # there.
        external_config.source_uris = [DUMMY_URI]
    job_config = bigquery.QueryJobConfig()
    # The SQL must reference the external table by this name ('temp_ext').
    job_config.table_definitions = {'temp_ext': external_config}
    # A dry run validates the query without executing it or incurring cost.
    job_config.dry_run = True
    logging.info("dry running {}...".format(config_dir))
    try:
        job = bq_client.query(query_text, job_config=job_config)
        if job.errors:
            logging.error("dry running {} failed with {}".format(
                config_dir.name, job.errors))
            return job.errors
        logging.info("dry running {} was successful".format(config_dir))
        return None
    except Exception as err:  # pylint: disable=broad-except
        logging.error("dry running {} failed with: {}".format(config_dir, err))
        raise


def main(args):
    """Main entry point for the dry run external CLI."""
    bq_client = bigquery.Client(project=os.getenv('GOOGLE_CLOUD_PROJECT'))
    dry_run_with_client = functools.partial(dry_run_config_dir,
                                            bq_client=bq_client)
    # Dry run directories in parallel, collecting errors.
    exceptions = {}
    # Max workers set based on BigQuery limit of 300 concurrent jobs per user.
    # https://cloud.google.com/bigquery/quotas#all_api_requests
    with concurrent.futures.ThreadPoolExecutor(max_workers=300) as executor:
        future_to_dir = {
            executor.submit(dry_run_with_client, config_dir): config_dir
            for config_dir in map(pathlib.Path, args.config_dirs)
        }
        for future in concurrent.futures.as_completed(future_to_dir):
            config_dir = str(future_to_dir[future])
            try:
                result = future.result()
            except Exception as err:  # pylint: disable=broad-except
                exceptions[config_dir] = str(err)
            else:
                if result:
                    exceptions[config_dir] = result
    if exceptions:
        raise DryRunsFailedException(pprint.pformat(exceptions))
    logging.info("all queries dry run successfully!")


def parse_args(args):
    """Argument parser for the dry run CLI."""
    parser = argparse.ArgumentParser(
        description="utility to dry run external queries based on a list of "
                    "config directories.")
    parser.add_argument('config_dirs',
                        help='list of config directories containing '
                             'external.json table definitions and *.sql '
                             'files to be dry run',
                        metavar='config_dir', type=str,
                        nargs='*')
    return parser.parse_args(args)


if __name__ == "__main__":
    main(parse_args(sys.argv[1:]))