Skip to content

Instantly share code, notes, and snippets.

@ShopifyEng
Created May 23, 2022 13:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ShopifyEng/28ba4d486ec1efd24dc3d1464cd45928 to your computer and use it in GitHub Desktop.
Save ShopifyEng/28ba4d486ec1efd24dc3d1464cd45928 to your computer and use it in GitHub Desktop.
Lessons Learned From Running Apache Airflow at Scale - airflow_local_settings.py
import os
from typing import List
import yaml
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models import DAG
def validate_pools(dag: DAG, pools: List[str]) -> None:
    """Reject the DAG if any of its tasks targets a pool outside ``pools``.

    Tasks are inspected in the order ``dag.tasks`` yields them; validation
    stops at the first task whose ``pool`` is not listed in ``pools``.

    Args:
        dag: The DAG whose tasks are checked.
        pools: Pool names the DAG is allowed to submit tasks to.

    Raises:
        AirflowClusterPolicyViolation: Naming the DAG id and the first
            disallowed pool encountered.
    """
    for candidate in dag.tasks:
        requested_pool = candidate.pool
        if requested_pool in pools:
            continue
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} cannot submit tasks to the pool: {requested_pool}"
        )
def validate_queues(dag: DAG, queues: List[str]) -> None:
    """Reject the DAG if any of its tasks targets a queue outside ``queues``.

    Tasks are inspected in the order ``dag.tasks`` yields them; validation
    stops at the first task whose ``queue`` is not listed in ``queues``.

    Args:
        dag: The DAG whose tasks are checked.
        queues: Queue names the DAG is allowed to submit tasks to.

    Raises:
        AirflowClusterPolicyViolation: Naming the DAG id and the first
            disallowed queue encountered.
    """
    for candidate in dag.tasks:
        requested_queue = candidate.queue
        if requested_queue in queues:
            continue
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} cannot submit tasks to the queue: {requested_queue}"
        )
def dag_policy(dag: DAG) -> None:
    """Validate a DAG against the per-project constraints in the manifest.

    The manifest is read from ``<AIRFLOW_HOME>/airflow_manifest.yaml`` on
    every call. The DAG's namespace is the part of ``dag.dag_id`` before the
    first ``"."`` (the whole id when it contains no dot). The namespace must
    be a key under the manifest's ``projects`` mapping, and that project's
    ``constraints`` must provide ``pools`` and ``queues`` lists, which are
    enforced via ``validate_pools`` and ``validate_queues``.

    Args:
        dag: The DAG being checked.

    Raises:
        AirflowClusterPolicyViolation: If the namespace is not registered in
            the manifest, or a task uses a pool or queue that is not allowed.
        OSError: If the manifest file cannot be opened.
        KeyError: If the manifest lacks the ``projects``, ``constraints``,
            ``pools`` or ``queues`` keys.
    """
    # open() does not expand "~", so the "~/airflow" fallback (and any "~" in
    # AIRFLOW_HOME itself) must be expanded to the user's home directory first.
    airflow_home = os.path.expanduser(os.environ.get("AIRFLOW_HOME", "~/airflow"))
    manifest_path = os.path.join(airflow_home, "airflow_manifest.yaml")
    with open(manifest_path, "r", encoding="UTF-8") as manifest_file:
        manifest = yaml.safe_load(manifest_file)

    dag_namespace = dag.dag_id.split(".")[0]
    projects = manifest["projects"]
    if dag_namespace not in projects:
        raise AirflowClusterPolicyViolation(
            f"Namespace {dag_namespace} is not registered in the Airflow Manifest."
        )

    constraints = projects[dag_namespace]["constraints"]
    validate_pools(dag, constraints["pools"])
    validate_queues(dag, constraints["queues"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment