Last active
November 5, 2023 14:13
-
-
Save N0dr4x/ffe99618a738978605719ce525a33042 to your computer and use it in GitHub Desktop.
Simple Scapy TCP Session
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# Author: N0dr4x (n0dr4x@protonmail.com) | |
''' | |
Simple Scapy TCP Session class that provide ability | |
to : | |
- execute the 3-way handshake (eg. connect) | |
- properly close connection (->FIN/ACK, <-FIN/ACK, ->ACK ) | |
- send automatic acknowledgment of received tcp data packet | |
- build a next packet to send with correct sequence number | |
- directly send data through the session | |
HINT : Don't forget to block TCP/RST packet that was send | |
by the linux kernel because no source port was bound. | |
# iptables -A OUTPUT -p tcp --sport 1337 --tcp-flags RST RST -j DROP | |
Source port is, for now, fixed to 1337 to facilitate wireshark filtering. | |
The purpose of this class is to easily build a working tcp session and | |
have complete scapy control of the next tcp packet. | |
Usage & example : | |
# Create the session object and connect to host 192.168.13.37 port 80 | |
>>> sess = TcpSession(('192.168.13.37',80)) | |
>>> sess.connect() | |
# Build next packet and send it fragmented (layer 2) | |
>>> p = sess.build('GET / HTTP/1.1\r\n\r\n') | |
>>> send(fragment(p, fragsize=16)) | |
# Direct send data through the session and close | |
>>> sess.send('GET /index.html HTTP/1.1\r\n\r\n') | |
>>> sess.close() | |
# Session object can be reusable | |
>>> sess.connect() | |
>>> sess.send('GET /robot.txt HTTP/1.1\r\n\r\n') | |
>>> sess.close() | |
TODO : | |
1/ Optionally dump received data to a file | |
2/ Proper logging | |
''' | |
from scapy.all import * | |
from threading import Thread | |
class TcpSession: | |
def __init__(self,target): | |
self.seq = 0 | |
self.ack = 0 | |
self.ip = IP(dst=target[0]) | |
self.sport = 1337 | |
self.dport = target[1] | |
self.connected = False | |
self._ackThread = None | |
self._timeout = 3 | |
def _ack(self, p): | |
self.ack = p[TCP].seq + len(p[Raw]) | |
ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='A', seq=self.seq, ack=self.ack) | |
send(ack) | |
def _ack_rclose(self): | |
self.connected = False | |
self.ack += 1 | |
fin_ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='FA', seq=self.seq, ack=self.ack) | |
ack = sr1(fin_ack, timeout=self._timeout) | |
self.seq += 1 | |
assert ack.haslayer(TCP), 'TCP layer missing' | |
assert ack[TCP].flags & 0x10 == 0x10 , 'No ACK flag' | |
assert ack[TCP].ack == self.seq , 'Acknowledgment number error' | |
def _sniff(self): | |
s = L3RawSocket() | |
while self.connected: | |
p = s.recv(MTU) | |
if p.haslayer(TCP) and p.haslayer(Raw) \ | |
and p[TCP].dport == self.sport : | |
self._ack(p) | |
if p.haslayer(TCP) and p[TCP].dport == self.sport \ | |
and p[TCP].flags & 0x01 == 0x01 : # FIN | |
self._ack_rclose() | |
s.close() | |
self._ackThread = None | |
print('Acknowledgment thread stopped') | |
def _start_ackThread(self): | |
self._ackThread = Thread(name='AckThread',target=self._sniff) | |
self._ackThread.start() | |
def connect(self): | |
self.seq = random.randrange(0,(2**32)-1) | |
syn = self.ip/TCP(sport=self.sport, dport=self.dport, seq=self.seq, flags='S') | |
syn_ack = sr1(syn, timeout=self._timeout) | |
self.seq += 1 | |
assert syn_ack.haslayer(TCP) , 'TCP layer missing' | |
assert syn_ack[TCP].flags & 0x12 == 0x12 , 'No SYN/ACK flags' | |
assert syn_ack[TCP].ack == self.seq , 'Acknowledgment number error' | |
self.ack = syn_ack[TCP].seq + 1 | |
ack = self.ip/TCP(sport=self.sport, dport=self.dport, seq=self.seq, flags='A', ack=self.ack) | |
send(ack) | |
self.connected = True | |
self._start_ackThread() | |
print('Connected') | |
def close(self): | |
self.connected = False | |
fin = self.ip/TCP(sport=self.sport, dport=self.dport, flags='FA', seq=self.seq, ack=self.ack) | |
fin_ack = sr1(fin, timeout=self._timeout) | |
self.seq += 1 | |
assert fin_ack.haslayer(TCP), 'TCP layer missing' | |
assert fin_ack[TCP].flags & 0x11 == 0x11 , 'No FIN/ACK flags' | |
assert fin_ack[TCP].ack == self.seq , 'Acknowledgment number error' | |
self.ack = fin_ack[TCP].seq + 1 | |
ack = self.ip/TCP(sport=self.sport, dport=self.dport, flags='A', seq=self.seq, ack=self.ack) | |
send(ack) | |
print('Disconnected') | |
def build(self, payload): | |
psh = self.ip/TCP(sport=self.sport, dport=self.dport, flags='PA', seq=self.seq, ack=self.ack)/payload | |
self.seq += len(psh[Raw]) | |
return psh | |
def send(self, payload): | |
psh = self.build(payload) | |
ack = sr1(psh, timeout=self._timeout) | |
assert ack.haslayer(TCP), 'TCP layer missing' | |
assert ack[TCP].flags & 0x10 == 0x10, 'No ACK flag' | |
assert ack[TCP].ack == self.seq , 'Acknowledgment number error' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I am interested in testing how many tcp sessions a device can handle, such as a stateful firewall or NAT device. My thought is to have 2 hosts running scapy connected to the device being tested. I could use a script like this (with many iterations, changing IP address and/or port) to act as a TCP client. Can similar be done to emulate a TCP server? Can Scapy receive TCP SYN packets and manually process them with a similar script set to receive SYN-ACK and respond with ACK, or will that host machine kernel always try to respond to the TCP session? My goal is for Scapy to be stateless in the process such that the test is not be limited by the kernel of the hosts running the handshake emulation.