Skip to content

Instantly share code, notes, and snippets.

@MarkBaggett
Last active March 25, 2024 21:59
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save MarkBaggett/d8933453f431c111169158ce7f4e2222 to your computer and use it in GitHub Desktop.
Save MarkBaggett/d8933453f431c111169158ce7f4e2222 to your computer and use it in GitHub Desktop.
Python - SCAPY - Full Packet Session Reassembly
#From here https://pen-testing.sans.org/blog/2017/10/13/scapy-full-duplex-stream-reassembly
def full_duplex(p):
sess = "Other"
if 'Ether' in p:
if 'IP' in p:
if 'TCP' in p:
sess = str(sorted(["TCP", p[IP].src, p[TCP].sport, p[IP].dst, p[TCP].dport],key=str))
elif 'UDP' in p:
sess = str(sorted(["UDP", p[IP].src, p[UDP].sport, p[IP].dst, p[UDP].dport] ,key=str))
elif 'ICMP' in p:
sess = str(sorted(["ICMP", p[IP].src, p[IP].dst, p[ICMP].code, p[ICMP].type, p[ICMP].id] ,key=str))
else:
sess = str(sorted(["IP", p[IP].src, p[IP].dst, p[IP].proto] ,key=str))
elif 'ARP' in p:
sess = str(sorted(["ARP", p[ARP].psrc, p[ARP].pdst],key=str))
else:
sess = p.sprintf("Ethernet type=%04xr,Ether.type%")
return sess
@MarkBaggett
Copy link
Author

Example modified version of the Original session_extractor() function from scapy source:
https://github.com/secdev/scapy/blob/master/scapy/plist.py

@sundhaug92
Copy link

This incorrectly assumes IP has to be in an ethernet-frame, IP could also be in a Dot11 frame for example

@jvmk
Copy link

jvmk commented Jan 20, 2020

Thanks, Mark, for the code and blog post; very useful tip.

Small caveat to keep in mind: This is simplified session reassembly as it does not consider TCP FIN/RST packets. Packets are mapped to their respective session based solely on the (src_ip, src_port, dst_ip, dst_port) four-tuple. If the client (or server) closes a TCP stream and the client by chance selects the same ephemeral port number when contacting the same server again, the two different TCP streams will be identified as a single stream.

@WHOLETTHEDOG-OUT
Copy link

Is there any solution like add some other field?

@zeocs
Copy link

zeocs commented Jul 28, 2023

I wanted to share my improved version. It's more compact, more general (handles all Ethernet packages), puts more information into the key string, and makes it easy to extract that information from the key string later.

# Callback to make sessions() group traffic full-duplex, rather than
# half-dupelex, as would be the default. Basically returns a grouping
# key of the following format (between and excluding the ticks):
# `<L_2>|<L_3>|<L_4> <MAC_1>~<IP_1>~<PORT_1> <MAC_2>~<IP_2>~<PORT_2>`
# Where <L_*> are the protocols on the respective layer, specified by
# their corresponding scapy class name, layers not in packet omitted.
# <MAC_*>, <IP_*> and <PORT_*> are MAC, IP and port, respectively.
# Any value in arrow brackets will be hash symbol (#) if not available.
# If IPs are available, MACs will not be set, to avoid that IP sessions
# are split by hops (Ethernet addresses are different for each hop).
# The two mac/IP/port groups are sorted alphabetically (within a group,
# IP is always first and port second, but either the group for one or
# the other host may come first). This ensuring full-duplex grouping
# and ease in extracting the values later.
# See:
# https://www.sans.org/blog/scapy-full-duplex-stream-reassembly/
# https://github.com/secdev/scapy/blob/master/scapy/plist.py#L621
def _session_extractor_fullduplex(p):
    if 'Ether' in p:
        # Get list of all protocols in packet (as short names)
        protos = [p[l].__class__.__name__ for l in p.layers()]
        protos = protos[:3] # only condider OSI layers 2 to 4
        if "Raw" in protos: protos.remove("Raw")
        # Make src and dst strings
        l3 = protos[1] if len(protos) >= 2 else None
        l4 = protos[2] if len(protos) >= 3 else None
        src_str = "{}~{}~{}".format(
            p[Ether].src if not "IP" in protos else "#",
            p[l3].src if l3 is not None and hasattr(p[l3], "src") else "#",
            p[l4].sport if l4 is not None and hasattr(p[l4], "sport") else "#"
        )
        dst_str = "{}~{}~{}".format(
            p[Ether].dst if not "IP" in protos else "#",
            p[l3].dst if l3 is not None and hasattr(p[l3], "dst") else "#",
            p[l4].dport if l4 is not None and hasattr(p[l4], "dport") else "#"
        )
        # Make and return the key
        return "{} {} {}".format(
            "|".join(protos),
            *sorted([src_str, dst_str])
        )
    else:
        return "OTHER"

Extract the info with a regex like this, here additionally using the ipaddress library, but that's optional of course.

import ipaddress
import re

packet_list = scapy.utils.rdpcap("/tmp/some_pcap_file.pcap")
sessions = packet_list.sessions(_session_extractor_fullduplex)
for summary_str, packets in sessions.items():
    # Extract information from summary_str
    r = re.search(r"([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+) ([^\s]+)~([^\s]+)~([^\s]+)", summary_str)
    if r is None:
        print("something's fishy")
    else:
        protos, mac1, ip1, port1, mac2, ip2, port2 = r.groups()
        protos = protos.split("|")
        mac1 = None if mac1 == "#" else mac1
        mac2 = None if mac2 == "#" else mac2
        ip1 = None if ip1 == "#" else ipaddress.ip_address(ip1)
        ip2 = None if ip2 == "#" else ipaddress.ip_address(ip2)
        port1 = None if port1 == "#" else port1
        port2 = None if port2 == "#" else port2

        print("{} {}:{} <-> {}:{} (interesting: {}): {} packets".format( \
            "|".join(protos),
            mac1 if ip1 is None else ip1,
            port1,
            mac2 if ip2 is None else ip2,
            port2,
            len(packets)))

@MarkBaggett Thanks for the great blog post!

@jvmk Yes you can add any information contained within the packet object p. For example, my function adds more information on layers of the packet. Just note that some values may throw off grouping, for example, if you added p[TCP].seq, grouping wouldn't work anymore...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment