Skip to content

Instantly share code, notes, and snippets.

@mr-ubik
Created July 27, 2017 10:17
Show Gist options
  • Save mr-ubik/74fd036cb3f5eb05f0740581e1a9cc72 to your computer and use it in GitHub Desktop.
Save mr-ubik/74fd036cb3f5eb05f0740581e1a9cc72 to your computer and use it in GitHub Desktop.
Apache Beam - BigQuery - Creating and uploading a table schema
with beam.Pipeline(argv=pipeline_args) as p:
    from apache_beam.io.gcp.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position

    # Top-level schema object; every column (including nested and repeated
    # ones) is appended to `table_schema.fields` below.
    table_schema = bigquery.TableSchema()

    # A field that uses a standard type.
    alpha_schema = bigquery.TableFieldSchema()
    alpha_schema.name = 'alpha'
    alpha_schema.type = 'string'
    alpha_schema.mode = 'nullable'
    table_schema.fields.append(alpha_schema)

    # A nested field: a 'record' column whose sub-fields are themselves
    # TableFieldSchema objects appended to the parent's `fields` list.
    #   beta
    #    |-- delta
    #    |-- gamma
    beta_schema = bigquery.TableFieldSchema()
    beta_schema.name = 'beta'
    beta_schema.type = 'record'
    beta_schema.mode = 'nullable'

    delta = bigquery.TableFieldSchema()
    delta.name = 'delta'
    delta.type = 'integer'
    delta.mode = 'nullable'
    beta_schema.fields.append(delta)  # Attach delta as a sub-field of beta.

    gamma = bigquery.TableFieldSchema()
    gamma.name = 'gamma'
    gamma.type = 'integer'
    gamma.mode = 'nullable'
    beta_schema.fields.append(gamma)  # Attach gamma as a sub-field of beta.

    # The record is added to the table only after its sub-fields are attached.
    table_schema.fields.append(beta_schema)

    # A repeated field.
    children_schema = bigquery.TableFieldSchema()
    children_schema.name = 'children'
    children_schema.type = 'string'
    children_schema.mode = 'repeated'
    table_schema.fields.append(children_schema)

    # Write `output_data` to the table named by `known_args.output`, creating
    # the table from `table_schema` if needed and truncating existing rows.
    # NOTE(review): later Beam releases deprecate `BigQuerySink` in favour of
    # `beam.io.WriteToBigQuery` -- confirm against the installed SDK version.
    # pylint: disable=expression-not-assigned
    output_data | 'WriteToBigQuery' >> beam.io.Write(
        beam.io.BigQuerySink(
            known_args.output,
            schema=table_schema,  # Pass the schema built above.
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment