Skip to content

Instantly share code, notes, and snippets.

@zeocs
Created July 28, 2023 20:51
Show Gist options
  • Save zeocs/fa8171829258e68f739225f70ff0163b to your computer and use it in GitHub Desktop.
Save zeocs/fa8171829258e68f739225f70ff0163b to your computer and use it in GitHub Desktop.
scapy full-duplex grouping of captured traffic with sessions()

This callback can be given to scapy's sessions() to group traffic full-duplex. It's a very general solution that can handle all Ethernet packets and makes grouping keys from which information can be easily extracted (see example below).

# Callback to make sessions() group traffic full-duplex, rather than
# half-dupelex, as would be the default. Basically returns a grouping
# key of the following format (between and excluding the ticks):
# `<L_2>|<L_3>|<L_4> <MAC_1>~<IP_1>~<PORT_1> <MAC_2>~<IP_2>~<PORT_2>`
# Where <L_*> are the protocols on the respective layer, specified by
# their corresponding scapy class name, layers not in packet omitted.
# <MAC_*>, <IP_*> and <PORT_*> are MAC, IP and port, respectively.
# Any value in arrow brackets will be hash symbol (#) if not available.
# If IPs are available, MACs will not be set, to avoid that IP sessions
# are split by hops (Ethernet addresses are different for each hop).
# The two mac/IP/port groups are sorted alphabetically (within a group,
# IP is always first and port second, but either the group for one or
# the other host may come first). This ensuring full-duplex grouping
# and ease in extracting the values later.
# See:
# https://www.sans.org/blog/scapy-full-duplex-stream-reassembly/
# https://github.com/secdev/scapy/blob/master/scapy/plist.py#L621
def _session_extractor_fullduplex(p):
    if 'Ether' in p:
        # Get list of all protocols in packet (as short names)
        protos = [p[l].__class__.__name__ for l in p.layers()]
        protos = protos[:3] # only condider OSI layers 2 to 4
        if "Raw" in protos: protos.remove("Raw")
        # Make src and dst strings
        l3 = protos[1] if len(protos) >= 2 else None
        l4 = protos[2] if len(protos) >= 3 else None
        src_str = "{}~{}~{}".format(
            p[Ether].src if not "IP" in protos else "#",
            p[l3].src if l3 is not None and hasattr(p[l3], "src") else "#",
            p[l4].sport if l4 is not None and hasattr(p[l4], "sport") else "#"
        )
        dst_str = "{}~{}~{}".format(
            p[Ether].dst if not "IP" in protos else "#",
            p[l3].dst if l3 is not None and hasattr(p[l3], "dst") else "#",
            p[l4].dport if l4 is not None and hasattr(p[l4], "dport") else "#"
        )
        # Make and return the key
        return "{} {} {}".format(
            "|".join(protos),
            *sorted([src_str, dst_str])
        )
    else:
        return "OTHER"

The following example analyzes a pcap file, groups all traffic and extracts information from the grouping key using a regex.

import ipaddress
import re

FILE_LOCATION="/tmp/some_pcap_file.pcap"

packet_list = scapy.utils.rdpcap(FILE_LOCATION)
sessions = packet_list.sessions(_session_extractor_fullduplex)
for summary_str, packets in sessions.items():
    # Extract information from summary_str
    r = re.search(r"([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+)", summary_str)
    if r is None:
        print("something's fishy")
    else:
        protos, mac1, ip1, port1, mac2, ip2, port2 = r.groups()
        protos = protos.split("|")
        mac1 = None if mac1 == "#" else mac1
        mac2 = None if mac2 == "#" else mac2
        ip1 = None if ip1 == "#" else ip1
        ip2 = None if ip2 == "#" else ip2
        port1 = None if port1 == "#" else port1
        port2 = None if port2 == "#" else port2

        print("{} {}:{} <-> {}:{} (interesting: {}): {} packets".format( \
            "|".join(protos),
            mac1 if ip1 is None else ip1,
            port1,
            mac2 if ip2 is None else ip2,
            port2,
            len(packets)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment