Created
November 29, 2021 18:14
Abstracting data loading with Airflow DAG factories
# Abstracting data loading with Airflow DAG factories
    with dag:
        t1 = PythonOperator(
            task_id="sync_data", python_callable=_sync_data, provide_context=True
        )

    return dag
        # Clean the data
        df = gsheets.clean_data(
            data,
            config["mapper"],
            drop_duplicates=config["drop_duplicates"],
            skip_index=config["skip_index"],
        )
# DAG configuration
# The factory reads the DAG id from the "nickname" key. It also reads
# start_date, catchup and skip_index, which this excerpt does not show.
nickname: invoices
interval: "@daily"
owner: finance
retries: 3
retry_delay_minutes: 15

# Google Sheets configuration
file_name: "Invoices"
sheet_name: "invoices"
headers_row: 1

# DB configuration
db_name: RAW
db_schema: GSHEETS
db_table: INVOICES

# Mapper configuration
drop_duplicates: false
if_exists: "replace"
mapper:
  "ID":
    name: id
    dtype: text
  "Amount":
    name: amount
    dtype: text
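The factory indexes the parsed configuration directly, so a key missing from a YAML file only surfaces as a `KeyError` when Airflow parses the DAG file. A minimal sketch of an up-front check follows. The helper names `missing_keys` and `validate_config` are hypothetical and not part of the gist; the key list is simply every key the factory code in this gist reads.

```python
# Keys read by create_gsheets_dag and its _sync_data callable in this gist.
REQUIRED_KEYS = {
    "nickname", "owner", "start_date", "retries", "retry_delay_minutes",
    "interval", "catchup", "file_name", "sheet_name", "headers_row",
    "mapper", "drop_duplicates", "skip_index",
    "db_table", "db_name", "db_schema", "if_exists",
}


def missing_keys(config):
    """Return the required keys absent from a parsed config, sorted by name."""
    return sorted(REQUIRED_KEYS - set(config))


def validate_config(config):
    """Raise a ValueError naming every missing key (hypothetical helper)."""
    missing = missing_keys(config)
    if missing:
        raise ValueError(f"config is missing keys: {', '.join(missing)}")
    return config
```

Calling `validate_config(config)` right after loading each YAML file would report all missing keys in one error instead of failing on the first lookup.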
from datetime import datetime, time, timedelta

from airflow import DAG

# Airflow 1.10-style import path; on Airflow 2+ use airflow.operators.python
from airflow.operators.python_operator import PythonOperator

# `gsheets` and `sf` are the project's own helper modules; their imports are not shown.


# Function that takes the config and creates a DAG
def create_gsheets_dag(config):
    # Define default arguments for DAG
    default_args = {
        "owner": config["owner"],
        "start_date": datetime.combine(config["start_date"], time.min),
        "retries": config["retries"],
        "retry_delay": timedelta(minutes=config["retry_delay_minutes"]),
    }

    # Initialize the DAG
    dag = DAG(
        dag_id=config["nickname"],
        default_args=default_args,
        schedule_interval=config["interval"],
        catchup=config["catchup"],
    )

    def _sync_data(**context):
        # Get data from GSheets
        data = gsheets.get_sheet(
            config["file_name"], config["sheet_name"], headers_row=config["headers_row"]
        )

        # Clean the data
        df = gsheets.clean_data(
            data,
            config["mapper"],
            drop_duplicates=config["drop_duplicates"],
            skip_index=config["skip_index"],
        )

        # Write to Snowflake
        if len(df) > 0:
            sf.load_df(
                df=df,
                table_name=config["db_table"],
                database=config["db_name"],
                schema=config["db_schema"],
                if_exists=config["if_exists"],
            )

        return "OK"

    with dag:
        t1 = PythonOperator(
            task_id="sync_data", python_callable=_sync_data, provide_context=True
        )

    return dag
        # Get data from GSheets
        data = gsheets.get_sheet(
            config["file_name"], config["sheet_name"], headers_row=config["headers_row"]
        )
    # Define default arguments for DAG
    default_args = {
        "owner": config["owner"],
        "start_date": datetime.combine(config["start_date"], time.min),
        "retries": config["retries"],
        "retry_delay": timedelta(minutes=config["retry_delay_minutes"]),
    }

    # Initialize the DAG
    dag = DAG(
        dag_id=config["nickname"],
        default_args=default_args,
        schedule_interval=config["interval"],
        catchup=config["catchup"],
    )
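The `datetime.combine(..., time.min)` call above is presumably there because PyYAML's `safe_load` parses an unquoted date such as `2021-01-01` into a `datetime.date`, while Airflow expects a `datetime` for `start_date`. A small stdlib-only sketch of the two conversions follows; the config values are assumptions chosen for illustration and do not come from the gist.

```python
from datetime import date, datetime, time, timedelta

# Stand-in for a parsed config; the values are assumptions for illustration.
config = {"start_date": date(2021, 1, 1), "retry_delay_minutes": 15}

# Promote the plain date to a datetime at midnight.
start = datetime.combine(config["start_date"], time.min)

# Turn the integer number of minutes into a timedelta.
retry_delay = timedelta(minutes=config["retry_delay_minutes"])

print(start)        # 2021-01-01 00:00:00
print(retry_delay)  # 0:15:00
```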
import yaml

for config_path in config_files:
    # Read YAML file
    with open(config_path, "r") as stream:
        config = yaml.safe_load(stream)

    # Add to global scope
    globals()[config["nickname"]] = create_gsheets_dag(config)
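The loop above works because Airflow picks up DAG objects it finds at the top level of a DAG file, and writing into `globals()` puts each generated DAG there under its nickname. One consequence is that two configs with the same nickname would silently overwrite each other. The sketch below shows the same registration pattern with a duplicate check. It runs without Airflow: `create_stub_dag`, `register_dags` and the `products` config are hypothetical stand-ins, not part of the gist.

```python
def create_stub_dag(config):
    """Hypothetical stand-in for create_gsheets_dag; returns a plain dict."""
    return {"dag_id": config["nickname"], "schedule": config["interval"]}


def register_dags(configs, namespace):
    """Store one generated object per config in namespace, keyed by nickname."""
    seen = set()
    for config in configs:
        dag_id = config["nickname"]
        if dag_id in seen:
            # Without this check the later config would replace the earlier one.
            raise ValueError(f"duplicate nickname: {dag_id}")
        seen.add(dag_id)
        namespace[dag_id] = create_stub_dag(config)
    return sorted(seen)


# Illustrative configs; only the keys the stub needs are included.
configs = [
    {"nickname": "invoices", "interval": "@daily"},
    {"nickname": "products", "interval": "@hourly"},
]

# In a real DAG file the namespace would be globals().
registered = register_dags(configs, globals())
```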
mapper:
  "Product Name":
    name: product_name
    dtype: text
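The gist does not show `gsheets.clean_data`, so how the mapper is applied is not confirmed here. Assuming each mapper entry maps a sheet header to a target column `name`, a renaming step could be sketched as below. `apply_mapper` is a hypothetical stand-in that works on plain dictionaries, drops unmapped columns, and ignores `dtype` entirely.

```python
def apply_mapper(rows, mapper):
    """Keep only mapped columns and rename them (stand-in, not gsheets.clean_data)."""
    return [
        {spec["name"]: row.get(header) for header, spec in mapper.items()}
        for row in rows
    ]


# Mapper as it would look after parsing the YAML above.
mapper = {"Product Name": {"name": "product_name", "dtype": "text"}}

# Illustrative sheet rows; the "Notes" column is not in the mapper.
rows = [{"Product Name": "Widget", "Notes": "ignored"}]

print(apply_mapper(rows, mapper))  # [{'product_name': 'Widget'}]
```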
import glob

# Get gsheets configuration files
config_files = glob.glob("dags/gsheets/*.yaml")
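The relative pattern above is resolved against the process's working directory, and `glob.glob` returns matches in arbitrary order. A sketch of a variant that takes an explicit base directory and sorts the result follows. `find_configs` is a hypothetical helper, and the commented usage assumes the YAML files live in a `gsheets` folder next to the DAG file, which the gist does not state.

```python
import glob
import os


def find_configs(base_dir, pattern="*.yaml"):
    """Return sorted config paths under base_dir, independent of the cwd."""
    return sorted(glob.glob(os.path.join(base_dir, pattern)))


# In a DAG file, one might anchor the search to the file's own directory:
# dag_dir = os.path.dirname(os.path.abspath(__file__))
# config_files = find_configs(os.path.join(dag_dir, "gsheets"))
```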
        # Write to Snowflake
        if len(df) > 0:
            sf.load_df(
                df=df,
                table_name=config["db_table"],
                database=config["db_name"],
                schema=config["db_schema"],
                if_exists=config["if_exists"],
            )

        return "OK"