import os
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator, \
    BigQueryExecuteQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.dates import days_ago
from openlineage.client import set_producer
os.environ["OPENLINEAGE_EXTRACTOR_BigQueryCreateExternalTableOperator"] = \
'custom_extractor.BigQueryExternalTableExtractor'
os.environ["OPENLINEAGE_EXTRACTOR_GCSToGCSOperator"] = \
'custom_extractor.GcsToGcsExtractor'
set_producer("https://github.com/OpenLineage/OpenLineage/tree/0.0.1/integration/airflow")

default_args = {
    'owner': 'datascience',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'email': ['mike@astronomer.io'],
}

dag = DAG(
    'bigquery_external_data',
    schedule_interval='@hourly',
    catchup=False,
    default_args=default_args,
    description='Computes values from an external dataset in GCS.',
)

# Copy the raw parquet files from the source bucket into the demo bucket.
t0 = GCSToGCSOperator(
    task_id='copy_from_source_gcs',
    source_bucket='bq-airflow-spark',
    source_objects=['data/upcoming_order_items/*.parquet'],
    destination_bucket='openlineage_webinar_demo',
    destination_object='data/upcoming_order_items/',
    dag=dag,
)

# Expose the copied parquet files as a BigQuery external table.
t1 = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        'tableReference': {
            'projectId': 'bq-airflow-marquez',
            'datasetId': 'food_delivery',
            'tableId': 'top_delivery_items_external',
        },
        'externalDataConfiguration': {
            'sourceUris': [
                'gs://openlineage_webinar_demo/data/upcoming_order_items/*.parquet'
            ],
            'sourceFormat': 'PARQUET',
        },
    },
    dag=dag,
)

# Create the destination table if it does not already exist.
t2 = BigQueryExecuteQueryOperator(
    task_id="create_table",
    use_legacy_sql=False,
    location='US',
    sql='''
        CREATE TABLE IF NOT EXISTS food_delivery.big_upcoming_orders (
            menu_item_id INT64,
            quantity     INT64
        )
    ''',
    dag=dag,
)

# Load order items with quantity > 100 into the destination table.
t3 = BigQueryExecuteQueryOperator(
    task_id="insert_upcoming_big_orders",
    use_legacy_sql=False,
    location='US',
    sql='''
        SELECT * FROM food_delivery.top_delivery_items_external WHERE quantity > 100
    ''',
    destination_dataset_table='bq-airflow-marquez.food_delivery.big_upcoming_orders',
    dag=dag,
)

t0 >> t1 >> t2 >> t3
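
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original gist) of the separate
# custom_extractor.py module that the OPENLINEAGE_EXTRACTOR_* variables above
# point to. It assumes the BaseExtractor/TaskMetadata API of recent
# openlineage-airflow releases; module paths and the extractor interface have
# changed between 0.x versions, so treat this as a starting point only.
# ---------------------------------------------------------------------------
# from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata
# from openlineage.client.run import Dataset
#
#
# class GcsToGcsExtractor(BaseExtractor):
#     """Reports the GCS source and destination objects as lineage datasets."""
#
#     @classmethod
#     def get_operator_classnames(cls):
#         return ['GCSToGCSOperator']
#
#     def extract(self) -> TaskMetadata:
#         op = self.operator
#         return TaskMetadata(
#             name=f"{op.dag_id}.{op.task_id}",
#             inputs=[
#                 Dataset(namespace=f"gs://{op.source_bucket}", name=obj)
#                 for obj in op.source_objects
#             ],
#             outputs=[
#                 Dataset(namespace=f"gs://{op.destination_bucket}",
#                         name=op.destination_object),
#             ],
#         )
#
#
# # BigQueryExternalTableExtractor would follow the same pattern, emitting the
# # external table's GCS source URIs as inputs and the BigQuery table as output.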