This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Check for Homebrew and install it if we don't have it.
# `command -v` is the portable way to probe for an executable and avoids
# passing an unquoted `$(which brew)` expansion to `test`.
if ! command -v brew >/dev/null 2>&1; then
    echo "Installing homebrew..."
    # The Ruby-based installer is deprecated; Homebrew's supported installer
    # is the Bash script `install.sh`.
    /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
fi

# Make sure we are using the latest Homebrew.
brew update
# Check for Minikube and install if we don't have it |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CrashLoopDetection:
    """Hold a Kubernetes client and expose helpers for inspecting pods."""

    def __init__(self, kubernetes_client=None):
        """Store the client this object should use.

        Args:
            kubernetes_client: Client object to use. When omitted (or
                falsy), the module-level ``client`` name is used instead --
                presumably ``kubernetes.client``; confirm against the
                file's imports.
        """
        self.kubernetes_client = kubernetes_client or client

    def getpodstatus(self, pod):
        """Return the phase stored on ``pod``, i.e. ``pod.status.phase``."""
        return pod.status.phase
def run(self, **kwargs): | |
name = kwargs.get('name') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_to_list_pods_for_specific_namespace(k8s_client):
    """List the pods of the ``default`` namespace and require at least one."""
    pods = k8s_client.list_namespaced_pod("default").items
    pod_total = len(pods)
    print(pod_total)
    assert pod_total > 0

    # Print every pod's name next to the namespace it was listed from.
    for pod in pods:
        meta = pod.metadata
        print("%s\t%s\t" % (meta.name, meta.namespace))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_to_list_endpoints_for_all_namespaces(k8s_client):
    """List endpoints across all namespaces and require at least one."""
    endpoints = k8s_client.list_endpoints_for_all_namespaces().items
    endpoint_total = len(endpoints)
    print(endpoint_total)
    assert endpoint_total > 0

    # Print every endpoint's name together with its namespace.
    for endpoint in endpoints:
        meta = endpoint.metadata
        print("%s\t%s\t" % (meta.name, meta.namespace))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_to_list_all_namespaces(k8s_client):
    """List the namespaces returned by the client and require at least one."""
    namespaces = k8s_client.list_namespace().items
    namespace_total = len(namespaces)
    print(namespace_total)
    assert namespace_total > 0

    # Print the name of every namespace that was returned.
    for namespace in namespaces:
        print(namespace.metadata.name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_to_verify_pod_count_in_default_namespace(k8s_client):
    """Require that the ``default`` namespace contains at least one pod."""
    pods = k8s_client.list_namespaced_pod(namespace='default').items
    pod_total = len(pods)
    print(pod_total)
    assert pod_total > 0
def test_to_verify_service_count_in_default_namespace(k8s_client):
    """Require that the ``default`` namespace contains at least one service."""
    svccount = len(k8s_client.list_namespaced_service(namespace='default').items)
    print(svccount)
    # Without an assertion the count is computed and discarded, so the test
    # could never fail; mirror the pod-count check.
    assert svccount > 0, "expected at least one service in the 'default' namespace"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from kubernetes import client, config | |
# Setup the client for E2E Tests | |
def k8s_client():
    """Load the kubeconfig at ``./config`` and return a ``CoreV1Api`` client.

    NOTE(review): the E2E tests take ``k8s_client`` as an argument, which
    looks like pytest fixture injection, but no fixture decorator is visible
    on this function -- confirm how it is registered.
    """
    # The path is relative, so it resolves against the current working directory.
    config.load_kube_config("./config")
    core_api = client.CoreV1Api()
    return core_api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_kubernetes_pod_status_is_running(k8s_client):
    """Verify that every pod in the ``default`` namespace is in phase Running."""
    podlist = k8s_client.list_namespaced_pod("default")
    # Iterate through all the pods in the default namespace and verify that
    # they are Running.
    for item in podlist.items:
        pod = k8s_client.read_namespaced_pod_status(namespace='default', name=item.metadata.name)
        print("%s\t%s\t" % (item.metadata.name, item.metadata.namespace))
        # Without this check the fetched status is unused and the test can
        # never fail.
        assert pod.status.phase == "Running", (
            "pod %s is in phase %s, expected Running"
            % (item.metadata.name, pod.status.phase)
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_kubernetes_node_status_is_ready(k8s_client):
    """Verify that every node reports a ``Ready`` condition with status True."""
    nodelist = k8s_client.list_node()
    # Iterate through all the nodes and verify that they are Ready.
    for item in nodelist.items:
        node = k8s_client.read_node_status(name=item.metadata.name)
        print("%s\t" % item.metadata.name)
        # ``conditions`` may be None on a node that has not reported yet.
        conditions = node.status.conditions or []
        # Without this check the fetched status is unused and the test can
        # never fail. Condition statuses are the strings "True"/"False"/"Unknown".
        assert any(
            cond.type == "Ready" and cond.status == "True" for cond in conditions
        ), "node %s is not Ready" % item.metadata.name
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_kubernetes_master_controller_status_is_healthy(k8s_client):
    """Verify that the ``controller-manager`` component reports itself healthy."""
    ret = k8s_client.read_component_status('controller-manager')
    # ``conditions`` may be None when the component reports nothing.
    conditions = ret.conditions or []
    # The condition *type* is "Healthy" whether or not the component is
    # healthy; the actual result is carried in *status* ("True"/"False"/"Unknown").
    assert any(
        cond.type == "Healthy" and cond.status == "True" for cond in conditions
    ), "controller-manager does not report Healthy=True"
NewerOlder