This callback can be passed to scapy's sessions() to group traffic full-duplex. It is a general solution that handles all Ethernet packets and produces grouping keys from which information can easily be extracted (see the example below).
from scapy.all import Ether

# Callback to make sessions() group traffic full-duplex, rather than
# half-duplex, as would be the default. Returns a grouping key of the
# following format (between and excluding the backticks):
# `<L_2>|<L_3>|<L_4> <MAC_1>~<IP_1>~<PORT_1> <MAC_2>~<IP_2>~<PORT_2>`
# where <L_*> are the protocols on the respective layer, specified by
# their corresponding scapy class name; layers not in the packet are
# omitted. <MAC_*>, <IP_*> and <PORT_*> are MAC, IP and port, respectively.
# Any value in angle brackets will be a hash symbol (#) if not available.
# If IPs are available, MACs are not set, so that IP sessions are not
# split by hop (Ethernet addresses are different for each hop).
# The two MAC/IP/port groups are sorted alphabetically (within a group,
# MAC always comes first, IP second and port third, but the group of
# either host may come first). This ensures full-duplex grouping and
# makes it easy to extract the values later.
# See:
# https://www.sans.org/blog/scapy-full-duplex-stream-reassembly/
# https://github.com/secdev/scapy/blob/master/scapy/plist.py#L621
def _session_extractor_fullduplex(p):
    if 'Ether' in p:
        # Get list of all protocols in packet (as scapy class names)
        protos = [p[l].__class__.__name__ for l in p.layers()]
        protos = protos[:3]  # only consider OSI layers 2 to 4
        if "Raw" in protos:
            protos.remove("Raw")
        # Make src and dst strings
        l3 = protos[1] if len(protos) >= 2 else None
        l4 = protos[2] if len(protos) >= 3 else None
        src_str = "{}~{}~{}".format(
            p[Ether].src if "IP" not in protos else "#",
            p[l3].src if l3 is not None and hasattr(p[l3], "src") else "#",
            p[l4].sport if l4 is not None and hasattr(p[l4], "sport") else "#"
        )
        dst_str = "{}~{}~{}".format(
            p[Ether].dst if "IP" not in protos else "#",
            p[l3].dst if l3 is not None and hasattr(p[l3], "dst") else "#",
            p[l4].dport if l4 is not None and hasattr(p[l4], "dport") else "#"
        )
        # Make and return the key; sorting the two endpoint strings makes
        # the key independent of the packet's direction
        return "{} {} {}".format(
            "|".join(protos),
            *sorted([src_str, dst_str])
        )
    else:
        # Non-Ethernet packets all end up in one catch-all group
        return "OTHER"
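
To see why sorting the two endpoint strings gives full-duplex grouping, here is a minimal sketch that does not need scapy. `make_key` is a hypothetical helper, not part of scapy or of the callback above; it only mimics the key format with plain values, under the assumption that missing values are passed as `None`.

```python
def make_key(protos, src, dst):
    """Build a direction-independent key from two (mac, ip, port) tuples.

    Hypothetical helper for illustration only; missing values are None.
    """
    def endpoint(mac, ip, port):
        # Missing values become "#", mirroring the callback's key format.
        return "~".join("#" if v is None else str(v) for v in (mac, ip, port))

    # Sorting the two endpoint strings removes the direction from the key.
    first, second = sorted([endpoint(*src), endpoint(*dst)])
    return "{} {} {}".format("|".join(protos), first, second)


client = (None, "10.0.0.1", 51234)
server = (None, "10.0.0.2", 80)
request = make_key(["Ether", "IP", "TCP"], client, server)
reply = make_key(["Ether", "IP", "TCP"], server, client)
print(request == reply)  # True
print(request)           # Ether|IP|TCP #~10.0.0.1~51234 #~10.0.0.2~80
```

Both directions of the conversation map to the same key, so sessions() would place them in the same group.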
The following example analyzes a pcap file, groups all traffic, and extracts information from the grouping key using a regex.
import re

import scapy.utils

FILE_LOCATION = "/tmp/some_pcap_file.pcap"
packet_list = scapy.utils.rdpcap(FILE_LOCATION)
sessions = packet_list.sessions(_session_extractor_fullduplex)
for summary_str, packets in sessions.items():
    # Extract information from summary_str
    r = re.search(r"([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+)", summary_str)
    if r is None:
        # Happens e.g. for the "OTHER" key of non-Ethernet packets
        print("something's fishy")
    else:
        protos, mac1, ip1, port1, mac2, ip2, port2 = r.groups()
        protos = protos.split("|")
        # Map the "#" placeholder back to None
        mac1 = None if mac1 == "#" else mac1
        mac2 = None if mac2 == "#" else mac2
        ip1 = None if ip1 == "#" else ip1
        ip2 = None if ip2 == "#" else ip2
        port1 = None if port1 == "#" else port1
        port2 = None if port2 == "#" else port2
        # Show the IP if there is one, otherwise fall back to the MAC
        print("{} {}:{} <-> {}:{}: {} packets".format(
            "|".join(protos),
            mac1 if ip1 is None else ip1,
            port1,
            mac2 if ip2 is None else ip2,
            port2,
            len(packets)))
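
Because the key uses fixed separators (a space between the three parts, `~` within an endpoint, `|` between protocols), it can also be parsed without a regex. The following is a minimal sketch assuming exactly the key format described above; `parse_session_key` and `Endpoint` are hypothetical names, not scapy API.

```python
from collections import namedtuple

# Hypothetical container for one side of a session
Endpoint = namedtuple("Endpoint", ["mac", "ip", "port"])


def parse_session_key(key):
    """Return (protos, endpoint1, endpoint2), or None if the key does not
    have the expected shape (for example the catch-all key "OTHER")."""
    parts = key.split(" ")
    if len(parts) != 3:
        return None
    endpoints = []
    for part in parts[1:]:
        fields = part.split("~")
        if len(fields) != 3:
            return None
        # Map the "#" placeholder back to None
        endpoints.append(Endpoint(*(None if f == "#" else f for f in fields)))
    return parts[0].split("|"), endpoints[0], endpoints[1]


print(parse_session_key("Ether|IP|TCP #~10.0.0.1~51234 #~10.0.0.2~80"))
print(parse_session_key("OTHER"))
```

Note that ports come back as strings, and that the two endpoints are in alphabetical order, so the first one is not necessarily the host that opened the connection.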