Created
November 10, 2020 18:37
-
-
Save ohnorobo/1c8e25383c7ada644ed9376099748991 to your computer and use it in GitHub Desktop.
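A minimal reproduction of an Apache Beam bug: a time-partitioned BigQuery table is created correctly by the DirectRunner but not by the DataflowRunner.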
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# based on zdenulo's example at
# https://gist.github.com/zdenulo/99877307981b4d372df5a662d581a5df
import logging

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import bigquery as cloud_bigquery
from google.cloud.exceptions import NotFound
# Deployment-specific settings: fill these in before running.
TEMP_LOCATION = ''  # set
STAGING_LOCATION = ''  # set
GCP_PROJECT = ''  # set
DATASET = ''  # set and create dataset

# One destination table per runner, so the two results can be compared.
PARTITIONED_TABLE_NAME = 'partition_test'
FAILED_PARTITIONED_TABLE_NAME = 'failed_partition_test'

# Fully qualified table ids in the 'project:dataset.table' form.
_TABLE_ID_TEMPLATE = '{project}:{dataset}.{table}'
PARTITIONED_BQ_TABLE = _TABLE_ID_TEMPLATE.format(
    project=GCP_PROJECT, dataset=DATASET, table=PARTITIONED_TABLE_NAME)
FAILED_PARTITIONED_BQ_TABLE = _TABLE_ID_TEMPLATE.format(
    project=GCP_PROJECT, dataset=DATASET, table=FAILED_PARTITIONED_TABLE_NAME)
def _delete_table_if_exists(client, table_name):
    """Delete ``table_name`` through ``client`` if it exists, printing what happened."""
    # The pipeline uses 'project:dataset.table'; the ':' is swapped for '.'
    # before the id is handed to the BigQuery client.
    fixed_table_name = table_name.replace(':', '.')
    try:
        old_table = client.get_table(fixed_table_name)
        print('table already exists')
        print(old_table)
        client.delete_table(fixed_table_name)
        print('deleted table')
    except NotFound:
        # Raised by either client call: there is nothing (left) to delete.
        print('table did not exist')


def run(table_name, run_local=True):
    """Write two sample rows to a day-partitioned, clustered BigQuery table.

    Any existing table with the same id is deleted first, so the table that
    ends up in BigQuery is the one created by the Beam write. The call blocks
    until the pipeline has finished.

    Args:
        table_name: Destination table id in 'project:dataset.table' form.
        run_local: If False, the 'runner' pipeline option is set to
            'DataflowRunner'; if True, no runner option is set.
    """
    pipeline_options = {
        'disk_size_gb': 100,
        'save_main_session': True,
        'project': GCP_PROJECT,
        'temp_location': TEMP_LOCATION,
        'staging_location': STAGING_LOCATION,
    }
    if not run_local:
        pipeline_options['runner'] = 'DataflowRunner'

    client = cloud_bigquery.Client()
    options = PipelineOptions(**pipeline_options)
    p = beam.Pipeline(options=options)

    elements = (
        p | beam.Create([
            {
                'country': 'mexico',
                'timestamp': '2019-08-23 12:34:56',
                'query': 'acapulco',
            },
            {
                'country': 'canada',
                'timestamp': '2019-08-23 12:34:33',
                'query': 'influenza',
            },
        ]))

    schema = {
        'fields': [
            {'name': 'country', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
            {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]
    }

    # Table-creation settings under test: daily partitioning on `timestamp`
    # and clustering on `country`.
    time_partitioning = {'type': 'DAY', 'field': 'timestamp'}
    clustering = {'fields': ['country']}
    additional_bq_parameters = {
        'timePartitioning': time_partitioning,
        'clustering': clustering,
    }

    # we delete the table ahead of time if it already exists
    # to make sure it's being created by beam
    _delete_table_if_exists(client, table_name)

    elements | bigquery.WriteToBigQuery(
        table=table_name,
        create_disposition=bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=bigquery.BigQueryDisposition.WRITE_EMPTY,
        schema=schema,
        additional_bq_parameters=additional_bq_parameters)

    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # when run_local = False it runs Dataflow Runner, otherwise Direct
    logging.getLogger().setLevel(logging.DEBUG)

    # DirectRunner partitions the table correctly
    run(PARTITIONED_BQ_TABLE, run_local=True)

    # DataflowRunner doesn't partition the table correctly
    run(FAILED_PARTITIONED_BQ_TABLE, run_local=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment