Skip to content

Instantly share code, notes, and snippets.

@juliantaylor
Last active November 30, 2023 13:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save juliantaylor/996e0255809cb2077b66dc034ec47f55 to your computer and use it in GitHub Desktop.
Save juliantaylor/996e0255809cb2077b66dc034ec47f55 to your computer and use it in GitHub Desktop.
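k8s iptables verify: check that kube-proxy's iptables rules are consistent with the endpoints known to the Kubernetes API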
#!/bin/bash
# For every node name given on the command line: dump the iptables rules as
# seen by that node's kube-proxy pod, run check-iptables.py against the dump
# and, when the checker output contains ERROR, print (but do not run) the
# kubectl command that deletes the kube-proxy pod.
mkdir -p /tmp/data
while [[ $# -gt 0 ]]; do
    node=$1
    shift
    proxy=$(kubectl -n kube-system get pods --no-headers -l app.kubernetes.io/name=kube-proxy --field-selector="spec.nodeName=$node" | awk '{print $1}')
    # Without a kube-proxy pod there is nothing to exec into; skip the node
    # instead of calling kubectl exec with an empty pod name.
    if [[ -z "$proxy" ]]; then
        echo "no kube-proxy pod found on node $node" >&2
        continue
    fi
    # All expansions are quoted so unusual node names cannot be word-split
    # or glob-expanded by the shell.
    kubectl -n kube-system exec -c kube-proxy -t "$proxy" -- iptables-save > "/tmp/data/$node.iptables"
    ./check-iptables.py "/tmp/data/$node.iptables" "$node" | tee "/tmp/data/$node.log" | grep -B 1 ERROR && echo kubectl -n kube-system delete pod "$proxy"
done
#!/usr/bin/python3
from collections import defaultdict
import sys
from functools import lru_cache
import dateutil.parser
from datetime import datetime, timezone
# Kubernetes API client handles shared by the whole script. Both start out as
# None and are filled in by get_kube_api() the first time it is called.
core = discover = None
def get_kube_api():
    """Return the shared ``(CoreV1Api, DiscoveryV1Api)`` client pair.

    On the first call the kubernetes package is imported, the kube config is
    loaded and both clients are created and stored in the module globals
    ``core`` and ``discover``; later calls return the stored clients.
    """
    global core, discover
    if core or discover:
        return core, discover
    # Imported here rather than at module level, so the import only happens
    # when the API clients are actually requested.
    import kubernetes as k8s

    k8s.config.load_kube_config()
    clients = (k8s.client.CoreV1Api(), k8s.client.DiscoveryV1Api())
    core, discover = clients
    return core, discover
@lru_cache(maxsize=1000)
def _get_eps(ns):
    """Return the EndpointSlices of namespace ``ns`` grouped by service name.

    The result is a ``defaultdict(list)`` keyed by the value of each slice's
    ``kubernetes.io/service-name`` label. Results are memoized per namespace
    by ``lru_cache``, so the API is queried once per namespace.

    Slices without labels, or without the service-name label, cannot be
    attributed to a service and are skipped instead of raising.
    """
    _, discover = get_kube_api()
    slices = discover.list_namespaced_endpoint_slice(namespace=ns)
    by_service = defaultdict(list)
    for epobj in slices.items:
        # metadata.labels is None for an unlabeled object
        labels = epobj.metadata.labels or {}
        service = labels.get("kubernetes.io/service-name")
        if service is None:
            continue
        by_service[service].append(epobj)
    return by_service
def get_endpoints(min_age_seconds=180):
    """Collect the serving endpoints of every service in the cluster.

    Each service is identified by ``(ip, port, local)``:

    * if the service has a load-balancer ingress entry, ``ip`` is that
      entry's IP and ``local`` tells whether ``externalTrafficPolicy`` is
      ``Local``;
    * otherwise ``ip`` is the cluster IP and ``local`` is ``False``; services
      without a cluster IP (empty or the literal ``'None'``) are skipped.

    Only the first port of a service is considered. Services that declare no
    ports are skipped.

    Args:
        min_age_seconds: an EndpointSlice whose
            ``endpoints.kubernetes.io/last-change-trigger-time`` annotation is
            younger than this (or missing) causes the whole service to be
            left out, to reduce false positives from recent changes.

    Returns:
        ``defaultdict(list)`` mapping ``(ip, port, local)`` to the endpoint
        objects whose ``conditions.serving`` is truthy.
    """
    core, _ = get_kube_api()
    services = set()
    for svc in core.list_service_for_all_namespaces().items:
        ports = svc.spec.ports
        if not ports:
            # no port to look up in iptables
            continue
        port = ports[0].port
        try:
            ip = svc.status.load_balancer.ingress[0].ip
            local = svc.spec.external_traffic_policy == 'Local'
        except (AttributeError, IndexError, TypeError):
            # no load-balancer ingress: fall back to the cluster IP
            ip = svc.spec.cluster_ip
            local = False
            if not ip or ip == 'None':
                continue
        services.add((svc.metadata.namespace, svc.metadata.name, ip, port,
                      local))

    res = defaultdict(list)
    for ns, name, ip, port, local in services:
        key = (ip, port, local)
        for epobj in _get_eps(ns)[name]:
            # some endpoints dont get this annotation
            # only include endpoints with it to reduce false positives
            diff = -1
            annotations = epobj.metadata.annotations or {}
            change_t = annotations.get("endpoints.kubernetes.io/last-change-trigger-time")  # noqa
            if change_t:
                diff = (datetime.now(timezone.utc) - dateutil.parser.parse(change_t)).total_seconds()  # noqa
            if diff < min_age_seconds:
                print("recently changed endpoint", name, ip, diff)
                # drop what earlier slices of this service contributed
                res.pop(key, None)
                break
            print(ip, port, diff)
            for ep in epobj.endpoints or ():
                if ep.conditions.serving:
                    res[key].append(ep)  # node_name
    return res
class Rule:
    """One ``-A <chain> ...`` line of an iptables-save dump.

    Attributes:
        split: the whitespace-separated tokens of the line.
        chain: the second token (the chain the rule is appended to).
        rule: everything after the chain name, joined by single spaces.
        target: the token following ``-j``, or ``None`` without ``-j``.
    """

    def __init__(self, raw):
        # str.split() without arguments already ignores surrounding whitespace
        tokens = raw.split()
        self.split = tokens
        self.chain = tokens[1]
        self.rule = " ".join(tokens[2:])
        try:
            jump_at = tokens.index("-j")
        except ValueError:
            self.target = None
        else:
            self.target = tokens[jump_at + 1]

    def __contains__(self, v):
        """Substring test against the rule text (chain name excluded)."""
        return v in self.rule

    def __str__(self):
        return " ".join((self.chain, self.rule))
def iter_rules(chains):
    """Yield every rule of every chain, chain by chain in mapping order."""
    for rules in chains.values():
        for rule in rules:
            yield rule
def get_ip_entry(chains, ip, port=None):
    """Return the first rule that mentions ``ip``/32 (and ``port``), or None.

    Matching is done on whole tokens of the rule rather than on substrings,
    so looking for port 80 does not match ``--dport 8080`` and looking for
    ``10.0.0.1`` does not match ``110.0.0.1/32``.

    Args:
        chains: mapping of chain name to list of rules.
        ip: address to look for; compared as the token ``"<ip>/32"``.
        port: destination port (int or str). A falsy value disables the
            port filter.
    """
    address = f"{ip}/32"
    wanted_port = str(port) if port else None
    for rule in iter_rules(chains):
        tokens = rule.split
        if address not in tokens:
            continue
        if wanted_port is None:
            return rule
        # the port is the token directly following --dport
        for flag, value in zip(tokens, tokens[1:]):
            if flag == "--dport" and value == wanted_port:
                return rule
    return None
def follow_chain(chains, start, v=None):
    """Yield the rules of chain ``start`` and of every chain it jumps to.

    All rules of ``start`` are yielded first; afterwards each distinct jump
    target is followed recursively, in the order the targets first appear.
    Jumps to ``KUBE-MARK-MASQ`` and ``REJECT`` are not followed.

    A target that has no entry in ``chains`` (for example a built-in target
    such as DNAT) simply contributes no rules; the lookup neither raises on
    a plain dict nor adds an empty entry to a defaultdict.

    Args:
        chains: mapping of chain name to list of rules.
        start: name of the chain to start from.
        v: when truthy, print every rule as it is visited.
    """
    # dict used as an insertion-ordered set, which keeps the traversal order
    # stable between runs
    targets = {}
    for rule in chains.get(start, ()):
        if v:
            print(rule)
        yield rule
        if rule.target and rule.target not in ("KUBE-MARK-MASQ", "REJECT"):
            targets.setdefault(rule.target)
    for target in targets:
        yield from follow_chain(chains, target, v)
def get_chains(table):
    """Parse the lines of an iptables-save dump into rules grouped by chain.

    Only lines starting with ``-A`` are parsed; every other line is ignored.
    Returns a ``defaultdict(list)`` mapping chain name to its ``Rule``
    objects in file order.
    """
    chains = defaultdict(list)
    parsed = (Rule(line) for line in table if line.startswith("-A"))
    for rule in parsed:
        chains[rule.chain].append(rule)
    return chains
def get_iptable_endpoints(chains, ip, port, v=None):
    """Return the endpoint rules iptables holds for the service ``ip:port``.

    The rule matching ``ip``/``port`` is looked up and the chains reachable
    from its target are walked. Rules jumping to ``KUBE-MARK-MASQ`` and rules
    containing ``-m recent`` (affinity) are ignored.

    Returns:
        ``(lep, cep)``: the rule texts found in chains whose name starts with
        ``KUBE-SVL`` and ``KUBE-SVC`` respectively. Both are empty (and a
        message is printed) when no matching rule exists.
    """
    local_rules, cluster_rules = [], []
    entry = get_ip_entry(chains, ip, port)
    if entry is None:
        print(f"No rule found for {ip} {port}")
        return local_rules, cluster_rules

    for hop in follow_chain(chains, entry.target, v):
        # masquerade marks and session-affinity rules are not endpoints
        if hop.target == "KUBE-MARK-MASQ" or "-m recent" in hop.rule:
            continue
        if hop.chain.startswith("KUBE-SVL"):
            local_rules.append(hop.rule)
        elif hop.chain.startswith("KUBE-SVC"):
            cluster_rules.append(hop.rule)
    return local_rules, cluster_rules
def check():
    """Compare the iptables dump of one node with the API's endpoints.

    Reads the dump path from ``sys.argv[1]`` and the node name from
    ``sys.argv[2]``. For every service returned by ``get_endpoints()`` that
    has rules in the dump:

    * non-local services: the number of ``KUBE-SVC`` rules must equal the
      number of serving endpoints;
    * local services: the number of ``KUBE-SVL`` rules must equal the number
      of serving endpoints whose ``node_name`` is this node.

    Mismatches are reported on stdout with a line containing ``ERROR``.
    """
    data = get_endpoints()
    # context manager so the dump file is closed once it has been read
    with open(sys.argv[1]) as dump:
        table = dump.readlines()
    node = sys.argv[2]
    chains = get_chains(table)
    for (ip, port, local), endpoints in data.items():
        lep, cep = get_iptable_endpoints(chains, ip, port)
        # no rule found, probably new service since iptable dump
        if not lep and not cep:
            continue
        lapiep = [e for e in endpoints if e.node_name == node]
        if not local and len(cep) != len(endpoints):
            print(ip, port, "cep iptables", len(cep), "!= api", len(endpoints))
            print("Cluster service ERROR")
        if local and len(lep) != len(lapiep):
            print(ip, port, "lep iptables", len(lep), "!= api", len(lapiep))
            print("Local service ERROR")
def main():
    """Command-line entry point.

    * two arguments (``<dump> <nodename>``): run ``check()``;
    * three arguments (``<dump> <ip> <port>``): print the rule matching
      ip/port, every rule visited from its target, the local and cluster
      endpoint rules and their counts;
    * anything else: print the usage text.
    """
    if len(sys.argv) == 3:
        check()
    elif len(sys.argv) == 4:
        # context manager so the dump file is closed once it has been read
        with open(sys.argv[1]) as dump:
            table = dump.readlines()
        ip = sys.argv[2]
        port = sys.argv[3]
        chains = get_chains(table)
        rule = get_ip_entry(chains, ip, port)
        print(rule)
        lep, cep = get_iptable_endpoints(chains, ip, port, v=True)
        for ep in lep:
            print(ep)
        print()
        for ep in cep:
            print(ep)
        print("local:", len(lep))
        print("cluster", len(cep))
    else:
        print("<iptable-save-output> <nodename>")
        print("Verify iptables is consistent with ready endpoints")
        print("")
        print("<iptable-save-output> <ip> <port>")
        print("dump iptables configured chain for ip/port")
# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment