Skip to content

Instantly share code, notes, and snippets.

@nickefy
Created September 30, 2021 09:39
Show Gist options
  • Save nickefy/c87f780ecc69660c4ec2cb76d47ef6ac to your computer and use it in GitHub Desktop.
from airflow import models
from airflow import DAG
from datetime import datetime, timedelta
from operators import DataSourceToCsv
from operators import CsvToBigquery
# SQL run by the extract step: a full dump of the source transactions table.
extract_query_source = """select * from transactions"""

# Defaults applied to every task created inside the DAG below.
default_dag_args = {
    'start_date': datetime(2019, 5, 1, 7),  # first schedulable run (naive datetime)
    'email_on_failure': True,
    'email_on_retry': True,
    'project_id': 'your_project_name',
    'retries': 3,
    # NOTE(review): notify_email is not imported or defined anywhere in this
    # snippet — confirm it is provided elsewhere in the project before running.
    'on_failure_callback': notify_email,
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}
# Daily ETL pipeline: extract rows from the source database to CSV, then load
# the CSV into a BigQuery destination table. (Indentation restored — the
# original paste had lost the body indentation of the `with` block.)
with models.DAG(
        dag_id='your_dag_name',
        schedule_interval=timedelta(days=1),  # run once per day
        catchup=True,  # backfill every missed interval since start_date
        default_args=default_dag_args) as dag:

    # Extract: dump the source table to a CSV file.
    extract = DataSourceToCsv.DataSourceToCsv(
        # task_id must match [a-zA-Z0-9_.-]+ — the original values contained
        # spaces, which Airflow rejects at DAG-parse time.
        task_id='extract_from_source',
        table_name='source tablename',
        extract_query=extract_query_source,
        connection='your defined postgres db connection')

    # Load: push the CSV into the destination BigQuery table.
    load = CsvToBigquery.CsvToBigquery(
        task_id='load_into_destination_table',
        bigquery_table_name='destination tablename',
        dataset_name='destination dataset name',
        # Must be exactly ONE BigQuery write disposition; the original
        # placeholder listed all three in a single (invalid) string.
        write_mode='WRITE_TRUNCATE')  # or 'WRITE_APPEND' / 'WRITE_EMPTY'

    # Set dependencies: Load runs only after Extract succeeds.
    extract >> load
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment