-
-
Save ShopifyEng/28ba4d486ec1efd24dc3d1464cd45928 to your computer and use it in GitHub Desktop.
Lessons Learned From Running Apache Airflow at Scale - airflow_local_settings.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import List | |
import yaml | |
from airflow.exceptions import AirflowClusterPolicyViolation | |
from airflow.models import DAG | |
def validate_pools(dag: DAG, pools: List[str]) -> None:
    """Ensure every task in ``dag`` runs in one of the permitted pools.

    Args:
        dag: The DAG whose ``tasks`` are inspected.
        pools: Pool names the DAG is allowed to use.

    Raises:
        AirflowClusterPolicyViolation: For the first task (in ``dag.tasks``
            order) whose ``pool`` is not listed in ``pools``.
    """
    # Locate the first offending task, if any; later offenders are not reported.
    offender = next(
        (candidate for candidate in dag.tasks if candidate.pool not in pools),
        None,
    )
    if offender is not None:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} cannot submit tasks to the pool: {offender.pool}"
        )
def validate_queues(dag: DAG, queues: List[str]) -> None:
    """Ensure every task in ``dag`` is routed to one of the permitted queues.

    Args:
        dag: The DAG whose ``tasks`` are inspected.
        queues: Queue names the DAG is allowed to use.

    Raises:
        AirflowClusterPolicyViolation: For the first task (in ``dag.tasks``
            order) whose ``queue`` is not listed in ``queues``.
    """
    for current in dag.tasks:
        if current.queue in queues:
            continue
        # First disallowed queue aborts validation immediately.
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} cannot submit tasks to the queue: {current.queue}"
        )
def dag_policy(dag: DAG) -> None:
    """Validate ``dag`` against the constraints declared in the Airflow manifest.

    The manifest is read from ``airflow_manifest.yaml`` inside the directory
    named by the ``AIRFLOW_HOME`` environment variable (``~/airflow`` when the
    variable is unset). The DAG's namespace is the part of ``dag.dag_id``
    before the first ``"."``; it must appear under the manifest's ``projects``
    key, and that project's ``constraints`` supply the allowed ``pools`` and
    ``queues``.

    NOTE(review): presumably this is registered as Airflow's ``dag_policy``
    cluster policy hook via ``airflow_local_settings.py`` -- confirm against
    the deployment.

    Args:
        dag: The DAG being checked.

    Raises:
        AirflowClusterPolicyViolation: If the namespace is not registered in
            the manifest, or a task uses a pool or queue that the namespace's
            constraints do not allow.
    """
    # open() does not expand "~", so the default (or a "~"-prefixed
    # AIRFLOW_HOME) must be expanded explicitly or the manifest is never found.
    airflow_home = os.path.expanduser(os.environ.get('AIRFLOW_HOME', '~/airflow'))
    manifest_path = f"{airflow_home}/airflow_manifest.yaml"
    with open(manifest_path, "r", encoding="UTF-8") as manifest_file:
        manifest = yaml.safe_load(manifest_file)

    # The namespace is the dag_id prefix up to the first dot.
    dag_namespace = dag.dag_id.split(".")[0]
    if dag_namespace not in manifest["projects"]:
        raise AirflowClusterPolicyViolation(
            f"Namespace {dag_namespace} is not registered in the Airflow Manifest."
        )

    constraints = manifest["projects"][dag_namespace]["constraints"]
    validate_pools(dag, constraints["pools"])
    validate_queues(dag, constraints["queues"])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment