Skip to content

Instantly share code, notes, and snippets.

@jimfulton
Created July 16, 2021 20:11
Show Gist options
  • Save jimfulton/d20ceba7ed1da6051e96910c12470795 to your computer and use it in GitHub Desktop.
(Grungy :)) Script to time a large test BigQuery data load.
import sys
import time
import contextlib
from pprint import pprint as pp
from google.cloud import bigquery
client = bigquery.Client()
def cq(sql, *a, **k):
    """Run *sql* through the module-level BigQuery client and pretty-print all rows.

    Extra positional/keyword arguments are forwarded to ``client.query``.
    The query result iterator is drained into a list, so the job runs to
    completion before printing.
    """
    # NOTE: indentation was restored here — the scraped source had the body
    # at column 0, which is not valid Python.
    pp(list(client.query(sql, *a, **k)))
# Target dataset/table for the timing experiment.
dataset = "riversnake"
table = "test_table_data"
# Recreate the table from scratch so each run starts with a known schema
# and no leftover rows.
cq(f"drop table if exists {dataset}.{table}")
cq(f"""CREATE TABLE {dataset}.{table}
(
col1_str STRING,
col2_str STRING,
col3_str STRING,
col4_str STRING,
col5_str STRING,
col6_str STRING,
col7_int INT64,
col8_int INT64
)
""")
# Only perform the load when a CSV path is supplied on the command line.
# (`src` is bound inside this branch, so the load logic must live here too —
# the scraped source lost the indentation that showed this.)
if sys.argv[1:]:
    # Expect exactly one argument: the path of the CSV file to load.
    [src] = sys.argv[1:]

    @contextlib.contextmanager
    def timer(message=''):
        """Print *message* followed by elapsed wall-clock seconds on exit."""
        start = time.time()
        yield
        print(message, time.time() - start)

    config = bigquery.LoadJobConfig()
    config.autodetect = False                    # schema was created explicitly above
    config.source_format = "CSV"
    config.skip_leading_rows = 1                 # skip the CSV header row
    config.write_disposition = "WRITE_TRUNCATE"  # replace any existing rows

    with open(src, 'rb') as f:
        with timer():
            job = client.load_table_from_file(
                f,
                f"{dataset}.{table}",
                job_config=config,
            )
            # Block until the load job finishes so the timer measures the
            # whole load, not just the upload request.
            job.result()
            # Fail loudly on job errors. The original used `assert`, which
            # is silently stripped under `python -O`.
            if job.errors is not None or job.error_result is not None:
                raise RuntimeError(
                    f"BigQuery load job failed: {job.error_result!r}"
                )

    # Sanity check: report how many rows actually landed in the table.
    cq(f"select count(*) from {dataset}.{table}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment