def test_kubernetes_node_status_is_ready(k8s_client):
    nodelist = k8s_client.list_node()
    # Iterate through all the nodes and verify that each one is healthy and Ready
    for item in nodelist.items:
        node = k8s_client.read_node_status(name=item.metadata.name)
        print("%s\t" % item.metadata.name)
        # Look conditions up by type; their position in the list is not guaranteed
        conditions = {c.type: c.status for c in node.status.conditions}
        # OutOfDisk may be absent on newer clusters, so a missing entry counts as "False"
        assert conditions.get("OutOfDisk", "False") == "False"  # Verify kubelet is not OutOfDisk
        assert conditions.get("MemoryPressure") == "False"  # Verify kubelet has no MemoryPressure
        assert conditions.get("DiskPressure") == "False"  # Verify kubelet has no DiskPressure
        assert conditions.get("Ready") == "True"  # Verify kubelet is posting Ready status
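
The test above needs a live cluster behind `k8s_client`. As a rough sketch, the same two calls (`list_node` and `read_node_status`) can be exercised offline with a stand-in client. Everything named here (`make_node`, `FakeCoreV1`, `not_ready_nodes`, and the node names) is hypothetical and only mimics the attributes the test reads; it is not part of the Kubernetes client library.

```python
from types import SimpleNamespace


def make_node(name, **conditions):
    """Build a fake node exposing metadata.name and status.conditions."""
    conds = [SimpleNamespace(type=t, status=s) for t, s in conditions.items()]
    return SimpleNamespace(
        metadata=SimpleNamespace(name=name),
        status=SimpleNamespace(conditions=conds),
    )


class FakeCoreV1:
    """Hypothetical stand-in offering only the two methods the test calls."""

    def __init__(self, nodes):
        self._nodes = {n.metadata.name: n for n in nodes}

    def list_node(self):
        return SimpleNamespace(items=list(self._nodes.values()))

    def read_node_status(self, name):
        return self._nodes[name]


def not_ready_nodes(k8s_client):
    """Return the names of nodes whose Ready condition is not "True"."""
    bad = []
    for item in k8s_client.list_node().items:
        node = k8s_client.read_node_status(name=item.metadata.name)
        # Index conditions by type instead of relying on list position
        status = {c.type: c.status for c in node.status.conditions}
        if status.get("Ready") != "True":
            bad.append(item.metadata.name)
    return bad


fake = FakeCoreV1([
    make_node("node-a", MemoryPressure="False", DiskPressure="False", Ready="True"),
    make_node("node-b", MemoryPressure="False", DiskPressure="False", Ready="False"),
])
print(not_ready_nodes(fake))  # → ['node-b']
```

Returning the failing node names rather than asserting inside the loop reports every unhealthy node in one run; a test can then assert that the returned list is empty.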