Airflow_Kubernetes
FROM python:3.7-slim-stretch
LABEL maintainer="Yevhen Lebid <yevhen.lebid@loopme.com>"
# Never prompts the user for choices on installation/configuration of packages
ENV DEBIAN_FRONTEND=noninteractive \
    TERM=linux
# Airflow
ARG AIRFLOW_VERSION=1.10.7
ARG AIRFLOW_USER_HOME=/usr/local/airflow
ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME}
# Define the en_US locale
ENV LANGUAGE=en_US.UTF-8 \
    LANG=en_US.UTF-8 \
    LC_ALL=en_US.UTF-8 \
    LC_CTYPE=en_US.UTF-8 \
    LC_MESSAGES=en_US.UTF-8
RUN set -ex \
    && buildDeps=" \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
    " \
    && pipDeps=" \
        pytz \
        pyOpenSSL \
        ndg-httpsclient \
        pyasn1 \
        psycopg2-binary \
        apache-airflow[crypto,postgres,jdbc,kubernetes,password,elasticsearch,slack]==${AIRFLOW_VERSION} \
    " \
    && apt-get update -yqq \
    && apt-get upgrade -yqq \
    && apt-get install -yqq --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        apt-utils \
        curl \
        rsync \
        netcat \
        locales \
    && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \
    && locale-gen \
    && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \
    && useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \
    && pip install -U pip setuptools wheel \
    && pip install $pipDeps \
    && apt-get purge --auto-remove -yqq $buildDeps \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf \
        /var/lib/apt/lists/* \
        /tmp/* \
        /var/tmp/* \
        /usr/share/man \
        /usr/share/doc \
        /usr/share/doc-base
COPY entrypoint.sh /entrypoint.sh
COPY airflow.cfg ${AIRFLOW_USER_HOME}/airflow.cfg
RUN chown -R airflow: ${AIRFLOW_USER_HOME}
EXPOSE 8080
USER airflow
WORKDIR ${AIRFLOW_USER_HOME}
ENTRYPOINT ["/entrypoint.sh"]
CMD ["webserver"]
#!/usr/bin/env bash
declare -a DEFAULT_CONNS=(
  "airflow_db"
  "slack"
  "cassandra_default"
  "azure_container_instances_default"
  "azure_cosmos_default"
  "azure_data_lake_default"
  "segment_default"
  "dingding_default"
  "qubole_default"
  "databricks_default"
  "emr_default"
  "sqoop_default"
  "redis_default"
  "druid_ingest_default"
  "druid_broker_default"
  "spark_default"
  "aws_default"
  "fs_default"
  "sftp_default"
  "ssh_default"
  "webhdfs_default"
  "wasb_default"
  "vertica_default"
  "local_mysql"
  "mssql_default"
  "http_default"
  "sqlite_default"
  "postgres_default"
  "mysql_default"
  "mongo_default"
  "metastore_default"
  "hiveserver2_default"
  "hive_cli_default"
  "opsgenie_default"
  "google_cloud_default"
  "presto_default"
  "bigquery_default"
  "beeline_default"
  "pig_cli_default"
)
case "$1" in
webserver)
airflow initdb
airflow create_user \
--role Admin \
--username ${AIRFLOW_ADMIN_USER} \
--password ${AIRFLOW_ADMIN_PASSWORD} \
--firstname Air \
--lastname Flow \
--email air.flow@examle.com
for CONN in "${DEFAULT_CONNS[@]}"
do
airflow connections --delete --conn_id ${CONN}
done
airflow connections \
--add \
--conn_id postgres_default \
--conn_uri ${AIRFLOW_CONN_POSTGRES_DEFAULT}
airflow connections \
--add \
--conn_id slack \
--conn_type http \
--conn_host https://hooks.slack.com/services \
--conn_password ${AIRFLOW_SLACK_WEBHOOK_URL}
if [ "$AIRFLOW__CORE__EXECUTOR" = "KubernetesExecutor" ]; then
# With the "KubernetesExecutor" executors it should all run in one container.
airflow scheduler &
fi
if [ "$AIRFLOW__CORE__EXECUTOR" = "LocalExecutor" ]; then
# With the "Local" executor it should all run in one container.
airflow scheduler &
fi
exec airflow worker &
exec airflow webserver
;;
worker|scheduler)
# To give the webserver time to run initdb.
sleep 10
exec airflow "$@"
;;
version)
exec airflow "$@"
;;
*)
# The command is something like bash, not an airflow subcommand. Just run it in the right environment.
exec "$@"
;;
esac
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-role-admin
rules:
- apiGroups:
  - ""
  resources:
  - pods
  - pods/log
  verbs:
  - get
  - watch
  - list
  - create
  - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: airflow-role-admin
subjects:
- kind: ServiceAccount
  name: default
---
apiVersion: v1
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    remote_logging = False
    remote_log_conn_id =
    remote_base_log_folder =
    encrypt_s3_logs = False

    # The executor class that airflow should use. Choices include
    # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
    executor = LocalExecutor

    # sql_alchemy_conn = AIRFLOW__CORE__SQL_ALCHEMY_CONN from manifest
    sql_alchemy_conn =
    load_examples = False
    # fernet_key = AIRFLOW__CORE__FERNET_KEY
    fernet_key =

    [cli]
    api_client = airflow.api.client.local_client
    endpoint_url = http://my-airflow.example.cool

    [api]
    auth_backend = airflow.api.auth.backend.default

    [webserver]
    base_url = http://my-airflow.example.cool
    web_server_host = 0.0.0.0
    web_server_port = 8080
    # Set to true to turn on authentication:
    # https://airflow.apache.org/security.html#web-authentication
    authenticate = True
    auth_backend = airflow.contrib.auth.backends.password_auth
    # Use FAB-based webserver with RBAC feature
    rbac = True
    expose_config = True

    [scheduler]
    job_heartbeat_sec = 5
    scheduler_heartbeat_sec = 5
    run_duration = -1
    min_file_process_interval = 5
    dag_dir_list_interval = 300
    print_stats_interval = 30
    scheduler_health_check_threshold = 30
    child_process_log_directory = /usr/local/airflow/logs/scheduler
    scheduler_zombie_task_threshold = 300
    catchup_by_default = True
    max_tis_per_query = 512
    max_threads = 2
    authenticate = False
    use_job_schedule = True

    [admin]
    hide_sensitive_variable_fields = True
  known_hosts: |
    github.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccKrpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFzLQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaSjB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3skua2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ==
kind: ConfigMap
metadata:
  name: airflow-configmap
---
apiVersion: v1
data:
  gitSshKey: <real_ssh_key_in_base64>
kind: Secret
metadata:
  name: airflow-secrets
type: Opaque
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
spec:
  clusterIP: None
  ports:
  - name: http
    port: 8080
  selector:
    app: airflow
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
spec:
  replicas: 1
  revisionHistoryLimit: 1
  selector:
    matchLabels:
      app: airflow
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: airflow
    spec:
      containers:
      - env:
        - name: GIT_SYNC_REPO
          value: git@github.com:ealebed/airflow.git
        - name: GIT_SYNC_BRANCH
          value: master
        - name: GIT_SYNC_ROOT
          value: /git
        - name: GIT_SYNC_DEST
          value: repo
        - name: GIT_SYNC_SSH
          value: "true"
        image: k8s.gcr.io/git-sync:v3.1.4
        name: git-sync
        securityContext:
          runAsUser: 65533
        volumeMounts:
        - mountPath: /git
          name: airflow-dags
        - mountPath: /etc/git-secret/ssh
          name: airflow-secrets
          subPath: ssh
        - mountPath: /etc/git-secret/known_hosts
          name: airflow-configmap
          subPath: known_hosts
      - env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql+psycopg2://airflow:airflow@postgreshost:5432/airflow
        - name: AIRFLOW_CONN_POSTGRES_DEFAULT
          value: postgres://airflow:airflow@postgreshost:5432/airflow
        - name: AIRFLOW_SLACK_WEBHOOK_URL
          value: T02H6C..........q3QPW0m
        - name: AIRFLOW_ADMIN_USER
          value: airflow
        - name: AIRFLOW_ADMIN_PASSWORD
          value: airflow
        - name: AIRFLOW__CORE__FERNET_KEY
          value: tsJjtESQbN_24ADlMX2HISyIVwfj7pW1nEfYDkcPYMY=
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: index.docker.io/ealebed/airflow:1.10.7
        livenessProbe:
          failureThreshold: 5
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          timeoutSeconds: 5
        name: airflow
        ports:
        - containerPort: 8080
          name: http
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 60
          periodSeconds: 5
          timeoutSeconds: 5
        resources:
          limits:
            cpu: "2"
            memory: 4Gi
          requests:
            cpu: "2"
            memory: 4Gi
        volumeMounts:
        - mountPath: /usr/local/airflow/airflow.cfg
          name: airflow-configmap
          subPath: airflow.cfg
        - mountPath: /usr/local/airflow/dags
          name: airflow-dags
      securityContext:
        fsGroup: 1000
      volumes:
      - emptyDir: {}
        name: airflow-dags
      - configMap:
          name: airflow-configmap
        name: airflow-configmap
      - name: airflow-secrets
        secret:
          defaultMode: 288
          items:
          - key: gitSshKey
            mode: 288
            path: ssh
          secretName: airflow-secrets
from functools import partial
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from repo.dags.notifications import task_fail_slack_alert, task_success_slack_alert
from repo.dags.kubernetes_commons import my_affinity, my_tolerations, my_resources
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.strptime('2020.01.28', '%Y.%m.%d'),
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': partial(task_fail_slack_alert, usr="ealebed"),
    'on_success_callback': partial(task_success_slack_alert, usr="ealebed"),
}

dag = DAG(
    dag_id='test_creatives_task',
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="27,57 * * * *"
)

task = KubernetesPodOperator(
    namespace="default",
    image="ealebed/java:11",
    cmds=["java", "--version"],
    name="test-task",
    labels={"app": "test-creatives-task"},
    task_id="id-task",
    affinity=my_affinity,
    resources=my_resources,
    tolerations=my_tolerations,
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=30,
    get_logs=True,
    is_delete_operator_pod=False,
    hostnetwork=False,
    in_cluster=True,
    do_xcom_push=False,
    dag=dag
)
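
The DAG above imports task_fail_slack_alert, task_success_slack_alert, my_affinity, my_tolerations and my_resources from repo.dags modules that are not included in this gist. Below is a minimal, hypothetical sketch of what repo/dags/notifications.py and repo/dags/kubernetes_commons.py could look like, assuming the alerts go through the "slack" connection created in entrypoint.sh; every name, label and size here is an illustrative placeholder, not the author's actual code.

# notifications.py -- hypothetical sketch of the Slack callback helpers (not part of the gist)
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator


def task_fail_slack_alert(context, usr=None):
    """Called by Airflow with the task context when a task instance fails."""
    message = (
        ":red_circle: Task failed. "
        "DAG: {dag}, task: {task}, execution date: {exec_date}, owner: {usr}".format(
            dag=context['task_instance'].dag_id,
            task=context['task_instance'].task_id,
            exec_date=context['execution_date'],
            usr=usr,
        )
    )
    return SlackWebhookOperator(
        task_id='slack_fail_alert',
        http_conn_id='slack',  # connection created in entrypoint.sh; webhook token is its password
        message=message,
        username='airflow',
    ).execute(context=context)


def task_success_slack_alert(context, usr=None):
    """Same idea for successful runs."""
    message = ":white_check_mark: Task succeeded. DAG: {dag}, owner: {usr}".format(
        dag=context['task_instance'].dag_id, usr=usr,
    )
    return SlackWebhookOperator(
        task_id='slack_success_alert',
        http_conn_id='slack',
        message=message,
        username='airflow',
    ).execute(context=context)


# kubernetes_commons.py -- hypothetical sketch of the shared Pod settings (not part of the gist).
# The real node labels, taints and resource sizes are unknown; these are placeholders.
my_resources = {
    'request_cpu': '500m',
    'request_memory': '512Mi',
    'limit_cpu': '1',
    'limit_memory': '1Gi',
}

my_tolerations = [{
    'key': 'dedicated',
    'operator': 'Equal',
    'value': 'airflow',
    'effect': 'NoSchedule',
}]

my_affinity = {
    'nodeAffinity': {
        'requiredDuringSchedulingIgnoredDuringExecution': {
            'nodeSelectorTerms': [{
                'matchExpressions': [{
                    'key': 'kubernetes.io/role',
                    'operator': 'In',
                    'values': ['worker'],
                }],
            }],
        },
    },
}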