Created
December 5, 2022 15:11
-
-
Save ixs/9d8f17efb0fdd6222578be3bef4d1350 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
# IPTables Trace parser | |
# Licensed under GPLv3+ by Andreas Thienemann <andreas@bawue.net> | |
# | |
# Takes a tracelog and a connection ID and will display the actual firewall rules | |
# a packet hits on its way through the different iptables chains.
# | |
# Instructions: | |
# 1. Enable rule to mark packets to trace in the raw table: | |
# iptables -t raw -A PREROUTING -p tcp --destination 10.1.1.2 --source 10.1.1.1 --dport 80 -j TRACE | |
# 2. Get logs from kernel ringbuffer: | |
# dmesg | grep TRACE: > /tmp/ipt_trace.log | |
# 3. Generate readable rule list for specific connection ID | |
# ./trace_filter.py trace 42007 | |
import ipaddress | |
import sys | |
import subprocess | |
import pprint | |
# Command-line arguments: the trace log to read (required) and an optional
# identifier used to select the log lines of a single packet.
if len(sys.argv) < 2:
    # Exit with a usage hint instead of an IndexError traceback.
    sys.exit(f"Usage: {sys.argv[0]} <tracelog> [id]")
filename = sys.argv[1]
id_spec = sys.argv[2] if len(sys.argv) > 2 else None
def get_iptables(binary="iptables"):
    """Load the active ruleset by running ``<binary>-save`` and parsing it.

    Args:
        binary: Base name (or path) of the iptables flavour; ``-save`` is
            appended to form the command that is executed.

    Returns:
        A nested dict ``{table: {chain: {"rules": [...], "default": ...}}}``.
        ``rules`` holds the raw rule lines in the order they were printed;
        ``default`` is only present for chains whose policy is not ``-``.

    Raises:
        subprocess.CalledProcessError: If the save command exits non-zero.
        ValueError: If a non-comment line shows up outside of a
            ``*table`` ... ``COMMIT`` section.
    """
    # check=True surfaces a failing save command (e.g. missing privileges)
    # instead of silently parsing its empty output.
    result = subprocess.run(
        [f"{binary}-save"], stdout=subprocess.PIPE, check=True
    )
    ipt = {}
    capture = False  # True while inside a "*table" ... "COMMIT" section
    table = None
    for line in result.stdout.decode().splitlines():
        if not line or line.startswith("#"):
            continue
        if line.startswith("*"):
            table = line[1:]
            ipt[table] = {}
            capture = True
        elif line == "COMMIT":
            capture = False
        elif line.startswith(":"):
            # Chain declaration, e.g. ":INPUT ACCEPT [0:0]".
            fields = line.split()
            chain = fields[0][1:]
            ipt[table][chain] = {"rules": []}
            default = fields[1]
            if default != "-":
                ipt[table][chain]["default"] = default
        elif capture:
            # Rule line, e.g. "-A INPUT ..."; the second word names the chain.
            chain = line.split()[1]
            ipt[table][chain]["rules"].append(line)
        else:
            raise ValueError(f"Unexpected iptables-save output: {line}")
    return ipt
def _parse_packet_fields(line):
    """Collect the packet details of one TRACE log line into a dict.

    ``KEY=VALUE`` tokens become entries, the bare flags ``DF`` and ``SYN`` are
    stored as ``"1"`` and the token following ``OPT`` is stored under ``OPT``.
    """
    fields = {}
    pending_key = None
    for token in line.strip().split():
        if "=" in token:
            # partition() tolerates values that themselves contain "=".
            key, _, val = token.partition("=")
            fields[key] = val
        if token in ("DF", "SYN"):
            fields[token] = "1"
        if token == "OPT":
            pending_key = "OPT"
            continue
        if pending_key is not None:
            fields[pending_key] = token
            pending_key = None
    return fields


def _render_entry(entry, ruleset, binary):
    """Turn one trace entry into the command line of the rule it refers to."""
    table = entry.get("table")
    chain = entry.get("chain")
    kind = entry.get("type")
    try:
        if kind == "rule":
            index = int(entry["rulenum"]) - 1
            if index < 0:
                # A negative index would silently pick a rule from the end.
                raise IndexError(index)
            return f"{binary} -t {table} " + ruleset[table][chain]["rules"][index]
        if kind == "return":
            return f"{binary} -t {table} -A {chain} -j RETURN"
        if kind == "policy":
            return f"{binary} -t {table} -A {chain} -j {ruleset[table][chain]['default']}"
    except (IndexError, KeyError, ValueError):
        return f"# Rule not found: {entry}"
    # Keep the output a list of strings so it can be joined for printing.
    return f"# Unknown trace entry: {entry}"


def _format_endpoint(address, port, bracketed):
    """Format ``address:port``; the port is omitted when the log has none."""
    host = f"[{address}]" if bracketed else address
    return host if port is None else f"{host}:{port}"


data = []
conn = {}
with open(filename) as f:
    first_line = None
    header_line = None
    for line in f:
        if first_line is None:
            first_line = line
        # NOTE(review): the file header speaks of a connection ID, but lines
        # are selected by their SEQ= field -- confirm which one is intended.
        if id_spec and f" SEQ={id_spec} " not in line:
            continue
        try:
            # The rule spec is the second word after the "[timestamp]" prefix.
            rule_spec = line.split("]", 1)[1].split()[1]
        except IndexError:
            # Not shaped like a trace line (e.g. blank); nothing to record.
            continue
        if header_line is None:
            header_line = line
        data.append(dict(zip(("table", "chain", "type", "rulenum"), rule_spec.split(":"))))

# Describe the packet that was actually selected; fall back to the first line
# of the file when no line matched.
if header_line is None:
    header_line = first_line
if header_line is not None:
    conn.update(_parse_packet_fields(header_line))
if "SRC" not in conn:
    sys.exit(f"No packet details found in {filename}")

if ipaddress.ip_address(conn["SRC"]).version == 6:
    binary = "ip6tables"
else:
    binary = "iptables"
iptables = get_iptables(binary)

parsed_commands = [_render_entry(entry, iptables, binary) for entry in data]

# IPv6 addresses are bracketed so the port separator stays unambiguous.
is_v6 = binary == "ip6tables"
source = _format_endpoint(conn["SRC"], conn.get("SPT"), is_v6)
destination = _format_endpoint(conn["DST"], conn.get("DPT"), is_v6)
print(f"IPTables Packet Trace for {source} -> {destination}")
print()
print("\n".join(parsed_commands))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment