Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save juangesino/4861e2bbbe42d60a3f8646d6eb42fadc to your computer and use it in GitHub Desktop.
Save juangesino/4861e2bbbe42d60a3f8646d6eb42fadc to your computer and use it in GitHub Desktop.
Abstracting data loading with Airflow DAG factories
# Abstracting data loading with Airflow DAG factories
# NOTE(review): duplicated snippet of the task-registration tail of the
# DAG factory function defined later in this file; indentation was lost
# when the page was scraped. Kept verbatim.
with dag:
t1 = PythonOperator(
task_id="sync_data", python_callable=_sync_data, provide_context=True
)
return dag
# NOTE(review): duplicated snippet of the cleaning step inside _sync_data;
# the complete function appears later in this file. Kept verbatim.
# Clean the data
df = gsheets.clean_data(
data,
config["mapper"],
drop_duplicates=config["drop_duplicates"],
skip_index=config["skip_index"],
)
# Example per-sheet configuration file (e.g. dags/gsheets/invoices.yaml),
# loaded with yaml.safe_load and passed to the DAG factory.
# DAG configuration
name: invoices  # NOTE(review): the loader reads config["nickname"], not "name" — confirm key name
interval: "@daily"  # Airflow schedule_interval
owner: finance
retries: 3
retry_delay_minutes: 15
# NOTE(review): the factory also reads start_date, catchup and skip_index,
# which are not defined here — verify they come from defaults elsewhere.
# Google Sheets configuration
file_name: "Invoices"  # spreadsheet file name
sheet_name: "invoices"  # tab within the spreadsheet
headers_row: 1  # row containing column headers
# DB configuration
db_name: RAW
db_schema: GSHEETS
db_table: INVOICES
# Mapper configuration
drop_duplicates: false
if_exists: "replace"  # passed through to the Snowflake loader
# Maps sheet column header -> destination column name and type.
mapper:
"ID":
name: id
dtype: text
"Amount":
name: amount
dtype: text
# Function that takes the config and creates a DAG
def create_gsheets_dag(config):
    """Build an Airflow DAG that syncs one Google Sheet into Snowflake.

    Fixes the original name typo ``create_ghseets_dag`` — the loader loop
    in this file calls ``create_gsheets_dag``, so the old spelling raised
    NameError at import time.

    Args:
        config: dict loaded from a per-sheet YAML file. Keys read here:
            nickname, owner, start_date, retries, retry_delay_minutes,
            interval, catchup, file_name, sheet_name, headers_row, mapper,
            drop_duplicates, skip_index, db_table, db_name, db_schema,
            if_exists.

    Returns:
        The configured airflow DAG containing a single PythonOperator task.
    """
    # Define default arguments for DAG
    default_args = {
        "owner": config["owner"],
        # Normalize the YAML date into a datetime at midnight.
        "start_date": datetime.combine(config["start_date"], time.min),
        "retries": config["retries"],
        "retry_delay": timedelta(minutes=config["retry_delay_minutes"]),
    }

    # Initialize the DAG
    dag = DAG(
        dag_id=config["nickname"],
        default_args=default_args,
        schedule_interval=config["interval"],
        catchup=config["catchup"],
    )

    def _sync_data(**context):
        """Pull the sheet, clean it, and load any resulting rows into Snowflake."""
        # Get data from GSheets
        data = gsheets.get_sheet(
            config["file_name"], config["sheet_name"], headers_row=config["headers_row"]
        )
        # Clean the data
        df = gsheets.clean_data(
            data,
            config["mapper"],
            drop_duplicates=config["drop_duplicates"],
            skip_index=config["skip_index"],
        )
        # Write to Snowflake only when there is something to load.
        if len(df) > 0:
            sf.load_df(
                df=df,
                table_name=config["db_table"],
                database=config["db_name"],
                schema=config["db_schema"],
                if_exists=config["if_exists"],
            )
        return "OK"

    with dag:
        t1 = PythonOperator(
            task_id="sync_data", python_callable=_sync_data, provide_context=True
        )

    return dag
# NOTE(review): duplicated snippet of the fetch step inside _sync_data;
# the complete function appears earlier in this file. Kept verbatim.
# Get data from GSheets
data = gsheets.get_sheet(
config["file_name"], config["sheet_name"], headers_row=config["headers_row"]
)
# NOTE(review): duplicated snippet of the default_args construction from
# the DAG factory function; indentation lost in the paste. Kept verbatim.
# Define default arguments for DAG
default_args = {
"owner": config["owner"],
"start_date": datetime.combine(config["start_date"], time.min),
"retries": config["retries"],
"retry_delay": timedelta(minutes=config["retry_delay_minutes"]),
}
# NOTE(review): duplicated snippet of the DAG construction from the
# factory function defined earlier in this file. Kept verbatim.
# Initialize the DAG
dag = DAG(
dag_id=config["nickname"],
default_args=default_args,
schedule_interval=config["interval"],
catchup=config["catchup"],
)
# Register one DAG per YAML config file. Restored the indentation that
# was lost in the paste — the flattened fragment was not valid Python.
for config_path in config_files:
    # Read YAML file (safe_load: never execute arbitrary YAML tags).
    with open(config_path, "r") as stream:
        config = yaml.safe_load(stream)

    # Add to global scope: the Airflow scheduler discovers DAGs by
    # scanning module-level objects, so each DAG must be bound to a
    # top-level name.
    globals()[config["nickname"]] = create_gsheets_dag(config)
# Mapper fragment from a second sheet configuration file: maps the sheet
# column header to a destination column name and type. Kept verbatim.
mapper:
"Product Name":
name: product_name
dtype: text
# Get gsheets configuration files
# Each YAML file under dags/gsheets/ describes one sheet-to-Snowflake DAG.
config_files = glob.glob("dags/gsheets/*.yaml")
# NOTE(review): duplicated snippet of the Snowflake-load step inside
# _sync_data; the complete function appears earlier in this file.
# Kept verbatim.
# Write to Snowflake
if len(df) > 0:
sf.load_df(
df=df,
table_name=config["db_table"],
database=config["db_name"],
schema=config["db_schema"],
if_exists=config["if_exists"],
)
return "OK"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment