Skip to content

Instantly share code, notes, and snippets.

@LensHunnel
Last active November 10, 2021 12:46
Show Gist options
  • Save LensHunnel/3fe393f6f26a09669b20d5bbba334a05 to your computer and use it in GitHub Desktop.
Save LensHunnel/3fe393f6f26a09669b20d5bbba334a05 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
from scapy.utils import rdpcap
from scapy.packet import Packet, bind_layers
from scapy import fields
from scapy.layers.inet import IP
from scapy.all import *
import pcap
class PfSyncHeader(Packet):
    """Scapy layer for the leading header of a pfsync packet.

    Fields, in declaration order: a one-byte ``version``, a one-byte
    ``_pad`` wrapped in a ``PadField`` with alignment 1, a ``len`` length
    field and a 128-bit ``pfcksum`` rendered in hexadecimal.
    """

    name = 'PFSyncHeaderPacket'
    fields_desc = [
        fields.ByteField("version", None),
        fields.PadField(fields.ByteField("_pad", None), 1),
        fields.LenField("len", None),
        fields.XBitField("pfcksum", None, 128),
    ]
class PfSyncStateInsert(Packet):
    """Scapy layer describing a single state record of an insert action.

    The declared fields add up to 242 bytes. Each address is stored as a
    4-byte ``IPField`` followed by 96 bits of padding, i.e. a 16-byte slot.
    """

    name = 'PFSyncStateInsertPacket'
    # NOTE(review): the name "key.padding" is reused for all four padding
    # fields; presumably they end up sharing one value when looked up by
    # name -- confirm before relying on any of them.
    fields_desc = [
        # Record identity.
        fields.XLongField("id", None),
        fields.StrFixedLenField("ifname", None, 16),
        # First address/port tuple ("key." prefixed names; read with getattr).
        fields.IPField("key.src", None),
        fields.XBitField("key.padding", None, 96),
        fields.IPField("key.dst", None),
        fields.XBitField("key.padding", None, 96),
        fields.BitField("key.srcport", None, 16),
        fields.BitField("key.dstport", None, 16),
        # Second address/port tuple (plain names).
        fields.IPField("src", None),
        fields.XBitField("key.padding", None, 96),
        fields.IPField("dst", None),
        fields.XBitField("key.padding", None, 96),
        fields.BitField("srcport", None, 16),
        fields.BitField("dstport", None, 16),
        # 80 bytes kept as one undecoded hex blob.
        fields.XBitField("rt_addr", None, 640),
        # Rule references and timers.
        fields.IntField("rule", None),
        fields.IntField("anchor", None),
        fields.XBitField("nat_rule", None, 32),
        fields.IntField("creation", None),
        fields.IntField("expire", None),
        # Counters, 16 bytes each, left undecoded.
        fields.XBitField("packets", None, 128),
        fields.XBitField("bytes", None, 128),
        fields.XIntField("creatorid", None),
        # Single-byte attributes and flags ("__spare" is two bytes).
        fields.ByteField("af", None),
        fields.ByteField("proto", None),
        fields.BitField("sens", None, 8),
        fields.BitField("__spare", None, 16),
        fields.ByteField("log", None),
        fields.ByteField("state_flags", None),
        fields.ByteField("timeout", None),
        fields.ByteField("sync_flags", None),
        fields.ByteField("updates", None),
    ]
class PfSyncStateUpdate(Packet):
    """Scapy layer that keeps one update record as a single opaque field."""

    name = 'PfSyncStateUpdatePacket'
    fields_desc = [
        # 88 bytes, expressed in bits, displayed in hexadecimal.
        fields.XBitField("data", None, 88 * 8),
    ]
from collections import defaultdict

# Size in bytes of one state record, keyed by the sub-header action code.
# The sub-header multiplies this by its record count to find the length of
# its payload. Actions without an explicit entry fall back to 12 bytes.
#
# The factory must be passed positionally: ``defaultdict(default=12)`` would
# only create a literal "default" key and leave unknown actions raising
# KeyError.
SS = defaultdict(lambda: 12, {
    1: 242,
    4: 88,
    5: 12,
})
class PfSyncSubheader(Packet):
    """Scapy layer for one pfsync sub-header and the records that follow it.

    ``action`` selects the per-record size from ``SS``; ``count`` is the
    number of records; ``state`` holds the raw bytes of all of them.
    """

    name = 'PFSyncSubHeaderPacket'
    fields_desc = [
        fields.ByteField("action", None),
        fields.ByteField("_pad", None),
        fields.FieldLenField("count", 4, count_of="state"),
        # Payload length = number of records * size of one record.
        fields.XStrLenField(
            "state",
            None,
            length_from=lambda sub: sub.count * SS[sub.action],
        ),
    ]
# Dissection chain: IP (protocol 240) -> PfSyncHeader -> PfSyncSubheader,
# with each sub-header followed by another sub-header.
# The per-action record layers (PfSyncStateInsert for action 1,
# PfSyncStateUpdate for action 4) are not bound to the sub-header; the raw
# "state" bytes are sliced and decoded by processing() instead.
for _lower, _upper, _match in (
    (IP, PfSyncHeader, {"proto": 240}),
    (PfSyncHeader, PfSyncSubheader, {}),
    (PfSyncSubheader, PfSyncSubheader, {}),
):
    bind_layers(_lower, _upper, **_match)
def _print_state_inserts(subph, ts):
    """Print one line per record of an insert sub-header that has a NAT rule.

    Args:
        subph: A ``PfSyncSubheader`` whose ``action`` is 1.
        ts: Capture timestamp, passed to ``datetime.fromtimestamp``.
    """
    state = subph.state
    count = subph.count
    # The record size is derived from the data actually present. A zero
    # count, or a payload shorter than ``count`` bytes (truncated capture),
    # yields no usable records; a zero step would make range() raise.
    chunk_size = len(state) // count if count else 0
    if chunk_size == 0:
        return

    for offset in range(0, len(state), chunk_size):
        sa = PfSyncStateInsert(raw(state[offset:offset + chunk_size]))
        # 0xffffffff in nat_rule is treated as "no NAT rule"; skip those.
        if sa.nat_rule != 0xffffffff:
            # The dotted field names are not valid identifiers, hence getattr.
            print("{} proto {}:{} {} {}:{} ({}:{})".format(datetime.fromtimestamp(ts), sa.src,
                                                           sa.srcport, '<-' if sa.sens == 2 else '->', sa.dst,
                                                           sa.dstport, getattr(sa, 'key.dst'), getattr(sa, 'key.dstport')))


def processing(p, ts):
    """Dissect one captured pfsync packet and report its NAT-ed state inserts.

    Walks every ``PfSyncSubheader`` in the packet; sub-headers whose action
    is 1 have their records decoded and printed, all others are skipped.

    Args:
        p: Raw packet bytes, starting at the pfsync header.
        ts: Capture timestamp, passed to ``datetime.fromtimestamp``.
    """
    header = PfSyncHeader(p)
    # getlayer() returns None when the packet carries no sub-header at all.
    subph = header.getlayer(PfSyncSubheader)
    while subph:
        if subph.action == 1:
            _print_state_inserts(subph, ts)
        # Occurrence 1 is the current layer itself, so 2 is the next one.
        subph = subph.getlayer(PfSyncSubheader, 2)
if __name__ == '__main__':
    # Capture live on the pfsync0 interface and feed every packet to
    # processing() until the user interrupts with Ctrl-C.
    capture = pcap.pcap(name="pfsync0", promisc=True, immediate=True, timeout_ms=50)
    try:
        for timestamp, frame in capture:
            processing(frame, timestamp)
    except KeyboardInterrupt:
        # stats() supplies the three counters consumed by the format string.
        summary = '%d packets received, %d packets dropped %d packets dropped by interface'
        print(summary % capture.stats())
@LensHunnel
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment