Skip to content

Instantly share code, notes, and snippets.

@dvarrazzo
Last active February 1, 2020 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvarrazzo/8341d0641c8ef13d6d1d807c5fb2775c to your computer and use it in GitHub Desktop.
Save dvarrazzo/8341d0641c8ef13d6d1d807c5fb2775c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Render a view of the k8s status in a namespace.
Example:
# Display the state in console
k8splore.py -n my-ns -c my-cx --format text
# Display the state as a graph, with some nodes omitted.
k8splore.py -n my-ns -c my-cx --drop-kind ReplicaSet Pod Job \\
--format dot | dot -Tsvg > my-ns.svg && firefox my-ns.svg
"""
import re
import sys
import json
import logging
import subprocess as sp
from operator import attrgetter
from collections import defaultdict
# Module-wide logger; getLogger() without a name returns the root logger.
logger = logging.getLogger()
# Emit records at INFO level and above, each one prefixed by its timestamp
# and level name.
logging.basicConfig(
level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s'
)
def main():
    """Run the whole pipeline: fetch the resources, build the graph, print it.

    The resources read from the cluster go through a fixed sequence of
    transformation stages, then the kinds listed in ``--drop-kind`` are
    removed one at a time, and finally the result is printed in the
    format chosen on the command line.
    """
    opt = parse_cmdline()

    resources = get_all_resources(opt)
    # Every stage takes the resources mapping and returns its next version.
    stages = (
        drop_no_pod,
        add_containers,
        add_extnames,
        create_graph,
        add_groups,
    )
    for stage in stages:
        resources = stage(resources)

    for unwanted in opt.drop_kind:
        resources = drop_kind(resources, unwanted)

    if opt.format == 'dot':
        print_dot(resources)
        return
    if opt.format == 'text':
        print_text(resources)
        return
    assert False, f"unknown format {opt.format}"
def get_all_resources(opt):
    """Read the Pods, the Services and everything they refer to.

    Starts from the ``Pod`` and ``Service`` kinds and runs
    ``kubectl get <kind> -o json`` for each of them. Every kind named by
    a reference of a fetched object is queued and fetched in turn, until
    no kind is left that was not already read.

    Args:
        opt: the parsed command line; ``opt.context`` and
            ``opt.namespace`` are passed to kubectl when they are set.

    Returns:
        A dict mapping each resource id to its `Resource`.

    Raises:
        subprocess.CalledProcessError: if kubectl exits with an error.
        AssertionError: if two objects read have the same id.
    """
    # The part of the command line that doesn't depend on the kind.
    base_cmdline = ["kubectl", "-o", "json"]
    if opt.context:
        base_cmdline += ['--context', opt.context]
    if opt.namespace:
        base_cmdline += ['--namespace', opt.namespace]

    kinds = {'Pod', 'Service'}
    got = set()
    resources = {}
    while kinds:
        kind = kinds.pop()
        if kind in got:
            continue
        got.add(kind)

        logger.info("reading resources of kind %s", kind)
        output = sp.check_output(base_cmdline + ["get", kind])
        for item in json.loads(output)['items']:
            res = Resource(item)
            assert res.id not in resources
            resources[res.id] = res
            # Owners may be of a kind not seen yet: schedule it for reading.
            for ref in res.references:
                if ref.kind:
                    kinds.add(ref.kind)
    return resources
def drop_no_pod(resources):
    """Return a copy of *resources* without idle ReplicaSets and Deployments.

    A ReplicaSet that no remaining resource references is removed; then
    the same is done for Deployments. The order matters: the references
    are computed again for the second pass, so a Deployment only
    referenced by ReplicaSets just removed goes away too.

    The input mapping is not modified.
    """
    remaining = resources.copy()
    for doomed_kind in ('ReplicaSet', 'Deployment'):
        referenced = {
            ref.id for item in remaining.values() for ref in item.references
        }
        unreferenced = [
            item.id
            for item in remaining.values()
            if item.kind == doomed_kind and item.id not in referenced
        ]
        for item_id in unreferenced:
            del remaining[item_id]
    return remaining
def add_extnames(resources):
    """Return a copy of *resources* extended with external name nodes.

    For every Service whose ``external_name`` is not None, the object is
    added under its own id, unless that id is already in the result.
    The input mapping is not modified.
    """
    rv = resources.copy()
    services = (res for res in resources.values() if res.kind == 'Service')
    for service in services:
        target = service.external_name
        if target is None or target.id in rv:
            continue
        rv[target.id] = target
    return rv
def add_containers(resources):
    """Return a copy of *resources* extended with containers and ports.

    Every container of every Pod is added under its own id, followed by
    the container's ports. A container whose id is already in the result
    is skipped together with its ports. The input mapping is not
    modified.

    Raises:
        AssertionError: if a port id is already in the result.
    """
    rv = resources.copy()
    for pod in resources.values():
        if pod.kind != 'Pod':
            continue
        for container in pod.containers:
            if container.id not in rv:
                rv[container.id] = container
                for port in container.ports:
                    assert port.id not in rv, port.id
                    rv[port.id] = port
    return rv
def create_graph(resources):
    """Connect the resources by filling in their ``children`` sets.

    The following edges are created:

    - a ``ClusterIP`` Service becomes a child of every Pod carrying all
      the labels of the service selector; a service matching no pod is
      left out of the result;
    - an ``ExternalName`` Service becomes a child of its external name
      node;
    - every resource becomes a child of each resource it references;
    - the containers of a Pod (and the ports of the containers) become
      children of the pod (of the container), if they are among the
      resources.

    Args:
        resources: dict mapping ids to nodes. The ``children`` sets of
            the nodes are updated in place.

    Returns:
        A copy of the mapping, without the services that no pod offers.
    """
    rv = resources.copy()

    # Index the pods by each (label, value) pair they carry.
    by_label = defaultdict(set)
    for res in resources.values():
        if res.kind != 'Pod':
            continue
        for k, v in res.labels.items():
            by_label[k, v].add(res.id)

    for res in resources.values():
        if res.kind == 'Service':
            if res.type == 'ClusterIP':
                pids = None
                for sel in res.selector.items():
                    matching = by_label.get(sel, frozenset())
                    if pids is None:
                        # Take a copy: the intersections below happen in
                        # place and must not shrink the sets stored in
                        # the index, which other services will look up.
                        pids = set(matching)
                    else:
                        pids &= matching

                if pids:
                    for pid in pids:
                        rv[pid].children.add(res.id)
                else:
                    # No pod offers this service
                    del rv[res.id]

            elif res.type == 'ExternalName':
                ext = res.external_name
                rv[ext.id].children.add(res.id)

        for ref in res.references:
            rv[ref.id].children.add(res.id)

        if res.kind == 'Pod':
            for c in res.containers:
                if c.id not in resources:
                    continue
                res.children.add(c.id)
                for p in c.ports:
                    if p.id in resources:
                        c.children.add(p.id)

    return rv
def add_groups(resources):
    """Set the ``group`` attribute on every resource.

    Each Deployment gives its group to itself and to all its
    descendants: ``API`` if its name starts with ``api-``, ``Web`` if it
    starts with ``web-``, ``Other`` otherwise. Whatever has no group
    afterwards ends up in ``Other``.

    The resources are changed in place and the same mapping is returned.
    """
    prefix_groups = (("api-", 'API'), ("web-", 'Web'))

    def set_recursive(node, group):
        node.group = group
        for child_id in node.children:
            set_recursive(resources[child_id], group)

    for node in resources.values():
        if node.kind == 'Deployment':
            group = next(
                (
                    label
                    for prefix, label in prefix_groups
                    if node.name.startswith(prefix)
                ),
                'Other',
            )
            set_recursive(node, group)

    for node in resources.values():
        if hasattr(node, 'group'):
            continue
        node.group = 'Other'

    return resources
def drop_kind(resources, kind):
    """Remove the nodes of a kind, hanging their children on their parents.

    The kind is compared case-insensitively. For each node removed:

    - the node is taken off the children of its parents;
    - a child which is a Container is moved to the (only) parent of the
      node, unless the parent already has a container with the same
      image: in that case the existing container has its ``count``
      increased and the child is deleted with all its descendants;
    - any other child is added to every parent of the node.

    Args:
        resources: dict mapping ids to nodes. The ``children`` sets and
            the ``count`` of the nodes are updated in place.
        kind: the kind of the nodes to remove.

    Returns:
        A copy of the mapping without the nodes removed.

    Raises:
        AssertionError: if a node to remove has container children but
            doesn't have exactly one parent.
    """
    wanted = kind.lower()

    # Reverse index: id of a child -> ids of the nodes having it as child.
    parents = defaultdict(set)
    for node in resources.values():
        for child_id in node.children:
            parents[child_id].add(node.id)

    rv = resources.copy()

    def del_recursive(node_id):
        for sub_id in rv[node_id].children:
            del_recursive(sub_id)
        del rv[node_id]

    for node in resources.values():
        if node.kind.lower() != wanted:
            continue

        owner_ids = parents[node.id]
        for owner_id in owner_ids:
            resources[owner_id].children.discard(node.id)

        for child_id in node.children:
            child = rv[child_id]
            if child.kind != 'Container':
                for owner_id in owner_ids:
                    resources[owner_id].children.add(child_id)
                continue

            # Don't duplicate containers with the same image
            image = child.image
            assert len(owner_ids) == 1
            owner = rv[list(owner_ids)[0]]
            twin = next(
                (
                    rv[sibling_id]
                    for sibling_id in owner.children
                    if rv[sibling_id].kind == 'Container'
                    and rv[sibling_id].image == image
                ),
                None,
            )
            if twin is None:
                owner.children.add(child_id)
            else:
                twin.count += 1
                del_recursive(child_id)

        del rv[node.id]

    return rv
def print_dot(resources):
    """Print the resources on stdout as a Graphviz ``dot`` graph.

    The nodes are sorted by name and gathered in one cluster per group,
    the clusters going in reverse order of group name. Services and
    ports have no node of their own: they are rendered inside the node
    they are attached to (the services among the children of a
    resource, the ports of a container), and no edge points to them.
    """
    embedded_kinds = ('Service', 'Port')

    by_group = defaultdict(list)
    for item in sorted(resources.values(), key=attrgetter('name')):
        by_group[item.group].append(item)

    print("digraph g {")
    print(' rankdir="LR";')
    print(' node [fontsize=8 fontname=helvetica];')

    for gname, members in sorted(by_group.items(), reverse=True):
        print(f'subgraph cluster_{gname} {{')
        print(f'label="{gname}"')

        for item in members:
            if item.kind in embedded_kinds:
                continue

            if item.kind == 'Container':
                attached = item.ports
            else:
                attached = [
                    resources[cid]
                    for cid in item.children
                    if resources[cid].kind == 'Service'
                ]

            if attached:
                node = item.render_node(
                    label=item.node_label, services=attached
                )
            else:
                node = item.node
            print(f' "{item.id}" {node};')

            for cid in item.children:
                if resources[cid].kind not in embedded_kinds:
                    print(f' "{item.id}" -> "{cid}";')

        print("}")

    print("}")
def print_text(resources):
    """Print the resources on stdout as an indented text tree.

    The output has a section per group, in order of group name. Within
    a group every resource is printed once, followed by its children
    one level deeper; the resources are ordered with the workload
    controllers first, then the generic resources, then the containers,
    then the ports, and by kind and name among equals.

    A Deployment is followed by its description, if it has one; a
    Service is followed by the list of its ports; a node standing for
    more than one resource is followed by the count.
    """
    top_level_kinds = ('Deployment', 'StatefulSet', 'DaemonSet', 'CronJob')

    def key(res):
        if res.kind in top_level_kinds:
            score = 1
        # Compare for equality: `kind in ('Container')` would be a
        # substring test, because parentheses alone don't make a tuple.
        elif res.kind == 'Container':
            score = 3
        elif res.kind == 'Port':
            score = 4
        else:
            score = 2
        return score, res.kind, res.name

    groups = defaultdict(list)
    for res in sorted(resources.values(), key=key):
        groups[res.group].append(res)

    printed = set()

    def print_recursive(res, level=0):
        # A resource may be reached both from the group list and from its
        # parent: print it only the first time.
        if res.id in printed:
            return
        printed.add(res.id)

        count = f' (x{res.count})' if res.count > 1 else ''
        indent = ' ' * level

        # Only add the separator if there is a description to show, so
        # that the line doesn't end with a stray space.
        comment = ''
        if res.kind == 'Deployment' and res.comment:
            comment = f' {res.comment}'

        print(f"{indent}{res.kind} {res.name}{count}{comment}")

        if res.kind == 'Service':
            for port in res.ports:
                print(f" {indent} {port.name}")

        children = sorted((resources[c] for c in res.children), key=key)
        for child in children:
            print_recursive(child, level=level + 1)

        # Separate the top-level trees with an empty line.
        if level == 0:
            print()

    for gname, group in sorted(groups.items()):
        print(f'group: {gname}\n')
        for res in group:
            print_recursive(res)
        print()
class JsonThing:
    """Base class of the graph nodes, each one wrapping a piece of JSON.

    The subclasses are expected to provide a ``name`` (and usually to
    override ``kind``): the node id and its Graphviz rendering are
    derived from them.

    Attributes:
        json: the raw data the node was created from.
        children: set of the ids of the nodes displayed below this one.
        count: number of equivalent nodes this one stands for.
    """

    def __init__(self, json):
        self.json = json
        self.children = set()
        self.count = 1

    @property
    def id(self):
        """The key of the node in the resources mapping."""
        return f"{self.kind}:{self.name}"

    @property
    def kind(self):
        """The kind of the node; by default the name of its class."""
        return self.__class__.__name__

    def __repr__(self):
        return "<%s at 0x%X>" % (self.id, id(self))

    def render_node(self, label, shape='box', services=None):
        """Return the Graphviz attribute list describing the node.

        Args:
            label: the text to display in the node.
            shape: the shape of the node, used if there are no services.
            services: the Service or Port objects to display inside the
                node, all of the same kind. If there is any, the node
                is rendered as a ``record`` with a cell for each of
                them. The sequence is not modified.

        Returns:
            A string such as ``[shape=box label="..."]``, with a
            ``tooltip`` attribute too if the node has a comment.

        Raises:
            AssertionError: if the services are neither Service nor Port.
        """

        def render_port(p):
            return p.name

        def render_service(s):
            # The separator is a backslash followed by "n": the newline
            # is to be interpreted by Graphviz, not by Python.
            return '\\n'.join([s.name, *(port.name for port in s.ports)])

        def dot_escstring(s):
            # https://graphviz.org/doc/info/attrs.html#k:escString
            # not really, escape only quotes for now
            return re.sub(r'(["])', r'\\\1', s)

        if services:
            shape = 'record'
            # Sort a copy: the list received may be one cached on another
            # object, which the rendering has no business reordering.
            if services[0].kind == 'Service':
                title = 'Services'
                ordered = sorted(services, key=attrgetter('name'))
                cells = ' | '.join(render_service(s) for s in ordered)
            else:
                assert services[0].kind == 'Port', services[0].kind
                title = 'Ports'
                ordered = sorted(services, key=attrgetter('port', 'protocol'))
                cells = ' | '.join(render_port(p) for p in ordered)

            label = f'{{ {label} | {{ {title} | {cells} }} }}'

        tooltip = self.comment or ''
        if tooltip:
            tooltip = dot_escstring(tooltip)
            tooltip = f' tooltip="{tooltip}"'

        return f'[shape={shape} label="{label}"{tooltip}]'

    @property
    def node_shape(self):
        """The Graphviz shape used for the nodes of this kind."""
        kind = self.kind
        if kind in ('ExternalName', 'Deployment'):
            return 'component'
        elif kind == 'Service':
            return 'oval'
        elif kind == 'Container':
            return 'box3d'
        else:
            return 'box'

    @property
    def node_label(self):
        """The text displayed in the node, with Graphviz line breaks."""
        count = f' (x{self.count})' if self.count > 1 else ''

        deets = ''
        if self.kind == 'Service':
            deets = ', '.join(
                f"{port.protocol}/{port.port}" for port in self.ports
            )
            deets = f' ({deets})'

        if self.kind == 'Pod':
            # A pod not scheduled yet has no nodeName: leave the line
            # empty instead of failing with a KeyError.
            node_name = self.json["spec"].get("nodeName") or ''
            node = node_name.split('.', 1)[0]
            return f'{self.kind}{count}\\n{node}{deets}'
        elif self.kind in ('ReplicaSet', 'Job'):
            return f'{self.kind}{count}'
        else:
            return f'{self.kind}{count}\\n{self.name}{deets}'

    @property
    def node(self):
        """The Graphviz attribute list of the node, with no services."""
        return self.render_node(shape=self.node_shape, label=self.node_label)

    @property
    def references(self):
        """The references to other nodes; none in the base class."""
        return []

    @property
    def ports(self):
        """The ports of a Service, created on first access and cached.

        Raises:
            AttributeError: if the node is not a Service.
        """
        if self.kind == 'Service':
            if not hasattr(self, '_ports'):
                self._ports = [
                    Port(p, self) for p in self.json['spec'].get('ports', ())
                ]
            return self._ports
        else:
            raise AttributeError(f"you don't find ports on a {self.kind}")

    @property
    def comment(self):
        """The first line of the ``kubeonoff/description`` annotation.

        None if the annotation is missing or empty.
        """
        try:
            comment = self.json['metadata']['annotations'][
                'kubeonoff/description'
            ]
        except (KeyError, TypeError, AttributeError):
            return None

        # An empty annotation has no first line to return.
        lines = comment.splitlines()
        return lines[0] if lines else None
class Resource(JsonThing):
    """A Kubernetes object, as found in the output of ``kubectl get``."""

    @property
    def kind(self):
        """The kind declared by the object itself."""
        return self.json['kind']

    @property
    def type(self):
        """The ``spec.type`` of the object; None if it has none."""
        return self.json['spec'].get('type')

    @property
    def name(self):
        return self.json['metadata']['name']

    @property
    def labels(self):
        """The labels of the object; empty if it carries none.

        ``metadata.labels`` is optional: an object without labels has
        no such key at all.
        """
        return self.json['metadata'].get('labels') or {}

    @property
    def selector(self):
        """The ``spec.selector`` of the object; empty if it has none."""
        return self.json['spec'].get('selector') or {}

    @property
    def references(self):
        """The owners of the object, as a list of `Reference`."""
        refs = self.json['metadata'].get('ownerReferences', ())
        return [Reference(ref) for ref in refs]

    @property
    def containers(self):
        """The containers in the spec, created on first access and cached."""
        if not hasattr(self, '_containers'):
            self._containers = [
                Container(c, self)
                for c in self.json['spec'].get('containers', ())
            ]
        return self._containers

    @property
    def external_name(self):
        """The target of an ``ExternalName`` Service; None for the rest.

        A new `ExternalName` object is returned at every access.
        """
        if (
            self.kind == 'Service'
            and self.json['spec'].get('type') == 'ExternalName'
        ):
            return ExternalName(self.json['spec']['externalName'])
        return None
class Reference(JsonThing):
    """The pointer from a resource to another one, e.g. to its owner.

    Kind and name are read from the reference itself, so that its id is
    the same as the id of the resource referred to.
    """

    @property
    def kind(self):
        target_kind = self.json['kind']
        return target_kind

    @property
    def name(self):
        target_name = self.json['name']
        return target_name
class Container(JsonThing):
    """A container declared in the spec of a resource.

    Attributes:
        parent: the resource the container belongs to.
    """

    def __init__(self, json, parent):
        super().__init__(json)
        self.parent = parent

    @property
    def id(self):
        """The node id, made unique by the name of the parent."""
        return f"{self.kind}:{self.parent.name}:{self.name}"

    @property
    def name(self):
        return self.json['name']

    @property
    def image(self):
        """The full reference of the image run by the container."""
        return self.json['image']

    @property
    def node_label(self):
        """The text displayed in the node: name and image, without version."""
        # Drop the digest first, then the tag. A colon followed by a
        # slash belongs to the port of the registry ("host:5000/image"),
        # not to a tag, and must be left alone.
        image = self.image.partition('@')[0]
        base, colon, tag = image.rpartition(':')
        if colon and '/' not in tag:
            image = base

        count = f' (x{self.count})' if self.count > 1 else ''
        return f"{self.kind}: {self.name}{count}\\n{image}"

    @property
    def ports(self):
        """The ports declared, created on first access and cached."""
        if not hasattr(self, '_ports'):
            self._ports = [Port(p, self) for p in self.json.get('ports', ())]
        return self._ports
class Port(JsonThing):
    """A port, either declared by a container or exposed by a service.

    Attributes:
        parent: the container or the service the port belongs to.
    """

    def __init__(self, json, parent):
        super().__init__(json)
        self.parent = parent

    @property
    def id(self):
        """The node id, made unique by the id of the parent.

        Built from `protocol` and `port`, so it works for the ports of
        a service too, which have no ``containerPort``.
        """
        return f"{self.kind}:{self.parent.id}:{self.protocol}/{self.port}"

    @property
    def name(self):
        """The description of the port: ``[name: ]protocol/number[ (host)]``.

        The host port is displayed only if different from the port.
        """
        name = self.json.get('name')
        protocol = self.protocol
        number = self.port
        if name is not None:
            rv = f'{name}: {protocol}/{number}'
        else:
            rv = f'{protocol}/{number}'

        hp = self.json.get('hostPort')
        if hp is not None:
            if hp != number:
                rv = f'{rv} (host: {hp})'
            else:
                rv = f'{rv} (host)'

        return rv

    @property
    def protocol(self):
        return self.json['protocol']

    @property
    def port(self):
        """The port number, wherever the parent keeps it."""
        if 'containerPort' in self.json:
            # on a container
            return self.json['containerPort']
        else:
            # on a service
            return self.json['port']
class ExternalName(JsonThing):
    """The target of an ``ExternalName`` service.

    Unlike the other nodes, the data wrapped here is the bare value of
    the external name, which doubles as the name of the node.
    """

    @property
    def id(self):
        target = self.json
        return f"{self.kind}:{target}"

    @property
    def name(self):
        target = self.json
        return target

    @property
    def node_label(self):
        target = self.json
        return f"{self.kind}\\n{target}"
def parse_cmdline():
    """Parse the command line of the script and return the options.

    The object returned has the attributes ``context`` and
    ``namespace`` (None if not specified), ``drop_kind`` (a list,
    empty by default) and ``format`` (``dot`` by default, or ``text``).
    The module docstring is used verbatim as the help description.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # Flags and settings of each option, in the order shown by --help.
    options = (
        (('--context', '-c'), {'help': "kubernetes context to explore"}),
        (('--namespace', '-n'), {'help': "kubernetes namespace to explore"}),
        (
            ('--drop-kind',),
            {
                'nargs': '*',
                'metavar': 'KIND',
                'help': "remove KIND nodes from the graph",
                'default': [],
            },
        ),
        (
            ('--format',),
            {
                'choices': ('dot', 'text'),
                'default': 'dot',
                'help': "output format",
            },
        ),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)

    return parser.parse_args()
# Run the pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment