Last active January 27, 2022 10:21
class CrashLoopDetection:
def __init__(self, kubernetes_client=None):
self.kubernetes_client = kubernetes_client or client
def getpodstatus(self, pod):
return pod.status.phase
def run(self, **kwargs):
name = kwargs.get('name')
namespace = kwargs.get('namespace')
waitperiod = kwargs.get('waitperiod')
delay = kwargs.get('delay')
version = kwargs.get('version')
crashloopdetected = False
waittimeout = False
podlist = self.kubernetes_client.CoreV1Api().list_namespaced_pod(namespace= namespace, label_selector = "app={0},version={1}".format(name,version))
for item in podlist.items:
pod = self.kubernetes_client.CoreV1Api().read_namespaced_pod_status(namespace= namespace,
if (pod.status.container_statuses[0].restart_count > 0):
crashloopdetected = True
notify(message="Pod Status Waiting Reason: {0}".format(pod.status.container_statuses[0].state.waiting.reason))
elif (self.getpodstatus(pod) != "Running"):
current_time =
timeout = current_time + timedelta(seconds= waitperiod)
while ( < timeout):
pod = self.kubernetes_client.CoreV1Api().read_namespaced_pod_status(
notify(message="Pod {0} status: {1}".format(, pod.status.phase))
if (self.getpodstatus(pod) == "Running"):
waittimeout = False
crashloopdetected = False
notify(message="pods ready.")
waittimeout = True
crashloopdetected = True
if (waittimeout == True):
return waittimeout
return (crashloopdetected == True or waittimeout == True)
