Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
class CrashLoopDetection:
def __init__(self, kubernetes_client=None):
self.kubernetes_client = kubernetes_client or client
def getpodstatus(self, pod):
return pod.status.phase
def run(self, **kwargs):
name = kwargs.get('name')
namespace = kwargs.get('namespace')
waitperiod = kwargs.get('waitperiod')
delay = kwargs.get('delay')
version = kwargs.get('version')
crashloopdetected = False
waittimeout = False
podlist = self.kubernetes_client.CoreV1Api().list_namespaced_pod(namespace= namespace, label_selector = "app={0},version={1}".format(name,version))
for item in podlist.items:
pod = self.kubernetes_client.CoreV1Api().read_namespaced_pod_status(namespace= namespace, name=item.metadata.name)
if (pod.status.container_statuses[0].restart_count > 0):
crashloopdetected = True
notify(message="Pod Status Waiting Reason: {0}".format(pod.status.container_statuses[0].state.waiting.reason))
break
elif (self.getpodstatus(pod) != "Running"):
current_time = datetime.now()
timeout = current_time + timedelta(seconds= waitperiod)
while (datetime.now() < timeout):
pod = self.kubernetes_client.CoreV1Api().read_namespaced_pod_status(
namespace=namespace, name=item.metadata.name)
notify(message="Pod {0} status: {1}".format(pod.metadata.name, pod.status.phase))
if (self.getpodstatus(pod) == "Running"):
waittimeout = False
crashloopdetected = False
notify(message="pods ready.")
break
else:
waittimeout = True
crashloopdetected = True
time.sleep(delay)
if (waittimeout == True):
return waittimeout
return (crashloopdetected == True or waittimeout == True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.