kubetop - cluster resource monitoring
# View node and container resource requests, limits, and usage, grouped per node, namespace, pod, container
#
# Installation:
#   pip install mapply pandas kubernetes sh
#   brew install watch kubectl coreutils
#
# Usage:
#   alias kubetop='watch -n4 python ~/Downloads/kubetop.py'
#   kubetop
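#
# Note: `kubectl top` requires the cluster to expose the Metrics API
# (e.g. via metrics-server), and kubectl must point at the target cluster.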
import json
import os
import re
import subprocess

import numpy as np
import pandas as pd
from kubernetes.utils.quantity import parse_quantity
from mapply.parallel import multiprocessing_imap
from sh import numfmt
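
# Helper dict that auto-creates nested entries on first access, so deeply
# nested assignments like per_node[node][namespace][pod] = data work without
# pre-creating the intermediate dicts.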
class Q(dict):
    """Dict with key access via attributes."""

    def __missing__(self, key):
        value = self[key] = type(self)()  # retain local pointer to value
        return value  # faster to return than dict lookup

    def __getattr__(self, name: str):
        return self[name]

    def __setattr__(self, name: str, value):
        self[name] = value
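
# Never truncate rows, table width, or cell contents when printing DataFrames.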
pd.set_option("display.max_rows", None) | |
pd.set_option("display.width", None) | |
pd.set_option("display.max_colwidth", None) | |
def print_node_top():
    nodes = subprocess.check_output(
        "kubectl top node --sort-by memory", shell=True, encoding="utf-8"
    ).strip()
    print(nodes)
    return nodes
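
# Parse `kubectl get nodes` (plus zone/arch/instance-type labels) into a
# DataFrame indexed by node name, flagging spot instances by their role.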
def get_nodes():
    nodes = subprocess.check_output(
        "kubectl get nodes -L 'topology.kubernetes.io/zone' -L 'beta.kubernetes.io/arch' -L 'beta.kubernetes.io/instance-type' --no-headers=true",
        shell=True,
        encoding="utf-8",
    ).strip()
    nodes = pd.DataFrame(
        (re.split(" +", x.strip()) for x in nodes.split("\n")),
        columns=[
            "node",
            "status",
            "roles",
            "age",
            "version",
            "zone",
            "arch",
            "instance_type",
        ],
    ).set_index("node")
    nodes["spot"] = nodes["roles"].str.contains("spot-worker")
    return nodes
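
# Fetch allocatable capacity per node. The jsonpath prints all node names
# followed by all allocatable dicts, so np.array_split halves the token list
# and zip pairs each name with its dict again.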
def get_allocatable():
    allocatable = (
        subprocess.check_output(
            "kubectl get nodes -o jsonpath=\"{.items[*]['metadata.name', 'status.allocatable']}\"",
            shell=True,
            encoding="utf-8",
        )
        .strip()
        .split(" ")
    )
    allocatable = dict(zip(*np.array_split(allocatable, 2)))
    df = pd.Series(allocatable).apply(json.loads).apply(pd.Series)
    df.index.name = "node"
    return df
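
# Parse per-container CPU and memory usage from `kubectl top pods` into a
# DataFrame indexed by (pod, namespace, container).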
def get_top():
    top = subprocess.check_output(
        "kubectl top pods --containers=true --all-namespaces --no-headers=true",
        shell=True,
        encoding="utf-8",
    ).strip()
    top = pd.DataFrame(re.split(" +", x.strip()) for x in top.split("\n"))
    top.columns = [
        "namespace",
        "pod",
        "container",
        ("cpu", "usage"),
        ("memory", "usage"),
    ]
    top.reset_index(inplace=True, drop=True)
    top.set_index(["pod", "namespace", "container"], inplace=True)
    top.columns = pd.MultiIndex.from_tuples(top.columns)
    return top
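
# Collect requests and limits (plus any mounted PVC claim names) for every
# container from `kubectl get pods -o json`, keyed per node/namespace/pod/container.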
def get_resources():
    pods = subprocess.check_output(
        "kubectl get pods --all-namespaces -o json", shell=True, encoding="utf-8"
    ).strip()
    pods = json.loads(pods)
    per_node = Q()
    for pod in pods["items"]:
        for container in pod["spec"]["containers"]:
            # placeholder so containers without requests/limits survive json_normalize
            if not container["resources"]:
                container["resources"] = {
                    "limits": {"cpu": np.nan},
                    "requests": {"cpu": np.nan},
                }
        for volume in pod["spec"].get("volumes", []):
            if claim := volume.get("persistentVolumeClaim"):
                name = volume["name"]
                claim_name = claim["claimName"]
                # match the pvc to the container mount
                for container in pod["spec"]["containers"]:
                    for mount in container.get("volumeMounts", []):
                        if mount["name"] == name:
                            if container["resources"]["requests"].get("pvc"):
                                container["resources"]["requests"][
                                    "pvc"
                                ] += f",{claim_name}"
                            else:
                                container["resources"]["requests"]["pvc"] = claim_name
        data = {
            container["name"]: container["resources"]
            for container in pod["spec"]["containers"]
        }
        per_node[pod["spec"].get("nodeName")][pod["metadata"]["namespace"]][
            pod["metadata"]["name"]
        ] = data
    resources = pd.json_normalize(per_node, sep="|").T
    resources.index = pd.MultiIndex.from_tuples(
        resources.index.str.split("|").tolist(),
        names=["node", "namespace", "pod", "container", "", ""],
    )
    # pivot the requests/limits and resource-name levels into columns
    resources = resources.unstack().unstack()[0]
    # a pvc has no limits; tolerate clusters without any PVCs at all
    resources.drop(("pvc", "limits"), axis=1, inplace=True, errors="ignore")
    return resources
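
# Outer-join usage onto requests/limits so containers missing from either side
# still show up.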
def merge(top, resources):
    join = resources.join(top, how="outer")
    join = join[sorted(join.columns)]
    join = join.reorder_levels(["node", "namespace", "pod", "container"]).sort_index()
    return join
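
# Fetch the four kubectl views and merge usage with requests/limits.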
top, resources, nodes, allocatable = multiprocessing_imap(
    lambda x: x(),
    [get_top, get_resources, get_nodes, get_allocatable],
    progressbar=False,
    n_workers=1,
)
join = merge(top, resources)
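
# Convert Kubernetes quantity strings like "100m" or "1Gi" to numbers; pass NaN
# and non-quantity strings (PVC claim names) through untouched.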
def _parse_quantity(x):
    if x != x:  # NaN is the only value that is not equal to itself
        return np.nan
    if x[0].isdigit():
        return parse_quantity(x)
    return x
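
# Sum usage/requests/limits per node and put allocatable capacity next to them.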
node_totals = join.applymap(_parse_quantity).groupby("node").sum(0)
allocatable = allocatable.applymap(_parse_quantity)
allocatable.columns = pd.MultiIndex.from_tuples(
    (x, "allocatable") for x in allocatable.columns
)
node_totals = node_totals.join(allocatable)[["cpu", "memory"]]
node_totals = node_totals[sorted(node_totals.columns)]
nodes.columns = pd.MultiIndex.from_tuples(("meta", x) for x in nodes.columns)
nodes = nodes.join(node_totals)
# most memory usage first
nodes.sort_values(("memory", "usage"), ascending=False, inplace=True)
# order the per-container table by the same node order as the nodes table
join = join.loc[nodes.index]
# humanize memory byte counts using coreutils numfmt (e.g. 1073741824 -> 1.0Gi)
nodes["memory"] = (
    nodes["memory"]
    .applymap(int)
    .applymap(str)
    .apply(lambda x: numfmt("\n".join(x), to="iec-i", format="%.1f", field="-").split())
)
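
# Optionally hide system workloads, e.g. invoke as: HIDESYS=1 kubetop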
# excludes namespaces matching 'system' or 'prometheus'
if os.environ.get("HIDESYS"):
    join = join[
        ~join.index.to_frame(index=False)
        .namespace.str.contains("system|prometheus", regex=True)
        .fillna(False)
        .values
    ]

print(nodes)
print(join.fillna(""))

# all containers that have no mem limits, or whose mem limit != mem request
# join[join.memory.limits.isna() | (join.memory.limits != join.memory.requests)]['memory'].reset_index().fillna('').drop(columns='node').sort_values(['namespace', 'pod', 'container'])