Skip to content

Instantly share code, notes, and snippets.

@ixs
Created December 5, 2022 15:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ixs/9d8f17efb0fdd6222578be3bef4d1350 to your computer and use it in GitHub Desktop.
Save ixs/9d8f17efb0fdd6222578be3bef4d1350 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
#
# IPTables Trace parser
# Licensed under GPLv3+ by Andreas Thienemann <andreas@bawue.net>
#
# Takes a tracelog and a connection ID and will display the actual firewall rules
# a packet hits on it's way through the different iptables chains.
#
# Instructions:
# 1. Enable rule to mark packets to trace in the raw table:
# iptables -t raw -A PREROUTING -p tcp --destination 10.1.1.2 --source 10.1.1.1 --dport 80 -j TRACE
# 2. Get logs from kernel ringbuffer:
# dmesg | grep TRACE: > /tmp/ipt_trace.log
# 3. Generate readable rule list for specific connection ID
# ./trace_filter.py trace 42007
import ipaddress
import sys
import subprocess
import pprint
filename = sys.argv[1]
try:
id_spec = sys.argv[2]
except IndexError:
id_spec = None
def get_iptables(binary="iptables"):
ipt_data = subprocess.run(f"{binary}-save", stdout=subprocess.PIPE)
ipt = {}
capture = False
table = None
chain = None
for line in ipt_data.stdout.decode().strip().split("\n"):
if line.startswith("#"):
continue
elif line.startswith("*"):
table = line[1:]
ipt[table] = {}
capture = True
elif line == "COMMIT":
capture = False
elif line.startswith(":"):
chain = line.split()[0][1:]
ipt[table][chain] = {"rules": []}
default = line.split()[1]
if default != "-":
ipt[table][chain]["default"] = default
elif capture:
chain = line.split()[1]
ipt[table][chain]["rules"].append(line)
else:
raise(f"Unexpected iptables-save output: {line}")
return ipt
data = []
conn = {}
with open(filename) as f:
next = None
for token in f.readline().strip().split():
if "=" in token:
key, val = token.split("=")
conn.update({key: val})
if token in ("DF", "SYN",):
conn.update({token: "1"})
if token in ("OPT"):
next = "OPT"
continue
if next is not None:
conn.update({next: token})
next = None
f.seek(0)
for line in f.readlines():
if id_spec and " SEQ={} ".format(id_spec) not in line:
continue
rule_spec = line.split("]", 1)[1].split()[1]
data.append(dict(zip(("table", "chain", "type", "rulenum"), rule_spec.split(":"))))
if ipaddress.ip_address(conn['SRC']).version == 6:
binary = "ip6tables"
else:
binary = "iptables"
iptables = get_iptables(binary)
parsed_commands = []
for entry in data:
try:
if entry["type"] == "rule":
parsed_commands.append(f"{binary} -t {entry['table']} " + iptables[entry["table"]][entry["chain"]]["rules"][int(entry["rulenum"]) - 1])
elif entry["type"] == "return":
parsed_commands.append(f"{binary} -t {entry['table']} -A {entry['chain']} -j RETURN")
elif entry["type"] == "policy":
parsed_commands.append(f"{binary} -t {entry['table']} -A {entry['chain']} -j {iptables[entry['table']][entry['chain']]['default']}")
else:
parsed_commands.append(entry)
except IndexError:
parsed_commands.append("# Rule not found: {entry}")
if binary == "iptables":
print(f"IPTables Packet Trace for {conn['SRC']}:{conn['SPT']} -> {conn['DST']}:{conn['DPT']}")
elif binary == "ip6tables":
print(f"IPTables Packet Trace for [{conn['SRC']}]:{conn['SPT']} -> [{conn['DST']}]:{conn['DPT']}")
print()
print("\n".join(parsed_commands))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment