Skip to content

Instantly share code, notes, and snippets.

@naturalett
Created June 5, 2023 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save naturalett/98e6215b542e219ba9cf20dd0957eb09 to your computer and use it in GitHub Desktop.
Save naturalett/98e6215b542e219ba9cf20dd0957eb09 to your computer and use it in GitHub Desktop.
Airflow running bash tasks on k8s
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
# Number of pod tasks the DAG below generates (one task per loop iteration).
NUM_TASKS = 500

# Arguments handed to the DAG as ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Failure e-mails are enabled; retry e-mails are not.
    'email': ['admin@admin.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    # A single retry, attempted five minutes after the failure.
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Pod tolerations passed to every KubernetesPodOperator task: tolerate the
# taint whose key "dedicated" equals "k8s-pod-operator-staging".
# NOTE(review): no 'effect' key is set — confirm that is intended.
tolerations = [
    {
        'key': 'dedicated',
        'operator': 'Equal',
        'value': 'k8s-pod-operator-staging',
    },
]
# Node affinity passed to every KubernetesPodOperator task: a required
# (scheduling-time) rule selecting nodes whose "kops.k8s.io/instancegroup"
# label is in ["k8s-pod-operator-staging"].
affinity = {
    'nodeAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': {
            'nodeSelectorTerms': [
                {
                    'matchExpressions': [
                        {
                            'key': 'kops.k8s.io/instancegroup',
                            'operator': 'In',
                            'values': ['k8s-pod-operator-staging'],
                        },
                    ],
                },
            ],
        },
    },
}
# DAG "bash_tasks_on_k8s": creates NUM_TASKS KubernetesPodOperator tasks, each
# running the `date` command in its own pod. No dependencies are declared
# between the tasks.
with DAG(
    'bash_tasks_on_k8s',
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # no schedule configured for this DAG
    catchup=False,
) as dag:
    # Operators instantiated inside the `with DAG(...)` block; the loop index
    # makes each task_id unique.
    for i in range(NUM_TASKS):
        KubernetesPodOperator(
            task_id=f'bash_tasks_on_k8s_{i}',
            # Every task uses the same pod name and namespace.
            name='kubernetes-pod-operator-bash-test',
            namespace='datainfra-airflow-playground',
            is_delete_operator_pod=True,
            # Scheduling constraints come from the module-level
            # `tolerations` and `affinity` definitions.
            tolerations=tolerations,
            affinity=affinity,
            image_pull_policy="IfNotPresent",
            cmds=['date'],
            image='protokube:1.18.2',
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment