@vastus
Created May 2, 2019 09:03
Airflow cluster policy
"""Local Airflow settings (policy)"""
from datetime import timedelta

from airflow.models import BaseOperator

DEFAULT_EXECUTION_TIMEOUT = timedelta(hours=48)


def policy(task: BaseOperator) -> None:
    """Alter tasks before they get run.

    :param task: task (operator) that will be executed
    """
    print("cluster policy got task", task)  # debug
    # Only fill in a default when the task has no timeout of its own.
    if hasattr(task, "execution_timeout") and not task.execution_timeout:
        print("Setting execution timeout on task '{} ({})' to '{}'.".format(
            task.task_id,
            task.task_type,
            DEFAULT_EXECUTION_TIMEOUT
        ))
        task.execution_timeout = DEFAULT_EXECUTION_TIMEOUT
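
A quick way to see what the policy does without a running Airflow install is to call the same logic on stand-in objects. The sketch below is only an illustration: `SimpleNamespace` replaces a real operator, the task names `extract` and `load` are hypothetical, and the debug prints are left out.

```python
from datetime import timedelta
from types import SimpleNamespace

DEFAULT_EXECUTION_TIMEOUT = timedelta(hours=48)


def policy(task) -> None:
    """Same check as the policy above, without the debug output."""
    if hasattr(task, "execution_timeout") and not task.execution_timeout:
        task.execution_timeout = DEFAULT_EXECUTION_TIMEOUT


# Hypothetical stand-ins for operators; real tasks would come from a DAG.
no_timeout = SimpleNamespace(
    task_id="extract", task_type="BashOperator", execution_timeout=None
)
own_timeout = SimpleNamespace(
    task_id="load",
    task_type="PythonOperator",
    execution_timeout=timedelta(minutes=30),
)

for task in (no_timeout, own_timeout):
    policy(task)

print(no_timeout.execution_timeout)   # default applied
print(own_timeout.execution_timeout)  # explicit value kept
```

Because the check is `not task.execution_timeout`, a task that explicitly sets `timedelta(0)` is also treated as having no timeout, since a zero `timedelta` is falsy in Python.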