Last active
December 9, 2021 15:41
-
-
Save cr0hn/cfa4e6d04a20f6248a506c072ae0ba81 to your computer and use it in GitHub Desktop.
Scapy HTTP flow builder from a packet list
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage example | |
from scapy.all import * | |
from scapy.layers.http import * | |
sport, dport = detect_http_ports(rdpcap(pcap_file)) | |
if sport: | |
scapy.packet.bind_layers(TCP, HTTP, dport=dport) | |
scapy.packet.bind_layers(TCP, HTTP, sport=sport) | |
scapy.packet.bind_layers(TCP, HTTP, dport=sport) | |
scapy.packet.bind_layers(TCP, HTTP, sport=dport) | |
http_flow = build_http_flow(rdpcap("example.pcap")) | |
# Let's iterate through every packet | |
for http_request, http_response in http_flow: | |
req = http_request[HTTPRequest] | |
resp = http_response[HTTPResponse] | |
req.show() | |
resp.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from scapy.all import * | |
from scapy.layers.http import * | |
def detect_http_ports(pkt_list): | |
# for pkt in pkt_list.sessions()["Other"]: | |
for pkt in pkt_list: | |
if not pkt.haslayer(TCP): | |
continue | |
# For non-common HTTP Ports | |
if pkt.haslayer(Raw): | |
# Response | |
if b"http/1.1" in pkt[Raw].load[:10].lower(): | |
return pkt[TCP].sport, pkt[TCP].dport | |
def build_http_flow(pkt_list: PacketList, debug: bool = False) -> List[Tuple[Packet, Packet]]: | |
""" | |
Build HTTP flow for scapy package list | |
Example: | |
>>> packets = rdpcap("my_file.pcap") | |
>>> flow = build_http_flow(packets) | |
>>> type(flow) | |
list | |
:rtype: A list of tuples as format: [ ( HTTP Request packet, HTTP Response packet) ] | |
""" | |
requests_flow = {} # {SEC: REQUEST} | |
responses_flow = {} # {SEC: REQUEST} | |
for pkt in pkt_list: | |
if HTTP in pkt: | |
if HTTPRequest in pkt: | |
if debug: | |
print(" -> request: ", pkt["TCP"].seq, " # ", pkt["TCP"].ack) | |
requests_flow[pkt["TCP"].ack] = pkt | |
if HTTPResponse in pkt: | |
if debug: | |
print(" -> response: ", pkt["TCP"].seq, " # ", pkt["TCP"].ack) | |
responses_flow[pkt["TCP"].seq] = pkt | |
http_flow = [] | |
for seq, resp in responses_flow.items(): | |
try: | |
req = requests_flow[seq] | |
http_flow.append((req, resp)) | |
except KeyError: | |
if debug: | |
print(f"Missing request for response TCP SEQ: {seq}") | |
return http_flow |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment