Last active
November 30, 2023 13:13
-
-
Save juliantaylor/996e0255809cb2077b66dc034ec47f55 to your computer and use it in GitHub Desktop.
k8s iptable verify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# For every node name given on the command line: dump the iptables rules seen
# by that node's kube-proxy pod into /tmp/data/<node>.iptables, run
# ./check-iptables.py on the dump and, if the checker reported an ERROR, print
# the kubectl command that would delete the kube-proxy pod (it is only echoed,
# never executed).
mkdir -p /tmp/data
while [[ $# -gt 0 ]]; do
    node=$1
    shift
    proxy=$(kubectl -n kube-system get pods --no-headers -l app.kubernetes.io/name=kube-proxy --field-selector="spec.nodeName=$node" | awk '{print $1}')
    # Without a pod name the exec below would fail and the checker would run
    # on an empty dump, so skip the node instead.
    if [[ -z "$proxy" ]]; then
        echo "no kube-proxy pod found on node $node, skipping" >&2
        continue
    fi
    kubectl -n kube-system exec -c kube-proxy -t "$proxy" -- iptables-save > "/tmp/data/$node.iptables"
    ./check-iptables.py "/tmp/data/$node.iptables" "$node" | tee "/tmp/data/$node.log" | grep -B 1 ERROR && echo kubectl -n kube-system delete pod "$proxy"
done
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from collections import defaultdict | |
import sys | |
from functools import lru_cache | |
import dateutil.parser | |
from datetime import datetime, timezone | |
# Kubernetes API clients, created on first use by get_kube_api().
core = None
discover = None


def get_kube_api():
    """Return the shared ``(CoreV1Api, DiscoveryV1Api)`` client pair.

    The clients are built the first time this is called (loading the local
    kube config) and stored in the module globals ``core`` and ``discover``;
    later calls hand back the stored objects.
    """
    global core, discover
    if core or discover:
        return core, discover

    # Imported here so the module can be loaded without the kubernetes package.
    import kubernetes as k8s

    k8s.config.load_kube_config()
    core_api = k8s.client.CoreV1Api()
    discovery_api = k8s.client.DiscoveryV1Api()
    core, discover = core_api, discovery_api
    return core, discover
@lru_cache(maxsize=1000)
def _get_eps(ns):
    """Return the EndpointSlices of namespace ``ns`` grouped by service name.

    The result is a ``defaultdict(list)`` mapping the value of each slice's
    ``kubernetes.io/service-name`` label to the slice objects carrying it.
    Results are cached per namespace, so every namespace is listed only once
    per run.
    """
    _, discover = get_kube_api()
    slices = discover.list_namespaced_endpoint_slice(namespace=ns)
    by_service = defaultdict(list)
    for epobj in slices.items:
        # A slice may have no labels at all, or lack the service-name label;
        # such a slice cannot be attributed to a service, so skip it rather
        # than fail the whole run.
        labels = epobj.metadata.labels or {}
        service = labels.get("kubernetes.io/service-name")
        if service is None:
            continue
        by_service[service].append(epobj)
    return by_service
def get_endpoints():
    """Collect the serving endpoints of all services from the Kubernetes API.

    Returns a ``defaultdict(list)`` keyed by ``(ip, port, local)``:

    * ``ip`` is the first load balancer ingress IP of the service, or its
      cluster IP when no load balancer ingress is available,
    * ``port`` is the port of the first service port,
    * ``local`` is True only for load balancer entries whose
      ``externalTrafficPolicy`` is ``Local``.

    The values are the endpoint objects whose ``serving`` condition is set.
    A service is dropped from the result as soon as one of its endpoint
    slices lacks the ``last-change-trigger-time`` annotation or changed less
    than 180 seconds ago, to reduce false positives from in-flight updates.
    """
    core, _ = get_kube_api()
    services = set()
    for svc in core.list_service_for_all_namespaces().items:
        ports = svc.spec.ports
        if not ports:
            # No port to look up in iptables (previously an uncaught TypeError).
            continue
        port = ports[0].port
        try:
            ip = svc.status.load_balancer.ingress[0].ip
        except (AttributeError, TypeError, IndexError):
            # No load balancer ingress: fall back to the cluster IP.
            ip = svc.spec.cluster_ip
            if not ip or ip == 'None':
                continue
            local = False
        else:
            local = svc.spec.external_traffic_policy == 'Local'
        # print(svc.metadata.name, svc.spec.external_traffic_policy, ip)
        services.add((svc.metadata.namespace, svc.metadata.name, ip, port,
                      local))

    res = defaultdict(list)
    for ns, name, ip, port, local in services:
        key = (ip, port, local)
        for epobj in _get_eps(ns)[name]:
            # some endpoints dont get this annotation
            # only include endpoints with it to reduce false positives
            diff = -1
            annotations = epobj.metadata.annotations or {}
            change_t = annotations.get(
                "endpoints.kubernetes.io/last-change-trigger-time")
            # Annotations can exist without this key; parsing None would raise.
            if change_t is not None:
                changed_at = dateutil.parser.parse(change_t)
                diff = (datetime.now(timezone.utc) - changed_at).total_seconds()
            if diff < 180:
                print("recently changed endpoint", name, ip, diff)
                # Discard anything already collected for this service.
                res.pop(key, None)
                break
            print(ip, port, diff)
            # A slice may carry no endpoint list at all.
            for ep in epobj.endpoints or ():
                if ep.conditions.serving:
                    res[key].append(ep)  # node_name
    return res
class Rule:
    """One ``-A <chain> ...`` line of ``iptables-save`` output.

    Attributes:
        split:  the whitespace-separated tokens of the whole line
        chain:  the second token (the chain the rule is appended to)
        rule:   the remaining tokens joined by single spaces
        target: the token following ``-j``, or None when there is no ``-j``
    """

    def __init__(self, raw):
        tokens = raw.strip().split()
        self.split = tokens
        self.chain = tokens[1]
        self.rule = " ".join(tokens[2:])
        if "-j" in tokens:
            self.target = tokens[tokens.index("-j") + 1]
        else:
            self.target = None

    def __contains__(self, v):
        # Substring test against the rule text (chain name excluded).
        return v in self.rule

    def __str__(self):
        return "{} {}".format(self.chain, self.rule)
def iter_rules(chains):
    """Yield every rule of every chain, chain by chain in mapping order."""
    for rules in chains.values():
        for rule in rules:
            yield rule
def get_ip_entry(chains, ip, port=None):
    """Return the first rule that refers to ``<ip>/32`` (and ``--dport <port>``).

    Matching is done on whole tokens of the rule line, not on substrings, so
    ``10.0.0.1`` does not match a rule for ``110.0.0.1/32`` and port ``80``
    does not match ``--dport 8080``.

    ``port`` may be an int or a string; a falsy port disables the port check.
    Returns None when no rule matches.
    """
    address = f"{ip}/32"
    wanted_port = str(port) if port else None
    for rule in iter_rules(chains):
        tokens = rule.split
        if address not in tokens:
            continue
        if wanted_port is None:
            return rule
        # "--dport" must be immediately followed by exactly the wanted port.
        for flag, value in zip(tokens, tokens[1:]):
            if flag == "--dport" and value == wanted_port:
                return rule
    return None
def follow_chain(chains, start, v=None):
    """Yield the rules of chain ``start`` and of every chain reachable from it.

    All rules of ``start`` are yielded first; afterwards each distinct jump
    target found in them is followed recursively, in the order the targets
    first appeared. ``KUBE-MARK-MASQ`` and ``REJECT`` targets are not
    followed. When ``v`` is truthy each rule of the visited chains is also
    printed.
    """
    # A dict is used as an insertion-ordered set so that the traversal (and
    # the verbose output) is the same on every run; a plain set of strings
    # iterates in an order that changes with hash randomisation.
    targets = {}
    for rule in chains[start]:
        if v:
            print(rule)
        yield rule
        if rule.target and rule.target not in ("KUBE-MARK-MASQ", "REJECT"):
            targets[rule.target] = None
    for target in targets:
        yield from follow_chain(chains, target, v)
def get_chains(table):
    """Group the ``-A`` lines of an iptables dump by chain.

    ``table`` is an iterable of text lines; lines not starting with ``-A``
    are ignored. Returns a ``defaultdict(list)`` mapping each chain name to
    its ``Rule`` objects in file order.
    """
    chains = defaultdict(list)
    parsed = (Rule(line) for line in table if line.startswith("-A"))
    for rule in parsed:
        chains[rule.chain].append(rule)
    return chains
def get_iptable_endpoints(chains, ip, port, v=None):
    """Return the endpoint rules iptables holds for the service ``ip``/``port``.

    Looks up the rule for the address, follows its jump target through the
    chains and returns ``(lep, cep)``: the rule texts found in ``KUBE-SVL*``
    chains and in ``KUBE-SVC*`` chains respectively. Jumps to
    ``KUBE-MARK-MASQ`` and ``-m recent`` (affinity) rules are not counted.
    Both lists are empty, and a message is printed, when no rule matches.
    ``v`` is passed on to follow_chain.
    """
    entry = get_ip_entry(chains, ip, port)
    local_rules, cluster_rules = [], []
    if entry is None:
        print(f"No rule found for {ip} {port}")
        return local_rules, cluster_rules

    for hop in follow_chain(chains, entry.target, v):
        # skip masquerade marks and affinity rules
        if hop.target == "KUBE-MARK-MASQ" or '-m recent' in hop.rule:
            continue
        chain_name = hop.chain
        if chain_name.startswith("KUBE-SVL"):
            local_rules.append(hop.rule)
        elif chain_name.startswith("KUBE-SVC"):
            cluster_rules.append(hop.rule)
    return local_rules, cluster_rules
def check():
    """Compare an iptables dump with the endpoints reported by the API.

    Reads the dump file named by ``sys.argv[1]`` and the node name from
    ``sys.argv[2]``. For every service returned by get_endpoints() that has
    rules in the dump, the number of endpoint rules is compared with the
    number of API endpoints: all endpoints for non-local services, only the
    endpoints on ``node`` for local ones. Mismatches are printed together
    with a line containing ``ERROR``.
    """
    data = get_endpoints()
    # Close the dump file as soon as it is read.
    with open(sys.argv[1]) as dump:
        table = dump.readlines()
    node = sys.argv[2]
    chains = get_chains(table)
    for (ip, port, local), endpoints in data.items():
        lep, cep = get_iptable_endpoints(chains, ip, port)
        # no rule found, probably new service since iptable dump
        if not lep and not cep:
            continue
        # API endpoints located on the node being checked
        lapiep = [e for e in endpoints if e.node_name == node]
        if not local and len(cep) != len(endpoints):
            print(ip, port, "cep iptables", len(cep), "!= api", len(endpoints))
            print("Cluster service ERROR")
        if local and len(lep) != len(lapiep):
            print(ip, port, "lep iptables", len(lep), "!= api", len(lapiep))
            print("Local service ERROR")
def main():
    """Command line entry point.

    * ``<iptable-save-output> <nodename>``: run check().
    * ``<iptable-save-output> <ip> <port>``: print the rule matching the
      address, the chains followed from it and the local/cluster endpoint
      rules with their counts.
    * anything else: print the usage text.
    """
    if len(sys.argv) == 3:
        check()
    elif len(sys.argv) == 4:
        # Close the dump file as soon as it is read.
        with open(sys.argv[1]) as dump:
            table = dump.readlines()
        ip = sys.argv[2]
        port = sys.argv[3]
        chains = get_chains(table)
        rule = get_ip_entry(chains, ip, port)
        print(rule)
        lep, cep = get_iptable_endpoints(chains, ip, port, v=True)
        for ep in lep:
            print(ep)
        print()
        for ep in cep:
            print(ep)
        print("local:", len(lep))
        print("cluster", len(cep))
    else:
        print("<iptable-save-output> <nodename>")
        print("Verify iptables is consistent with ready endpoints")
        print("")
        print("<iptable-save-output> <ip> <port>")
        print("dump iptables configured chain for ip/port")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment