@ohnorobo
Created November 10, 2020 18:37
A minimal reproduction of an Apache Beam BigQuery partitioning bug: WriteToBigQuery with additional_bq_parameters creates a time-partitioned, clustered table under the DirectRunner, but not under the DataflowRunner.
# based on zdenulo's example at
# https://gist.github.com/zdenulo/99877307981b4d372df5a662d581a5df
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp import bigquery
from google.cloud import bigquery as cloud_bigquery
from google.cloud.exceptions import NotFound
TEMP_LOCATION = ''  # set
STAGING_LOCATION = ''  # set
GCP_PROJECT = ''  # set
DATASET = ''  # set and create dataset

PARTITIONED_TABLE_NAME = 'partition_test'
FAILED_PARTITIONED_TABLE_NAME = 'failed_partition_test'
PARTITIONED_BQ_TABLE = GCP_PROJECT + ':' + DATASET + '.' + PARTITIONED_TABLE_NAME
FAILED_PARTITIONED_BQ_TABLE = GCP_PROJECT + ':' + DATASET + '.' + FAILED_PARTITIONED_TABLE_NAME
def run(table_name, run_local=True):
  pipeline_options = {
      'disk_size_gb': 100,
      'save_main_session': True,
      'project': GCP_PROJECT,
      'temp_location': TEMP_LOCATION,
      'staging_location': STAGING_LOCATION
  }
  if not run_local:
    pipeline_options['runner'] = 'DataflowRunner'

  client = cloud_bigquery.Client()
  options = PipelineOptions(**pipeline_options)
  p = beam.Pipeline(options=options)

  elements = (
      p | beam.Create([
          {
              'country': 'mexico',
              'timestamp': '2019-08-23 12:34:56',
              'query': 'acapulco'
          },
          {
              'country': 'canada',
              'timestamp': '2019-08-23 12:34:33',
              'query': 'influenza'
          },
      ]))

  schema = {
      'fields': [
          {
              'name': 'country',
              'type': 'STRING',
              'mode': 'NULLABLE'
          },
          {
              'name': 'timestamp',
              'type': 'TIMESTAMP',
              'mode': 'NULLABLE'
          },
          {
              'name': 'query',
              'type': 'STRING',
              'mode': 'NULLABLE'
          },
      ]
  }

  time_partitioning = {'type': 'DAY', 'field': 'timestamp'}
  clustering = {'fields': ['country']}
  additional_bq_parameters = {
      'timePartitioning': time_partitioning,
      'clustering': clustering,
  }
  # Delete the table ahead of time if it already exists, to make sure it is
  # created by Beam rather than reused.
  try:
    fixed_table_name = table_name.replace(':', '.')
    old_table = client.get_table(fixed_table_name)
    print('table already exists')
    print(old_table)
    client.delete_table(fixed_table_name)
    print('deleted table')
  except NotFound:
    print('table did not exist')

  elements | bigquery.WriteToBigQuery(
      table=table_name,
      create_disposition=bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=bigquery.BigQueryDisposition.WRITE_EMPTY,
      schema=schema,
      additional_bq_parameters=additional_bq_parameters)

  result = p.run()
  result.wait_until_finish()
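
# A quick way to confirm the outcome after each run (a sketch added for
# illustration; `check_partitioning` is not part of the original repro). It
# only reads table metadata back through the google-cloud-bigquery client.
def check_partitioning(table_name):
  client = cloud_bigquery.Client()
  table = client.get_table(table_name.replace(':', '.'))
  # A correctly created table shows DAY partitioning on `timestamp` and
  # clustering on `country`; a mis-created one shows None for both.
  print('time_partitioning:', table.time_partitioning)
  print('clustering_fields:', table.clustering_fields)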
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.DEBUG)
  # run_local=True uses the DirectRunner; run_local=False the DataflowRunner.
  # The DirectRunner partitions the table correctly.
  run(PARTITIONED_BQ_TABLE, run_local=True)
  # The DataflowRunner doesn't partition the table correctly.
  run(FAILED_PARTITIONED_BQ_TABLE, run_local=False)
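
For comparison, the same partitioned, clustered table can be created directly with the BigQuery client instead of through the pipeline. This is a minimal sketch assuming the standard google-cloud-bigquery API; make_partitioned_table is a name introduced here, not part of the repro:

def make_partitioned_table(table_name, schema_fields):
  # Build the table with the same DAY partitioning on `timestamp` and
  # clustering on `country` that the pipeline requests via
  # additional_bq_parameters.
  client = cloud_bigquery.Client()
  schema = [
      cloud_bigquery.SchemaField(f['name'], f['type'], mode=f['mode'])
      for f in schema_fields
  ]
  table = cloud_bigquery.Table(table_name.replace(':', '.'), schema=schema)
  table.time_partitioning = cloud_bigquery.TimePartitioning(
      type_=cloud_bigquery.TimePartitioningType.DAY, field='timestamp')
  table.clustering_fields = ['country']
  return client.create_table(table)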