Skip to content

Instantly share code, notes, and snippets.

@goodbyegangster
Created July 1, 2022 05:20
Show Gist options
  • Save goodbyegangster/544101ee3c92839d4b623e41c9477205 to your computer and use it in GitHub Desktop.
Save goodbyegangster/544101ee3c92839d4b623e41c9477205 to your computer and use it in GitHub Desktop.
A Sample Google Provider DAG
from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
BigQueryInsertJobOperator,
)
# Airflow connection ID holding the GCP credentials used by every task below.
CONNECTION_ID = "conn_sample_gcp"
# Target GCP project (placeholder — replace with a real project ID).
PROJECT_ID = "sample-project-xxx"
# BigQuery dataset and table created/populated by this DAG.
DATASET_NAME = "sample"
TABLE_NAME = "sample"
with models.DAG(
    dag_id="bq_sample",
    description="A Sample Google Provider DAG",
    schedule_interval="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,  # no backfill between start_date and "now"
    tags=["example"],
) as dag:
    # 1) Create the target dataset (idempotent: no-op if it already exists).
    create_dataset = BigQueryCreateEmptyDatasetOperator(
        gcp_conn_id=CONNECTION_ID,
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        location="asia-northeast1",
    )

    # 2) Create an empty table with a fixed two-column schema.
    create_table = BigQueryCreateEmptyTableOperator(
        # Fix: was `bigquery_conn_id=` — a deprecated alias in the Google
        # provider; `gcp_conn_id=` matches the other tasks in this DAG.
        gcp_conn_id=CONNECTION_ID,
        task_id="create_table",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME,
        schema_fields=[
            {"name": "id", "type": "STRING", "mode": "REQUIRED"},
            {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        ],
    )

    # 3) Insert one sample row via a BigQuery query job (standard SQL).
    insert_data = BigQueryInsertJobOperator(
        gcp_conn_id=CONNECTION_ID,
        task_id="insert_query_job",
        configuration={
            "query": {
                "query": f"INSERT INTO {DATASET_NAME}.{TABLE_NAME} VALUES ('001', 'mikochi');",  # noqa: E501
                "useLegacySql": False,
            }
        },
    )

    # Task ordering: dataset must exist before the table, table before the insert.
    create_dataset >> create_table >> insert_data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment