@gmyrianthous
Created July 8, 2022 14:26
Apache Airflow DAG for loading data from a Postgres database into Google Cloud BigQuery (via Google Cloud Storage)
from datetime import timedelta
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.gcs import GCSDeleteObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
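
# This DAG stages the Postgres table as a CSV file in GCS before loading it
# into BigQuery: BigQuery ingests files from Cloud Storage natively, and (as
# of this provider version) there is no direct Postgres-to-BigQuery transfer
# operator, so GCS acts as the intermediate landing zone.
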
BQ_DS = 'my_dataset'
BQ_PROJECT = 'my-project'
GCS_BUCKET = 'my-bucket'
GCS_OBJECT_PATH = 'postgres-test'
SOURCE_TABLE_NAME = 'mytable'
POSTGRES_CONNECTION_ID = 'postgres'
FILE_FORMAT = 'csv'
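
# Schema of the destination BigQuery table; assumed to mirror the columns of
# the source Postgres table, with every field nullable.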
schema = [
    {
        'name': 'id',
        'type': 'STRING',
        'mode': 'NULLABLE',
    },
    {
        'name': 'name',
        'type': 'STRING',
        'mode': 'NULLABLE',
    },
    {
        'name': 'age',
        'type': 'INTEGER',
        'mode': 'NULLABLE',
    },
    {
        'name': 'is_active',
        'type': 'BOOLEAN',
        'mode': 'NULLABLE',
    },
]

with DAG(
    dag_id='load_postgres_into_bq',
    start_date=days_ago(1),
    default_args={
        'owner': 'airflow',
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='0 9 * * *',
    max_active_runs=1,
) as dag:
    # Step 1: export the source table from Postgres to a CSV file in GCS.
    postgres_to_gcs_task = PostgresToGCSOperator(
        task_id='postgres_to_gcs',
        postgres_conn_id=POSTGRES_CONNECTION_ID,
        sql=f'SELECT * FROM {SOURCE_TABLE_NAME};',
        bucket=GCS_BUCKET,
        filename=f'{GCS_OBJECT_PATH}/{SOURCE_TABLE_NAME}.{FILE_FORMAT}',
        export_format='csv',
        gzip=False,
        use_server_side_cursor=False,
    )

    # Step 2: load the staged CSV file from GCS into the BigQuery table.
    gcs_to_bq_task = GCSToBigQueryOperator(
        task_id='gcs_to_bq',
        bucket=GCS_BUCKET,
        source_objects=[f'{GCS_OBJECT_PATH}/{SOURCE_TABLE_NAME}.{FILE_FORMAT}'],
        destination_project_dataset_table='.'.join([BQ_PROJECT, BQ_DS, SOURCE_TABLE_NAME]),
        schema_fields=schema,
        create_disposition='CREATE_IF_NEEDED',
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows=1,
        allow_quoted_newlines=True,
    )

    # Step 3: delete the intermediate CSV file from GCS.
    cleanup_task = GCSDeleteObjectsOperator(
        task_id='cleanup',
        bucket_name=GCS_BUCKET,
        objects=[f'{GCS_OBJECT_PATH}/{SOURCE_TABLE_NAME}.{FILE_FORMAT}'],
    )

    postgres_to_gcs_task >> gcs_to_bq_task >> cleanup_task
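
# A quick way to sanity-check the pipeline (assuming a local Airflow 2.x
# installation with the Google and Postgres providers installed, and the
# 'postgres' and GCP connections configured) is a one-off test run from the
# CLI, which executes the DAG for a single date without needing a scheduler:
#
#   airflow dags test load_postgres_into_bq 2022-07-08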