-
-
Save Mithrilwoodrat/5b8805fc0d5e7a19a3dc2f54b3cdac45 to your computer and use it in GitHub Desktop.
Simple Scapy TCP Session
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: UTF-8 -*- | |
# Author: N0dr4x (n0dr4x@protonmail.com) | |
''' | |
Simple Scapy TCP Session class that provides the ability | |
to : | |
- execute the 3-way handshake (eg. connect) | |
- properly close connection (->FIN/ACK, <-FIN/ACK, ->ACK ) | |
- send automatic acknowledgment of received tcp data packet | |
- build a next packet to send with correct sequence number | |
- directly send data through the session | |
HINT : Don't forget to block the TCP/RST packets sent | |
by the Linux kernel, because no source port was bound. | |
# iptables -A OUTPUT -p tcp --sport 1337 --tcp-flags RST RST -j DROP | |
Source port is, for now, fixed to 1337 to facilitate wireshark filtering. | |
The purpose of this class is to easily build a working tcp session and | |
have complete scapy control of the next tcp packet. | |
Usage & example : | |
# Create the session object and connect to host 192.168.13.37 port 80 | |
>>> sess = TcpSession(('192.168.13.37',80)) | |
>>> sess.connect() | |
# Build next packet and send it fragmented (layer 2) | |
>>> p = sess.build('GET / HTTP/1.1\r\n\r\n') | |
>>> send(fragment(p, fragsize=16)) | |
# Direct send data through the session and close | |
>>> sess.send('GET /index.html HTTP/1.1\r\n\r\n') | |
>>> sess.close() | |
# Session object can be reusable | |
>>> sess.connect() | |
>>> sess.send('GET /robot.txt HTTP/1.1\r\n\r\n') | |
>>> sess.close() | |
TODO : | |
1/ Optionally dump received data to a file | |
2/ Proper logging | |
''' | |
from scapy.all import * | |
from threading import Thread | |
class TcpSession:
    """Minimal client-side TCP session driven by hand-built Scapy packets.

    The session tracks the local sequence number (``seq``) and the next
    expected remote sequence number (``ack``), performs the 3-way handshake,
    acknowledges incoming data from a background thread and closes the
    connection with a FIN/ACK exchange.

    Validation failures (missing reply, missing TCP layer, unexpected flags
    or acknowledgment number) are reported as ``AssertionError``. They are
    raised explicitly rather than with ``assert`` so that they still fire
    when Python runs with ``-O``.

    NOTE(review): ``send()``/``close()`` use ``sr1`` while the acknowledgment
    thread reads the same traffic; both may react to the same packet. This
    is inherited from the original design and is not synchronized here.
    """

    # TCP sequence/acknowledgment numbers are 32-bit and wrap around.
    _SEQ_MODULO = 2 ** 32

    def __init__(self, target, sport=1337, timeout=3):
        """Prepare a session towards ``target``.

        :param target: ``(host, port)`` pair of the remote endpoint.
        :param sport: local source port (defaults to 1337).
        :param timeout: seconds to wait for each reply requested with ``sr1``.
        """
        self.seq = 0
        self.ack = 0
        self.ip = IP(dst=target[0])
        self.sport = sport
        self.dport = target[1]
        self.connected = False
        self._ackThread = None
        self._timeout = timeout

    @staticmethod
    def _seq_add(value, delta):
        """Return ``value + delta`` wrapped to the 32-bit sequence space."""
        return (value + delta) % TcpSession._SEQ_MODULO

    @staticmethod
    def _expect(condition, message):
        """Raise ``AssertionError(message)`` when ``condition`` is false."""
        if not condition:
            raise AssertionError(message)

    def _check_reply(self, reply, flags, flags_message):
        """Validate a reply obtained from ``sr1``.

        :param reply: packet returned by ``sr1`` (``None`` on timeout).
        :param flags: bit mask of TCP flags that must all be set.
        :param flags_message: error text used when the flags do not match.
        :raises AssertionError: if there is no reply, no TCP layer, the flags
            are missing or the reply does not acknowledge ``self.seq``.
        """
        self._expect(reply is not None, 'No response received')
        self._expect(reply.haslayer(TCP), 'TCP layer missing')
        self._expect(reply[TCP].flags & flags == flags, flags_message)
        self._expect(reply[TCP].ack == self.seq, 'Acknowledgment number error')

    def _segment(self, flags):
        """Build an IP/TCP packet carrying the current seq/ack and ``flags``."""
        return self.ip / TCP(sport=self.sport, dport=self.dport,
                             flags=flags, seq=self.seq, ack=self.ack)

    def _ack(self, p):
        """Acknowledge the data carried by the received packet ``p``."""
        self.ack = self._seq_add(p[TCP].seq, len(p[Raw]))
        send(self._segment('A'))

    def _ack_rclose(self):
        """Answer a FIN sent by the peer with our own FIN/ACK.

        :raises AssertionError: if the peer does not acknowledge our FIN.
        """
        self.connected = False
        # The peer's FIN consumes one sequence number.
        self.ack = self._seq_add(self.ack, 1)
        reply = sr1(self._segment('FA'), timeout=self._timeout)
        # Our FIN consumes one sequence number as well.
        self.seq = self._seq_add(self.seq, 1)
        self._check_reply(reply, 0x10, 'No ACK flag')

    def _sniff(self):
        """Background loop: acknowledge incoming data and handle a peer FIN."""
        s = L3RawSocket()
        try:
            while self.connected:
                p = s.recv(MTU)
                # close() may have run while we were blocked in recv(); the
                # packet that woke us up then belongs to its FIN exchange
                # and must not be handled a second time here.
                if not self.connected:
                    break
                if p is None or not p.haslayer(TCP):
                    continue
                tcp = p[TCP]
                # Only react to segments of this session.
                if tcp.dport != self.sport or tcp.sport != self.dport:
                    continue
                if p.haslayer(Raw):
                    self._ack(p)
                if tcp.flags & 0x01 == 0x01:  # FIN
                    self._ack_rclose()
        finally:
            # Release the raw socket even if a handler raised.
            s.close()
            self._ackThread = None
            print('Acknowledgment thread stopped')

    def _start_ackThread(self):
        """Start the background acknowledgment thread."""
        self._ackThread = Thread(name='AckThread', target=self._sniff)
        self._ackThread.start()

    def connect(self):
        """Perform the 3-way handshake and start acknowledging data.

        :raises AssertionError: if no valid SYN/ACK is received.
        """
        # Imported locally so the method does not depend on ``random``
        # leaking out of ``from scapy.all import *``.
        import random

        self.seq = random.randrange(0, (2 ** 32) - 1)
        # A reused session must not leak the previous ack into the SYN.
        self.ack = 0
        syn_ack = sr1(self._segment('S'), timeout=self._timeout)
        self.seq = self._seq_add(self.seq, 1)
        self._check_reply(syn_ack, 0x12, 'No SYN/ACK flags')
        self.ack = self._seq_add(syn_ack[TCP].seq, 1)
        send(self._segment('A'))
        self.connected = True
        self._start_ackThread()
        print('Connected')

    def close(self):
        """Close the connection (->FIN/ACK, <-FIN/ACK, ->ACK).

        :raises AssertionError: if no valid FIN/ACK is received.
        """
        self.connected = False
        fin_ack = sr1(self._segment('FA'), timeout=self._timeout)
        self.seq = self._seq_add(self.seq, 1)
        self._check_reply(fin_ack, 0x11, 'No FIN/ACK flags')
        self.ack = self._seq_add(fin_ack[TCP].seq, 1)
        send(self._segment('A'))
        print('Disconnected')

    def build(self, payload):
        """Return the next PSH/ACK packet carrying ``payload``.

        The local sequence number is advanced by the payload length, so the
        returned packet is expected to be sent by the caller.
        """
        psh = self._segment('PA') / payload
        # Length of whatever follows the TCP header, Raw or not.
        self.seq = self._seq_add(self.seq, len(psh[TCP].payload))
        return psh

    def send(self, payload):
        """Send ``payload`` through the session and check it is acknowledged.

        :raises AssertionError: if no valid ACK is received.
        """
        psh = self.build(payload)
        ack = sr1(psh, timeout=self._timeout)
        self._check_reply(ack, 0x10, 'No ACK flag')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment