Skip to content

Instantly share code, notes, and snippets.

@jaketf
Last active July 31, 2020 19:34
Show Gist options
  • Save jaketf/a243729aefa3e1f0df094ce5dc21371f to your computer and use it in GitHub Desktop.
Save jaketf/a243729aefa3e1f0df094ce5dc21371f to your computer and use it in GitHub Desktop.
(untested) monkey patch BigQuerySource to add bigquery job labels kwarg
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
def bq_source_with_job_labels():
    """Monkey patch Beam's BigQuerySource to accept BigQuery job labels.

    Call this in your pipeline definition file before using
    apache_beam.io.gcp.bigquery.BigQuerySource
    to add a `bigquery_job_labels` keyword argument to the
    constructor that takes a dict of labels to add to each
    BigQuery job submitted by the source.

    This gets around the attribute always being hard coded as an
    empty dict:
    https://github.com/apache/beam/blob/a72ec0a7dcb3ac05ef64f7fa412b68be62eca7eb/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L1077

    Note that the underlying
    apache_beam.io.gcp.bigquery_tools.BigQueryReader
    does in fact set these labels on the job here:
    https://github.com/apache/beam/blob/a72ec0a7dcb3ac05ef64f7fa412b68be62eca7eb/sdks/python/apache_beam/io/gcp/bigquery_tools.py#L1140

    The patch replaces three attributes in place:
    ``BigQueryReader.__init__``, ``BigQuerySource.__init__`` and
    ``BigQuerySource.reader``. It returns nothing.
    """
    # Imported locally so the names are bound no matter how (or whether)
    # the calling module imported Beam.
    from apache_beam.io.gcp import bigquery
    from apache_beam.io.gcp import bigquery_tools

    def add_bq_job_labels(init):
        """Decorator that sets ``self.bigquery_job_labels`` from the
        ``bigquery_job_labels`` constructor keyword argument."""
        @functools.wraps(init)
        def init_wrapper(self, *args, **kwargs):
            # Strip the extra keyword before delegating: the wrapped
            # constructor does not know about it and would raise TypeError.
            labels = kwargs.pop('bigquery_job_labels', None)
            init(self, *args, **kwargs)
            # Assign after the wrapped constructor runs so that any value it
            # hard codes is overridden. Fall back to an empty dict (never
            # None) and copy so later caller-side mutation has no effect.
            self.bigquery_job_labels = dict(labels or {})
        return init_wrapper

    def reader_with_job_labels(self, test_bigquery_client=None):
        """Replacement for ``BigQuerySource.reader`` that forwards the
        source's job labels to the ``BigQueryReader`` it creates."""
        return bigquery_tools.BigQueryReader(
            source=self,
            test_bigquery_client=test_bigquery_client,
            use_legacy_sql=self.use_legacy_sql,
            flatten_results=self.flatten_results,
            kms_key=self.kms_key,
            bigquery_job_labels=self.bigquery_job_labels)

    bigquery_tools.BigQueryReader.__init__ = add_bq_job_labels(
        bigquery_tools.BigQueryReader.__init__)
    bigquery.BigQuerySource.__init__ = add_bq_job_labels(
        bigquery.BigQuerySource.__init__)
    bigquery.BigQuerySource.reader = reader_with_job_labels
### Example Usage (probably in another module)
# from bq_job_labels_monkey_patch import bq_source_with_job_labels
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQuerySource

# Apply the patch before constructing any BigQuerySource; the
# `bigquery_job_labels` keyword is only meant to be accepted afterwards.
bq_source_with_job_labels()

pipeline = beam.Pipeline()
(pipeline
 | beam.io.Read(BigQuerySource(
     query='SELECT year, mean_temp FROM samples.weather_stations',
     # Labels to attach to each BigQuery job submitted by this source.
     bigquery_job_labels={'team': 'weather-analytics'}))
 # ... rest of your pipeline logic
)
pipeline.run().wait_until_finish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment