Skip to content

Instantly share code, notes, and snippets.

@ddelange
Last active March 15, 2023 10:08
Show Gist options
  • Save ddelange/a515999f5965dd92cbd0274517db0f53 to your computer and use it in GitHub Desktop.
Save ddelange/a515999f5965dd92cbd0274517db0f53 to your computer and use it in GitHub Desktop.
# kubetop - cluster resource monitoring
#
# View node and container resource requests, limits, and usage,
# grouped per node, namespace, pod, and container.
#
# Installation:
#   pip install mapply pandas kubernetes sh
#   brew install watch kubectl coreutils
#
# Usage:
#   alias kubetop='watch -n4 python ~/Downloads/kubetop.py'
#   kubetop
#
# Set the HIDESYS environment variable to any non-empty value to hide
# namespaces whose name contains "system" or "prometheus".
import json
import os
import re
import subprocess
import numpy as np
import pandas as pd
from kubernetes.utils.quantity import parse_quantity
from sh import numfmt
from mapply.parallel import multiprocessing_imap
class Q(dict):
    """Auto-vivifying dict whose keys can also be used as attributes.

    Reading a missing key, either as ``q["a"]`` or as ``q.a``, creates, stores
    and returns a new empty ``Q``. Nested assignments such as ``q.a.b = 1``
    therefore work without preparing the intermediate levels first.
    """

    def __missing__(self, key):
        value = self[key] = type(self)()  # retain local pointer to value
        return value  # faster to return than dict lookup

    def __getattr__(self, name: str):
        # Only reached when regular attribute lookup fails. Protocol probes
        # for special methods (e.g. ``__deepcopy__`` from ``copy.deepcopy``)
        # must fail with AttributeError: answering them with a freshly created
        # ``Q`` hands the caller a non-callable "method" and leaves a spurious
        # key behind in the mapping.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        return self[name]

    def __setattr__(self, name: str, value):
        # Attribute assignment is stored as a regular dict item.
        self[name] = value
# Print complete frames: reset the row, width and column-width display
# options to None so pandas does not apply its default limits.
for _display_option in ("display.max_rows", "display.width", "display.max_colwidth"):
    pd.set_option(_display_option, None)
def print_node_top():
    """Print the ``kubectl top node`` report sorted by memory and return it.

    Returns:
        The command's stdout as text, with surrounding whitespace stripped.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with a non-zero status.
    """
    command = "kubectl top node --sort-by memory"
    report = subprocess.check_output(command, shell=True, encoding="utf-8").strip()
    print(report)
    return report
def get_nodes():
    """Return a DataFrame with one row of ``kubectl get nodes`` output per node.

    The header-less kubectl listing is split on runs of spaces into the
    columns node, status, roles, age, version, zone, arch and instance_type.
    ``node`` becomes the index, and a boolean ``spot`` column flags rows whose
    roles contain ``"spot-worker"``.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with a non-zero status.
    """
    command = "kubectl get nodes -L 'topology.kubernetes.io/zone' -L 'beta.kubernetes.io/arch' -L 'beta.kubernetes.io/instance-type' --no-headers=true"
    listing = subprocess.check_output(command, shell=True, encoding="utf-8").strip()

    column_names = [
        "node",
        "status",
        "roles",
        "age",
        "version",
        "zone",
        "arch",
        "instance_type",
    ]
    rows = (re.split(" +", line.strip()) for line in listing.split("\n"))
    frame = pd.DataFrame(rows, columns=column_names).set_index("node")
    frame["spot"] = frame["roles"].str.contains("spot-worker")
    return frame
def get_allocatable():
    """Return each node's allocatable resources as a DataFrame indexed by node.

    The jsonpath query prints space-separated tokens. The first half of them
    is paired with the second half as (node name, JSON document), and every
    key of a parsed document becomes a column.
    NOTE(review): this relies on the JSON documents containing no spaces;
    confirm against real kubectl output.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with a non-zero status.
    """
    command = "kubectl get nodes -o jsonpath=\"{.items[*]['metadata.name', 'status.allocatable']}\""
    output = subprocess.check_output(command, shell=True, encoding="utf-8")
    tokens = output.strip().split(" ")

    # Names come first, then the matching allocatable documents.
    node_names, raw_documents = np.array_split(tokens, 2)
    documents_by_node = dict(zip(node_names, raw_documents))

    df = pd.Series(documents_by_node).apply(json.loads).apply(pd.Series)
    df.index.name = "node"
    return df
def get_top():
    """Return current per-container CPU and memory usage from ``kubectl top``.

    The header-less listing is split on runs of spaces. The result is indexed
    by (pod, namespace, container) and has the two-level columns
    ("cpu", "usage") and ("memory", "usage"); values stay raw strings.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with a non-zero status.
    """
    command = "kubectl top pods --containers=true --all-namespaces --no-headers=true"
    listing = subprocess.check_output(command, shell=True, encoding="utf-8").strip()

    usage = pd.DataFrame(re.split(" +", line.strip()) for line in listing.split("\n"))
    usage.columns = [
        "namespace",
        "pod",
        "container",
        ("cpu", "usage"),
        ("memory", "usage"),
    ]
    usage = usage.reset_index(drop=True).set_index(["pod", "namespace", "container"])
    # Only the tuple-named usage columns remain; promote them to a MultiIndex.
    usage.columns = pd.MultiIndex.from_tuples(usage.columns)
    return usage
def get_resources():
    """Collect the resource requests and limits of every container.

    Runs ``kubectl get pods --all-namespaces -o json`` and, for each pod,
    records every container's ``resources`` mapping under
    node -> namespace -> pod -> container. The names of persistent volume
    claims mounted by a container are stored, comma-separated, under that
    container's ``requests["pvc"]``.

    Returns:
        DataFrame indexed by (node, namespace, pod, container) with two-level
        columns (resource name, "limits" or "requests"). The meaningless
        ("pvc", "limits") column is removed when present.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with a non-zero status.
    """
    raw = subprocess.check_output(
        "kubectl get pods --all-namespaces -o json", shell=True, encoding="utf-8"
    ).strip()
    pods = json.loads(raw)

    per_node = Q()
    for pod in pods["items"]:
        spec = pod["spec"]
        containers = spec["containers"]

        for container in containers:
            if not container.get("resources"):
                # Placeholder so the container still yields a row downstream.
                # np.nan is used because the np.NaN alias is gone in NumPy 2.0.
                container["resources"] = {
                    "limits": {"cpu": np.nan},
                    "requests": {"cpu": np.nan},
                }

        # "volumes" is missing from the spec of pods that mount nothing.
        for volume in spec.get("volumes", []):
            claim = volume.get("persistentVolumeClaim")
            if not claim:
                continue
            volume_name = volume["name"]
            claim_name = claim["claimName"]
            # match the pvc to the container mount
            for container in containers:
                for mount in container.get("volumeMounts", []):
                    if mount["name"] != volume_name:
                        continue
                    # A container may declare limits without any requests.
                    requests = container["resources"].setdefault("requests", {})
                    if requests.get("pvc"):
                        requests["pvc"] += f",{claim_name}"
                    else:
                        requests["pvc"] = claim_name

        data = {container["name"]: container["resources"] for container in containers}
        # Pods without an assigned node are grouped under the key None.
        per_node[spec.get("nodeName")][pod["metadata"]["namespace"]][
            pod["metadata"]["name"]
        ] = data

    # Flatten to one "node|namespace|pod|container|kind|resource" key per value,
    # then pivot the two innermost levels into columns.
    resources = pd.json_normalize(per_node, sep="|").T
    resources.index = pd.MultiIndex.from_tuples(
        resources.index.str.split("|").tolist(),
        names=["node", "namespace", "pod", "container", "", ""],
    )
    resources = resources.unstack().unstack()[0]
    # The column only exists when at least one container mounts a PVC.
    resources = resources.drop(("pvc", "limits"), axis=1, errors="ignore")
    return resources
def merge(top, resources):
    """Combine live usage with requests/limits into one per-container table.

    Args:
        top: usage frame; its index levels must share names with ``resources``.
        resources: requests/limits frame indexed by node, namespace, pod and
            container.

    Returns:
        The outer join of both frames with columns in sorted order and rows
        indexed and sorted by (node, namespace, pod, container).
    """
    combined = resources.join(top, how="outer")
    ordered_columns = sorted(combined.columns)
    combined = combined[ordered_columns]
    level_order = ["node", "namespace", "pod", "container"]
    return combined.reorder_levels(level_order).sort_index()
# Run the four kubectl-backed collectors through mapply (one worker, no
# progress bar); the results are unpacked in the order the collectors are listed.
collectors = [get_top, get_resources, get_nodes, get_allocatable]
top, resources, nodes, allocatable = multiprocessing_imap(
    lambda collect: collect(),
    collectors,
    progressbar=False,
    n_workers=1,
)

# One row per container: requests, limits and live usage side by side.
join = merge(top, resources)
def _parse_quantity(x):
    """Convert a Kubernetes quantity string to a number, passing others through.

    Args:
        x: a table cell; usually a quantity string such as a CPU or memory
            amount, a free-form string (e.g. PVC names), or NaN when missing.

    Returns:
        ``np.nan`` for NaN input, the result of ``parse_quantity`` for strings
        that start with a digit and parse as a quantity, and ``x`` unchanged
        for everything else.
    """
    if x != x:  # NaN is the only value that differs from itself
        # np.nan instead of np.NaN: the capitalised alias is gone in NumPy 2.0.
        return np.nan
    # Empty strings and non-string cells cannot be quantities.
    if not isinstance(x, str) or not x[:1].isdigit():
        return x
    try:
        return parse_quantity(x)
    except ValueError:
        # Starts with a digit but is no quantity, e.g. a PVC name like "0-data".
        return x
# Per-node totals: parse every cell, then sum the containers of each node.
parsed_cells = join.applymap(_parse_quantity)
node_totals = parsed_cells.groupby("node").sum(0)

# Put the allocatable capacity next to the totals as (resource, "allocatable").
allocatable = allocatable.applymap(_parse_quantity)
allocatable.columns = pd.MultiIndex.from_tuples(
    (resource, "allocatable") for resource in allocatable.columns
)
node_totals = node_totals.join(allocatable)[["cpu", "memory"]]
node_totals = node_totals[sorted(node_totals.columns)]

# Node metadata goes under a "meta" top level so it can sit beside the totals.
nodes.columns = pd.MultiIndex.from_tuples(("meta", name) for name in nodes.columns)
nodes = nodes.join(node_totals)

# most memory usage first
nodes = nodes.sort_values(("memory", "usage"), ascending=False)

# list the pods in the same node order as the node table
join = join.loc[nodes.index]

# Render the memory figures through numfmt with IEC suffixes, one column at a time.
memory_text = nodes["memory"].applymap(int).applymap(str)
nodes["memory"] = memory_text.apply(
    lambda column: numfmt(
        "\n".join(column), to="iec-i", format="%.1f", field="-"
    ).split()
)

# with HIDESYS set, hide namespaces that have 'system' or 'prometheus' in them
if os.environ.get("HIDESYS"):
    namespaces = join.index.to_frame(index=False).namespace
    hidden = namespaces.str.contains("system|prometheus", regex=True).fillna(False)
    join = join[~hidden.values]

print(nodes)
print(join.fillna(""))

# all containers that do not have mem limits or mem limit != mem request
# join[join.memory.limits.isna() | (join.memory.limits != join.memory.requests)]['memory'].reset_index().fillna('').drop(columns='node').sort_values(['namespace', 'pod', 'container'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment