# Example of an ML DAG using the Airflow PythonOperator.
#
# Originally published as GitHub Gist 3eaba321aac2d81ffbd2e70ca16dc1aa
# by @achad4 (last active April 7, 2022).
from datetime import datetime, timedelta
from typing import Optional

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
def train_model(lookback_days: int = 30) -> None:
    """Train a model on recent data and persist it to a model store.

    Args:
        lookback_days: Size of the training window; data from the past
            ``lookback_days`` days is used. Defaults to 30.
    """
    # NOTE: stub -- the training logic is not part of this example.
    ...
def score_customers(
    model_version: Optional[str] = None,
    lookback_days: int = 30,
) -> None:
    """Score recent production data with a model pulled from the model store.

    Args:
        model_version: Version of the model to pull from the model store.
            Defaults to ``None``; which model ``None`` selects is not
            defined in this example -- TODO confirm.
        lookback_days: Score production data from the past
            ``lookback_days`` days. Defaults to 30.
    """
    # NOTE: stub -- the scoring logic is not part of this example.
    ...
# This dynamic configuration mechanism is not ideal but makes for an easy demo:
# module-level Variable.get() runs every time the scheduler parses this file,
# which Airflow's best-practices guide advises against for production DAGs.
model_version = Variable.get("model_version")
# Variable.get() returns the stored value as a string; convert the window
# sizes so the callables receive the int their signatures declare. A
# non-numeric value fails fast here, at DAG parse time.
lookback_days_train = int(Variable.get("lookback_days_train"))
lookback_days_score = int(Variable.get("lookback_days_score"))
with DAG(
    "unscalable_model_pipeline",
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='An example of a DAG that will be difficult to scale',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 7),
    catchup=False,
) as dag:
    # Task variables use a "_task" suffix so they never rebind the
    # module-level callables: reusing the name `train_model` for the operator
    # would make any later `python_callable=train_model` receive the operator
    # instead of the function.
    # Operators created inside the `with DAG(...)` block are attached to that
    # DAG automatically, so no explicit `dag=dag` argument is needed.
    train_model_task = PythonOperator(
        task_id="train_model",
        python_callable=train_model,
        op_kwargs={"lookback_days": lookback_days_train},
    )
    score_customers_task = PythonOperator(
        task_id="score_customers",
        python_callable=score_customers,
        op_kwargs={
            "model_version": model_version,
            "lookback_days": lookback_days_score,
        },
    )
    # Scoring runs only after training has completed.
    train_model_task >> score_customers_task
# Discussion of this example is available in the original GitHub Gist's comments.