Skip to content

Instantly share code, notes, and snippets.

@ealebed
Created May 22, 2021 07:32
Show Gist options
  • Save ealebed/f69a8ee4c42fce55df84eb34418ff362 to your computer and use it in GitHub Desktop.
Save ealebed/f69a8ee4c42fce55df84eb34418ff362 to your computer and use it in GitHub Desktop.
Trigger KubernetesPodOperator using HTTP call
import airflow
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
# Defaults handed to the DAG through its `default_args` parameter.
# `start_date` is produced by `days_ago(0)` every time this module is evaluated.
default_args = dict(
    owner='ealebed',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(0),
)
# DAG with no schedule attached (`schedule_interval=None`).
# NOTE(review): presumably started only by manual / API-triggered runs - confirm.
dag = DAG(
    dag_id="000_templated_task",
    tags=["test"],
    schedule_interval=None,
    default_args=default_args,
)
# Resource requirements handed to the pod operator through `resources=`.
# Requests and limits carry the same memory, CPU and ephemeral-storage values;
# the limits additionally list 'nvidia.com/gpu' with a value of None.
# NOTE(review): None presumably means "no GPU" - confirm how it is serialised.
resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "128Mi",
        "cpu": 0.5,
        "nvidia.com/gpu": None,
        "ephemeral-storage": "1Gi",
    },
    requests={"memory": "128Mi", "cpu": 0.5, "ephemeral-storage": "1Gi"},
)
# Pod task whose image tag is templated from `dag_run.conf.image_tag`;
# the container runs `java --version`.
tt0 = KubernetesPodOperator(
    dag=dag,
    task_id="templated-task-id",
    # Pod identity and where it is created.
    name="templated-task",
    namespace="airflow",
    labels={"app": "000-templated-task"},
    service_account_name="airflow",
    in_cluster=True,
    # Container definition.
    image="openjdk:{{ dag_run.conf.image_tag }}",
    cmds=["java", "--version"],
    resources=resources,
    # Runtime behaviour of the operator.
    startup_timeout_seconds=30,
    get_logs=True,
    is_delete_operator_pod=False,
    do_xcom_push=False,
)
import airflow
from airflow.models import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s
# Defaults handed to the DAG through its `default_args` parameter.
# `start_date` is produced by `days_ago(0)` every time this module is evaluated.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(0),
)
# DAG with no schedule attached (`schedule_interval=None`).
# NOTE(review): presumably started only by manual / API-triggered runs - confirm.
dag = DAG(
    dag_id="manual_tracer",
    tags=["manual"],
    schedule_interval=None,
    default_args=default_args,
)
# Resource requirements handed to the pod operator through `resources=`.
# Requests and limits carry the same memory, CPU and ephemeral-storage values;
# the limits additionally list 'nvidia.com/gpu' with a value of None.
# NOTE(review): None presumably means "no GPU" - confirm how it is serialised.
resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "128Mi",
        "cpu": 0.5,
        "nvidia.com/gpu": None,
        "ephemeral-storage": "1Gi",
    },
    requests={"memory": "128Mi", "cpu": 0.5, "ephemeral-storage": "1Gi"},
)
# Pod task that sleeps 5 seconds, then runs curl once per line emitted by
# `seq ${COUNT}`, POSTing ${JSON} to ${URL}. The three values are injected as
# container environment variables templated from `dag_run.conf`.
manual_tracer = KubernetesPodOperator(
    namespace="airflow",
    image="openjdk:15",
    cmds=[
        # Absolute path: a relative "bin/bash" only resolves when the
        # container's working directory happens to be "/".
        "/bin/bash",
        "-c",
        # The expansions are double-quoted so that a URL or JSON payload
        # containing whitespace or glob characters reaches curl as a single
        # argument instead of being word-split by the shell.
        'sleep 5; seq "${COUNT}" | xargs -I@ -n1 curl -s -i -X POST "${URL}" -d "${JSON}"',
    ],
    name="manual-tracer-task",
    labels={"app": "manual-tracer-task"},
    task_id="manual-tracer-task-id",
    # NOTE(review): assumes `count`, `url` and `json` are supplied as plain
    # strings in the run's conf - confirm against whatever triggers the DAG.
    env_vars={
        'COUNT': '{{ dag_run.conf.count }}',
        'URL': '{{ dag_run.conf.url }}',
        'JSON': '{{ dag_run.conf.json }}',
    },
    service_account_name="airflow",
    resources=resources,
    startup_timeout_seconds=30,
    get_logs=True,
    is_delete_operator_pod=False,
    in_cluster=True,
    do_xcom_push=False,
    dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment