@Mistobaan
Created June 24, 2020 00:17
import logging

from airflow import models
from airflow.contrib.operators import bigquery_to_gcs

logger = logging.getLogger(__name__)

with models.DAG(dag_id='export_breathe_to_gcs') as dag:  # dag_id added here as a placeholder; the original call was empty
    dataset = 'BREATHE'
    # create a task for each table in the BREATHE dataset
    for table_name in list_breathe_tables():
        destination_uri = 'gs://' + '/'.join((dest_bucket,
                                              '{{ ds_nodash }}',  # one day is enough for the full snapshot
                                              table_name,
                                              table_name + '-{{ ds_nodash }}-*.jsonl'))
        logger.info('generating task to transfer table: %s to %s', table_name, destination_uri)
        task = bigquery_to_gcs.BigQueryToCloudStorageOperator(
            task_id='export_{}_to_gcs'.format(table_name),  # task_id is required by Airflow; this name is a placeholder
            source_project_dataset_table=table_name,
            destination_cloud_storage_uris=[destination_uri],
            compression='NONE',
            export_format='NEWLINE_DELIMITED_JSON',
        )
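The gist does not show list_breathe_tables() or dest_bucket; both are presumably defined elsewhere in the DAG file. Purely as an illustration, a helper along these lines could enumerate the BREATHE tables with the google-cloud-bigquery client (the default dataset name, return format, and use of the client's default project are assumptions, not the author's actual implementation):

from google.cloud import bigquery

def list_breathe_tables(dataset='BREATHE'):
    # Hypothetical sketch: return 'project.dataset.table' ids for every table
    # in the dataset, resolved against the client's default project.
    client = bigquery.Client()
    return ['{}.{}.{}'.format(t.project, t.dataset_id, t.table_id)
            for t in client.list_tables(dataset)]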