Skip to content

Instantly share code, notes, and snippets.

Avatar

Samir Behara samirbehara

View GitHub Profile
View Kubernetes environment setup for MacOS
# Check for Homebrew and install if we don't have it
# `command -v` is the POSIX way to probe for an executable; it avoids relying on
# how `test` treats an empty, unquoted `$(which brew)` expansion.
if ! command -v brew >/dev/null 2>&1; then
    echo "Installing homebrew..."
    # The Ruby-based installer has been retired; Homebrew's documented install
    # path is the Bash script below.
    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
fi
# Make sure we are using the latest Homebrew.
brew update
# Check for Minikube and install if we don't have it
View CrashLoop Detection in Kubernetes.py
class CrashLoopDetection:
    """Holds a Kubernetes client and reads status information from pods.

    NOTE(review): only the beginning of this class is visible in the
    snippet; ``run`` continues beyond the lines shown here.
    """

    def __init__(self, kubernetes_client=None):
        """Remember which client object to use.

        Args:
            kubernetes_client: Client object to use. When omitted (or falsy),
                the name ``client`` from the enclosing module is used instead.
        """
        self.kubernetes_client = kubernetes_client or client

    def getpodstatus(self, pod):
        """Return ``pod.status.phase`` for the given pod object."""
        return pod.status.phase

    def run(self, **kwargs):
        """Read the optional ``name`` keyword argument.

        NOTE(review): the remainder of this method is not visible in the
        snippet -- restore it from the full file before relying on it.
        """
        name = kwargs.get('name')
View Test to list pods and services for specific namespace.py
def test_to_list_pods_for_specific_namespace(k8s_client):
    """List the pods in the ``default`` namespace and require at least one.

    Args:
        k8s_client: Object exposing ``list_namespaced_pod``; presumably a
            Kubernetes ``CoreV1Api`` instance supplied by the test setup
            (TODO confirm how it is injected).
    """
    ret = k8s_client.list_namespaced_pod("default")
    podcount = len(ret.items)
    print(podcount)
    assert podcount > 0
    # Iterate through all the pods in the default namespace and list down the pod name and respective namespace
    for item in ret.items:
        print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test to list nodes services endpoints pods for namespaces.py
def test_to_list_endpoints_for_all_namespaces(k8s_client):
    """List endpoints across every namespace and require at least one.

    Args:
        k8s_client: Object exposing ``list_endpoints_for_all_namespaces``;
            presumably a Kubernetes ``CoreV1Api`` instance supplied by the
            test setup (TODO confirm how it is injected).
    """
    ret = k8s_client.list_endpoints_for_all_namespaces()
    epcount = len(ret.items)
    print(epcount)
    assert epcount > 0
    # Iterate through all the endpoints in the K8s cluster and list it down
    for item in ret.items:
        print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test to list all namespaces.py
def test_to_list_all_namespaces(k8s_client):
    """List the namespaces in the cluster and require at least one.

    Args:
        k8s_client: Object exposing ``list_namespace``; presumably a
            Kubernetes ``CoreV1Api`` instance supplied by the test setup
            (TODO confirm how it is injected).
    """
    ret = k8s_client.list_namespace()
    nscount = len(ret.items)
    print(nscount)
    assert nscount > 0
    # Iterate through all the namespaces in the K8s cluster and list it down
    for item in ret.items:
        print(item.metadata.name)
View Test to verify Pod and Service Count.py
def test_to_verify_pod_count_in_default_namespace(k8s_client):
    """Require that the ``default`` namespace contains at least one pod.

    Args:
        k8s_client: Object exposing ``list_namespaced_pod``; presumably a
            Kubernetes ``CoreV1Api`` instance supplied by the test setup
            (TODO confirm how it is injected).
    """
    podcount = len(k8s_client.list_namespaced_pod(namespace='default').items)
    print(podcount)
    assert podcount > 0
def test_to_verify_service_count_in_default_namespace(k8s_client):
    """Count the services in the ``default`` namespace.

    NOTE(review): the visible snippet ends right after ``svccount`` is
    computed; any assertion on the count is not shown here -- confirm
    against the full file.

    Args:
        k8s_client: Object exposing ``list_namespaced_service``; presumably a
            Kubernetes ``CoreV1Api`` instance supplied by the test setup
            (TODO confirm how it is injected).
    """
    svccount = len(k8s_client.list_namespaced_service(namespace='default').items)
View Client Setup for E2E Tests.py
from kubernetes import client, config
# Setup the client for E2E Tests
def k8s_client():
    """Load the kube config from ``./config`` and return a ``CoreV1Api`` client.

    NOTE(review): the tests in this collection take a ``k8s_client``
    parameter, so this is presumably registered as a pytest fixture
    somewhere not visible here -- confirm.

    Returns:
        A new ``client.CoreV1Api`` instance.
    """
    # Configuration for accessing the K8s Cluster
    config.load_kube_config("./config")
    # Create and return an instance of the CoreV1Api class
    return client.CoreV1Api()
View Test K8s Pod Status is Running.py
def test_kubernetes_pod_status_is_running(k8s_client):
    """Read the status of every pod in the ``default`` namespace.

    NOTE(review): the visible snippet fetches each pod's status and prints
    its name and namespace, but the check that the pod is actually Running
    is not shown -- confirm against the full file.

    Args:
        k8s_client: Object exposing ``list_namespaced_pod`` and
            ``read_namespaced_pod_status``; presumably a Kubernetes
            ``CoreV1Api`` instance supplied by the test setup (TODO confirm).
    """
    podlist = k8s_client.list_namespaced_pod("default")
    # Iterate through all the pods in the default namespace and verify that they are Running
    for item in podlist.items:
        pod = k8s_client.read_namespaced_pod_status(namespace='default', name=item.metadata.name)
        print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test K8s Node Status is Ready.py
def test_kubernetes_node_status_is_ready(k8s_client):
    """Read the status of every node in the cluster.

    NOTE(review): the visible snippet fetches each node's status and prints
    its name, but the readiness check itself is not shown -- confirm against
    the full file.

    Args:
        k8s_client: Object exposing ``list_node`` and ``read_node_status``;
            presumably a Kubernetes ``CoreV1Api`` instance supplied by the
            test setup (TODO confirm).
    """
    nodelist = k8s_client.list_node()
    # Iterate through all the nodes and verify that they are Active
    for item in nodelist.items:
        node = k8s_client.read_node_status(name=item.metadata.name)
        print("%s\t" % item.metadata.name)
View Test K8s Master Controller in Healthy.py
def test_kubernetes_master_controller_status_is_healthy(k8s_client):
    """Require that the ``controller-manager`` component reports healthy.

    Args:
        k8s_client: Object exposing ``read_component_status``; presumably a
            Kubernetes ``CoreV1Api`` instance supplied by the test setup
            (TODO confirm how it is injected).
    """
    ret = k8s_client.read_component_status('controller-manager')
    condition = ret.conditions[0]
    assert condition.type == "Healthy"  # Verify status of Master Controller
    # The condition type alone does not say whether the component is healthy;
    # per the Kubernetes API the outcome is carried in ``status``
    # ("True", "False" or "Unknown").
    assert condition.status == "True"
You can’t perform that action at this time.