Skip to content

Instantly share code, notes, and snippets.

Samir Behara samirbehara

Block or report user

Report or block samirbehara

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View Kubernetes environment setup for MacOS
# Check for Homebrew and install if we don't have it
if test ! $(which brew); then
echo "Installing homebrew..."
ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
fi
# Make sure we are using the latest Homebrew.
brew update
# Check for Minikube and install if we don't have it
View CrashLoop Detection in Kubernetes.py
class CrashLoopDetection:
def __init__(self, kubernetes_client=None):
self.kubernetes_client = kubernetes_client or client
def getpodstatus(self, pod):
return pod.status.phase
def run(self, **kwargs):
name = kwargs.get('name')
View Test to list pods and services for specific namespace.py
def test_to_list_pods_for_specific_namespace(k8s_client):
ret = k8s_client.list_namespaced_pod("default")
podcount = len(ret.items)
print(podcount)
assert(podcount > 0)
# Iterate through all the pods in the default namespace and list down the pod name and respective namespace
for item in ret.items:
print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test to list nodes services endpoints pods for namespaces.py
def test_to_list_endpoints_for_all_namespaces(k8s_client):
ret = k8s_client.list_endpoints_for_all_namespaces()
epcount = len(ret.items)
print(epcount)
assert(epcount > 0)
# Iterate through all the endpoints in the K8s cluster and list it down
for item in ret.items:
print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test to list all namespaces.py
def test_to_list_all_namespaces(k8s_client):
ret = k8s_client.list_namespace()
nscount = len(ret.items)
print(nscount)
assert(nscount > 0)
# Iterate through all the namespaces in the K8s cluster and list it down
for item in ret.items:
print(item.metadata.name)
View Test to verify Pod and Service Count.py
def test_to_verify_pod_count_in_default_namespace(k8s_client):
podcount = len(k8s_client.list_namespaced_pod(namespace='default').items)
print(podcount)
assert(podcount > 0)
def test_to_verify_service_count_in_default_namespace(k8s_client):
svccount = len(k8s_client.list_namespaced_service(namespace='default').items)
View Client Setup for E2E Tests.py
from kubernetes import client, config
# Setup the client for E2E Tests
def k8s_client():
# Configuration for accessing the K8s Cluster
config.load_kube_config("./config")
# Create and return an instance of the CoreV1Api class
return client.CoreV1Api()
View Test K8s Pod Status is Running.py
def test_kubernetes_pod_status_is_running(k8s_client):
podlist = k8s_client.list_namespaced_pod("default")
# Iterate through all the pods in the default namespace and verify that they are Running
for item in podlist.items:
pod = k8s_client.read_namespaced_pod_status(namespace='default', name=item.metadata.name)
print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
View Test K8s Node Status is Ready.py
def test_kubernetes_node_status_is_ready(k8s_client):
nodelist = k8s_client.list_node()
# Iterate through all the nodes and verify that they are Active
for item in nodelist.items:
node = k8s_client.read_node_status(name=item.metadata.name)
print("%s\t" % item.metadata.name)
View Test K8s Master Controller in Healthy.py
def test_kubernetes_master_controller_status_is_healthy(k8s_client):
ret = k8s_client.read_component_status('controller-manager')
assert(ret.conditions[0].type == "Healthy" ) # Verify status of Master Controller
You can’t perform that action at this time.