BigQuery: copy from temp table across regions
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Description
# -----------
# Example showing how to store query results in different regions
# using the BigQuery Python SDK. This example uses the BigQuery-internal
# temp tables created for each async query job.
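#
# In short: run an async query (BigQuery materializes its result in a
# hidden, BigQuery-managed temp table, reachable via job.destination),
# then start a copy job from that temp table to a permanent table in the
# target dataset, which may live in another region.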
#
# Expected Output
# ---------------
# {"job": "query dataset in us", "source_location": "US", "target_location": "US (None)"}
# {"job": "copy from us to eu", "source_location": "US", "target_location": "EU"}
# {"job": "query dataset in eu", "source_location": "EU", "target_location": "US (None)"}
# {"job": "copy from eu to us", "source_location": "EU", "target_location": "US"}
#
from __future__ import absolute_import, print_function
import time, json
from uuid import uuid4
from google.cloud import bigquery

def wait_for_job(job):
    # Poll until the job finishes, then surface any errors.
    while job.state != 'DONE':
        time.sleep(1)
        job.reload()
    if job.error_result:
        raise RuntimeError(job.errors)

def pretty_location(table_or_dataset):
    # BigQuery reports no location for some resources; label None as the
    # default US location.
    loc = table_or_dataset.location
    if loc is None:
        loc = 'US (None)'
    return loc

def jobinfo(job_name, job, source_ds_name):
    # Compare the location of the source dataset with the location of the
    # job's destination table (the BigQuery-internal temp table).
    c = bigquery.Client()
    ds = c.dataset(source_ds_name)
    t = job.destination
    ds.reload()
    t.reload()
    assert t.exists()
    info = {'job': job_name,
            'target_location': pretty_location(t),
            'source_location': pretty_location(ds)}
    return json.dumps(info, sort_keys=True)

def test_cross_region_copy():
    c = bigquery.Client()
    for src_region, trg_region in [('us', 'eu'), ('eu', 'us')]:
        # run a query and find the BigQuery-internal temp table
        src_ds = "test_dummy_{}".format(src_region)
        query = "SELECT user_id FROM [{}.user_details]".format(src_ds)
        q = c.run_async_query(str(uuid4()), query)
        q.begin()
        wait_for_job(q)
        print(jobinfo("query dataset in {}".format(src_region), q, src_ds))
        # copy the BigQuery-internal temp table to a real table
        trg_ds = "test_dummy_{}".format(trg_region)
        table_name = "user_details_{}".format(src_region)
        j = c.copy_table(str(uuid4()), c.dataset(trg_ds).table(table_name), q.destination)
        j.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
        j.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
        j.begin()
        wait_for_job(j)
        print(jobinfo("copy from {} to {}".format(src_region, trg_region), j, q.destination.dataset_name))

if __name__ == '__main__':
    test_cross_region_copy()
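
# Usage sketch (the file name and SDK pin are assumptions, not from the gist):
#
#   pip install google-cloud-bigquery==0.24.0   # pre-1.0 SDK with run_async_query
#   python bq_copy_across_regions.py
#
# Note: this script targets the pre-1.0 google-cloud-bigquery API
# (run_async_query, begin, reload); later releases replaced it with
# client.query and a reworked client.copy_table.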