Skip to content

Instantly share code, notes, and snippets.

@robinedwards
Created September 5, 2018 14:49
Show Gist options
  • Save robinedwards/3c2f5fd09ea229a475d74405312a4f55 to your computer and use it in GitHub Desktop.
Save robinedwards/3c2f5fd09ea229a475d74405312a4f55 to your computer and use it in GitHub Desktop.
handling subdag retries
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
# Id of the parent DAG; also passed to subdag() as the prefix of the child DAG id.
DAG_NAME = "example_subdag_operator"
def raise_error(*args, **kwargs):
    """Fail unconditionally so that the task running this callable errors out.

    Any positional or keyword arguments are accepted and ignored.

    Raises:
        ValueError: always, with the message "SOME ERROR".
    """
    raise ValueError("SOME ERROR")
def subdag(parent_dag_name, child_dag_name, args):
    """Build the child DAG that a SubDagOperator will run.

    The child DAG holds a single PythonOperator whose callable is
    ``raise_error``, so that task raises every time it executes.

    Args:
        parent_dag_name: dag_id of the parent DAG; used as the prefix of the
            child DAG's id.
        child_dag_name: suffix of the child DAG's id; also used as the prefix
            of the failing task's task_id.
        args: default_args dict handed to both the child DAG and its task.

    Returns:
        The DAG object with dag_id ``"<parent_dag_name>.<child_dag_name>"``.
    """
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )

    # The operator is constructed with dag=dag_subdag, so it does not need to
    # be bound to a local name.
    PythonOperator(
        task_id='%s-task-raise-error' % (child_dag_name),
        default_args=args,
        dag=dag_subdag,
        python_callable=raise_error,
    )

    return dag_subdag
# Defaults shared by the parent DAG, the child DAG and every operator below.
args = dict(
    owner='airflow',
    start_date=datetime(2018, 9, 5),
)

# Parent DAG.
dag = DAG(
    dag_id=DAG_NAME,
    schedule_interval="@once",
    default_args=args,
)

start = DummyOperator(task_id='start', dag=dag, default_args=args)

# The child DAG's only task always raises, so retries=1 gives the
# SubDagOperator task one retry after that failure.
section_1 = SubDagOperator(
    task_id='section-1',
    dag=dag,
    default_args=args,
    retries=1,
    subdag=subdag(DAG_NAME, 'section-1', args),
)

end = DummyOperator(task_id='end', dag=dag, default_args=args)

# Linear pipeline: start -> section-1 -> end.
start >> section_1 >> end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment