Skip to content

Instantly share code, notes, and snippets.

@achad4
Created April 6, 2022 22:14
Show Gist options
  • Save achad4/8311f9eba1888b51442fed632d7cf3e7 to your computer and use it in GitHub Desktop.
Save achad4/8311f9eba1888b51442fed632d7cf3e7 to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

try:
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
except ImportError:  # older apache-airflow-providers-cncf-kubernetes releases
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
MODEL_ENTRY_POINT = "customer_model.py"

# This dynamic configuration mechanism is not ideal but makes for an easy demo.
#
# The values are Jinja templates over Airflow Variables instead of eager
# ``Variable.get`` calls. Module-level code in a DAG file is executed on every
# scheduler parse, so an eager lookup would hit the metadata database each time,
# and a missing Variable would make the whole DAG fail to import. ``cmds`` is a
# templated field of KubernetesPodOperator, so these placeholders are rendered
# (to the Variable's string value) only when the task actually runs.
model_version = "{{ var.value.model_version }}"
lookback_days_train = "{{ var.value.lookback_days_train }}"
lookback_days_score = "{{ var.value.lookback_days_score }}"

with DAG(
    "unscalable_model_pipeline",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    description="An example of a DAG that will be difficult to scale",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 7),
    catchup=False,
) as dag:
    # Runs the entry point's ``train_model`` command in a pod built from
    # ``my_model_image``, passing the rendered ``lookback_days_train`` Variable.
    # NOTE(review): the whole list is used as the container command, so
    # MODEL_ENTRY_POINT must be directly executable inside the image (shebang +
    # exec bit) — confirm, or switch to cmds=["python", MODEL_ENTRY_POINT].
    train_model = KubernetesPodOperator(
        name="train_model",
        task_id="train_model",
        image="my_model_image",
        cmds=[
            MODEL_ENTRY_POINT,
            "train_model",
            f"--lookback_days={lookback_days_train}",
        ],
    )

    # Runs the entry point's ``score_customers`` command with the rendered
    # ``model_version`` and ``lookback_days_score`` Variables.
    # NOTE(review): the model version comes from a Variable, not from the
    # train_model task's output; presumably it is updated out of band — confirm.
    score_customers = KubernetesPodOperator(
        name="score_customers",
        task_id="score_customers",
        image="my_model_image",
        cmds=[
            MODEL_ENTRY_POINT,
            "score_customers",
            f"--model_version={model_version}",
            f"--lookback_days={lookback_days_score}",
        ],
    )

    # Scoring only starts after training has succeeded.
    train_model >> score_customers
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment