@ptran32
Created August 21, 2023 02:40
$ kubectl describe pod scheduler-666d75bd49-x98jx   # context: jd-dev, namespace: airflow-batchingestion
Name: scheduler-666d75bd49-x98jx
Namespace: airflow-batchingestion
Priority: 0
Node: gke-jd-dev-base-202306281033560745000-772548ad-xv4g/10.151.196.52
Start Time: Sun, 20 Aug 2023 09:04:55 -0400
Labels:       app=batchingestion
              app.kubernetes.io/instance=airflow-batchingestion-dev
              app.kubernetes.io/managed-by=Helm
              app.kubernetes.io/name=airflow
              component=scheduler
              dept=data-platforms
              env=just-data-sandbox
              envtype=dev
              feature=batchingestion
              helm.sh/chart=airflow-1.0.0
              owner=data-platforms-batchingestion
              pod-template-hash=666d75bd49
              system=airflow
Annotations:  checksum/config: fd9132be2784dd150c111ac47d8a41cd7c7b14180f6a08d759b79d86f563a203
              kubectl.kubernetes.io/restartedAt: 2023-08-17T21:23:50Z
Status: Running
IP: 10.X.X.X
IPs:
  IP:  10.X.X.X
Controlled By: ReplicaSet/scheduler-666d75bd49
Init Containers:
  wait-for-airflow-migrations:
    Container ID:  containerd://3a62352797824fbbbaea66389d56c1df13a84b6dad782df79241172ec460df64
    Image:         eu.gcr.io/xxx/airflow:2.6.0-python3.8-1
    Image ID:      eu.gcr.io/xxx/airflow@sha256:b4af37617625b8222d58fa748b0eea16d8afafeaf101ebfd21d2ae2dc169e35c
    Port:          <none>
    Host Port:     <none>
    Args:
      db
      check-migrations
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Sun, 20 Aug 2023 09:05:23 -0400
      Finished:     Sun, 20 Aug 2023 09:05:35 -0400
    Ready:          True
    Restart Count:  0
    Environment Variables from:
      airflow-config  ConfigMap  Optional: false
    Environment:
      AIRFLOW__CORE__FERNET_KEY:            <set to the key 'fernet-key' in secret 'fernet-key'>        Optional: false
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:      <set to the key 'connection' in secret 'airflow-postgres'>  Optional: false
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:  <set to the key 'connection' in secret 'airflow-postgres'>  Optional: false
      POSTGRES_STATS:                       <set to the key 'stats' in secret 'airflow-postgres'>       Optional: false
      AIRFLOW__SMTP__SMTP_PASSWORD:         <set to the key 'smtp-password' in secret 'airflow-smtp'>   Optional: false
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-gq4mp (ro)
Containers:
  airflow-scheduler:
    Container ID:  containerd://a96304508539aa23127190b9334502df426706714fcca8ed60349d919e2106b8
    Image:         eu.gcr.io/xxx/airflow:2.6.0-python3.8-1
    Image ID:      eu.gcr.io/xxx/airflow@sha256:b4af37617625b8222d58fa748b0eea16d8afafeaf101ebfd21d2ae2dc169e35c
    Port:          <none>
    Host Port:     <none>
    Args:
      scheduler
    State:          Running
      Started:      Sun, 20 Aug 2023 09:05:42 -0400
    Ready:          True
    Restart Count:  0
    Limits:
      cpu:     2
      memory:  4Gi
    Requests:
      cpu:     1
      memory:  1Gi
    Liveness:  exec [bash -c version=`airflow version`
      echo "Current airflow version: ${version}"
      if [[ ! "$version" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
        echo "Invalid version format. Expecting semver format: X.Y.Z"
        exit 1
      fi
      # Split the version into major, minor, and patch components
      IFS='.' read -r major minor patch <<< "$version"
      if (( major > 2 || (major == 2 && minor >= 5) || (major == 2 && minor == 5 && patch >= 0) )); then
        echo "$version is greater than or equal to 2.5.0."
        CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
          airflow jobs check --job-type SchedulerJob --local
      elif (( major > 2 || (major == 2 && minor >= 1) || (major == 2 && minor == 1 && patch >= 0) )); then
        echo "$version is greater than or equal to 2.1.0."
        CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \
          airflow jobs check --job-type SchedulerJob --hostname $(hostname)
      else
        echo "$version is less than 2.1.0."
        CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -Wignore -c "
      from airflow.jobs.scheduler_job import SchedulerJob
      from airflow.utils.db import create_session
      from airflow.utils.net import get_hostname
      from airflow.utils.state import State
      from typing import List
      with create_session() as session:
          hostname = get_hostname()
          query = session \
              .query(SchedulerJob) \
              .filter_by(state=State.RUNNING, hostname=hostname) \
              .order_by(SchedulerJob.latest_heartbeat.desc())
          jobs: List[SchedulerJob] = query.all()
          alive_jobs = [job for job in jobs if job.is_alive()]
          count_alive_jobs = len(alive_jobs)
      if count_alive_jobs == 1:
          print(f'HEALTHY - {count_alive_jobs} alive SchedulerJob for: {hostname}')
      elif count_alive_jobs == 0:
          SystemExit(f'UNHEALTHY - 0 alive SchedulerJob for: {hostname}')
      else:
          SystemExit(f'UNHEALTHY - {count_alive_jobs} (more than 1) alive SchedulerJob for: {hostname}')
      "
      fi
    ] delay=10s timeout=10s period=30s #success=1 #failure=5
    Environment Variables from:
      airflow-config  ConfigMap  Optional: false
    Environment:
      AIRFLOW__CORE__FERNET_KEY:            <set to the key 'fernet-key' in secret 'fernet-key'>        Optional: false
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:      <set to the key 'connection' in secret 'airflow-postgres'>  Optional: false
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN:  <set to the key 'connection' in secret 'airflow-postgres'>  Optional: false
      POSTGRES_STATS:                       <set to the key 'stats' in secret 'airflow-postgres'>       Optional: false
      AIRFLOW__SMTP__SMTP_PASSWORD:         <set to the key 'smtp-password' in secret 'airflow-smtp'>   Optional: false
    Mounts:
      /etc/airflow/k8s_pod_template.yaml from k8s-pod-template-default (rw,path="k8s_pod_template.yaml")
      /opt/airflow from airflow-home (rw,path="dags")
      /opt/airflow/logs from airflow-home (rw,path="logs")
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-gq4mp (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             True
  ContainersReady   True
  PodScheduled      True
Volumes:
  airflow-home:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-batchingestion-dev-nfs-pvc
    ReadOnly:   false
  k8s-pod-template-default:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      k8s-pod-template
    Optional:  false
  kube-api-access-gq4mp:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:       Burstable
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason     Age                     From     Message
  ----     ------     ----                    ----     -------
  Warning  Unhealthy  2m17s (x1283 over 13h)  kubelet  Liveness probe errored: command "bash -c version=`airflow version`\necho \"Current airflow version: ${version}\"\n\nif [[ ! \"$version\" =~ ^[0-9]+\\.[0-9]+\\.[0-9]+$ ]]; then\n echo \"Invalid version format. Expecting semver format: X.Y.Z\"\n exit 1\nfi\n\n# Split the version into major, minor, and patch components\nIFS='.' read -r major minor patch <<< \"$version\"\nif (( major > 2 || (major == 2 && minor >= 5) || (major == 2 && minor == 5 && patch >= 0) )); then\n echo \"$version is greater than or equal to 2.5.0.\"\n CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \\\n airflow jobs check --job-type SchedulerJob --local\nelif (( major > 2 || (major == 2 && minor >= 1) || (major == 2 && minor == 1 && patch >= 0) )); then\n echo \"$version is greater than or equal to 2.1.0.\"\n CONNECTION_CHECK_MAX_COUNT=0 AIRFLOW__LOGGING__LOGGING_LEVEL=ERROR exec /entrypoint \\\n airflow jobs check --job-type SchedulerJob --hostname $(hostname)\nelse\n echo \"$version is less than 2.1.0.\"\n CONNECTION_CHECK_MAX_COUNT=0 exec /entrypoint python -Wignore -c \"\nfrom airflow.jobs.scheduler_job import SchedulerJob\nfrom airflow.utils.db import create_session\nfrom airflow.utils.net import get_hostname\nfrom airflow.utils.state import State\nfrom typing import List\n\nwith create_session() as session:\n hostname = get_hostname()\n query = session \\\n .query(SchedulerJob) \\\n .filter_by(state=State.RUNNING, hostname=hostname) \\\n .order_by(SchedulerJob.latest_heartbeat.desc())\n jobs: List[SchedulerJob] = query.all()\n alive_jobs = [job for job in jobs if job.is_alive()]\n count_alive_jobs = len(alive_jobs)\n\nif count_alive_jobs == 1:\n print(f'HEALTHY - {count_alive_jobs} alive SchedulerJob for: {hostname}')\nelif count_alive_jobs == 0:\n SystemExit(f'UNHEALTHY - 0 alive SchedulerJob for: {hostname}')\nelse:\n SystemExit(f'UNHEALTHY - {count_alive_jobs} (more than 1) alive SchedulerJob for: {hostname}')\n\"\nfi\n" timed out
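
The probe script above branches on the Airflow version before choosing a check. Two things stand out: the third `(( major == 2 && minor == 5 && patch >= 0 ))` term is redundant (already covered by `minor >= 5`), and in the pre-2.1 Python fallback, `SystemExit(...)` is only instantiated, never raised, so the unhealthy paths still exit 0. A minimal sketch of the same branching logic, factored into a testable function (`probe_mode` is a name introduced here for illustration, not part of the chart):

```shell
#!/usr/bin/env bash
# Hedged sketch, not the chart's exact probe: reproduce the semver branching
# the liveness command performs, with the redundant patch-level term dropped.
probe_mode() {
  local version="$1"
  # Same semver sanity check as the probe.
  if [[ ! "$version" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
    echo "invalid"; return 1
  fi
  local major minor patch
  IFS='.' read -r major minor patch <<< "$version"
  if (( major > 2 || (major == 2 && minor >= 5) )); then
    # Airflow >= 2.5: `airflow jobs check --job-type SchedulerJob --local`
    echo "local"
  elif (( major == 2 && minor >= 1 )); then
    # Airflow >= 2.1: `airflow jobs check ... --hostname $(hostname)`
    echo "hostname"
  else
    # Older: query SchedulerJob rows directly. NB: the captured script builds
    # SystemExit(...) without `raise` here, so its unhealthy paths exit 0.
    echo "legacy"
  fi
}

probe_mode "2.6.0"    # -> local  (the version this pod runs)
probe_mode "2.1.4"    # -> hostname
probe_mode "1.10.15"  # -> legacy
```

Since this pod runs `airflow:2.6.0`, only the first branch is ever taken; the events show the failure is a timeout, not a wrong branch.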
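The event shows the probe erroring with "timed out" 1,283 times over 13 hours: spawning `bash`, running `airflow version` (a full Python/Airflow import), and then `airflow jobs check` can exceed the probe's `timeout=10s` on a loaded node. One plausible fix is to raise the probe timeout in the chart values. The key paths below follow the upstream apache-airflow Helm chart convention and are an assumption here, since this is a custom `airflow-1.0.0` chart that may use different keys:

```yaml
# Hedged sketch: values override to give the scheduler liveness probe more
# headroom. Key names assume upstream apache-airflow chart conventions.
scheduler:
  livenessProbe:
    initialDelaySeconds: 10
    timeoutSeconds: 30    # was 10s; the bash + `airflow version` startup alone can exceed that
    periodSeconds: 30
    failureThreshold: 5
```

With `failureThreshold=5` and `period=30s`, the pod would still be restarted within ~2.5 minutes of a genuinely dead scheduler; a longer timeout only stops transient slowness from counting as failures.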