-
-
Save collado-mike/9202187c13d35299a672f0cea36047d1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateExternalTableOperator,
    BigQueryExecuteQueryOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.dates import days_ago
from openlineage.client import set_producer
# Point two operator class names at extractor classes in the `custom_extractor`
# module via OPENLINEAGE_EXTRACTOR_<OperatorName> environment variables.
# NOTE(review): presumably read by the OpenLineage Airflow integration when it
# resolves extractors -- confirm against the installed integration version.
os.environ["OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator"] = (
    "custom_extractor.BigQueryExternalTableExtractor"
)
os.environ["OPENLINEAGE_EXTRACTOR_GCSToGCSOperator"] = (
    "custom_extractor.GcsToGcsExtractor"
)
# Set the producer URI passed to the OpenLineage client for this module.
set_producer(
    "https://github.com/OpenLineage/OpenLineage/tree/0.0.1/integration/airflow"
)
# Arguments handed to the DAG below as its `default_args`.
default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    # Relative start date: one day before the moment this file is parsed.
    'start_date': days_ago(1),
    # Both e-mail notification flags are off, although an address is listed.
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['mike@astronomer.io'],
}
# Hourly DAG with catch-up disabled; every task below attaches to it via `dag=dag`.
dag = DAG(
    'bigquery_external_data',
    schedule_interval='@hourly',
    catchup=False,
    default_args=default_args,
    description='Computes values from an external dataset in GCS.',
)
# Step 1: copy the Parquet objects under data/upcoming_order_items/ from the
# source bucket into the same prefix of the demo bucket.
t0 = GCSToGCSOperator(
    task_id='copy_from_source_gcs',
    source_bucket='bq-airflow-spark',
    source_objects=['data/upcoming_order_items/*.parquet'],
    destination_bucket='openlineage_webinar_demo',
    destination_object='data/upcoming_order_items/',
    dag=dag,
)
# Step 2: define a BigQuery external table whose source URIs are the Parquet
# objects in the demo bucket (the destination that `t0` copies into).
t1 = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        'tableReference': {
            'projectId': 'bq-airflow-marquez',
            'datasetId': 'food_delivery',
            'tableId': 'top_delivery_items_external',
        },
        'externalDataConfiguration': {
            'sourceUris': [
                'gs://openlineage_webinar_demo/data/upcoming_order_items/*.parquet',
            ],
            'sourceFormat': 'PARQUET',
        },
    },
    dag=dag,
)
# Step 3: make sure the target table exists (standard SQL, US location).
# `IF NOT EXISTS` keeps the statement safe to repeat on every run.
t2 = BigQueryExecuteQueryOperator(
    task_id="create_table",
    use_legacy_sql=False,
    location='US',
    sql='''
    CREATE TABLE IF NOT EXISTS food_delivery.big_upcoming_orders (
      menu_item_id INT64,
      quantity INT
    )
    ''',
    dag=dag,
)
# Step 4: select rows with quantity > 100 from the external table and write the
# result into big_upcoming_orders.
t3 = BigQueryExecuteQueryOperator(
    task_id="insert_upcoming_big_orders",
    use_legacy_sql=False,
    location='US',
    sql='''
    SELECT * FROM food_delivery.top_delivery_items_external WHERE quantity > 100
    ''',
    destination_dataset_table='bq-airflow-marquez.food_delivery.big_upcoming_orders',
    # The operator's default disposition is WRITE_EMPTY, which errors as soon as
    # the destination already holds rows -- i.e. on every hourly run after the
    # first successful load. Replacing the contents keeps the task re-runnable.
    # NOTE(review): switch to 'WRITE_APPEND' if rows are meant to accumulate.
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)
# Strictly linear pipeline: each task runs only after the previous one.
t0 >> t1 >> t2 >> t3
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment