Skip to content

Instantly share code, notes, and snippets.

@tintinweb
Last active August 4, 2023 19:58
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save tintinweb/8523a9a43a2fb61a6770 to your computer and use it in GitHub Desktop.
Save tintinweb/8523a9a43a2fb61a6770 to your computer and use it in GitHub Desktop.
simple scapy tcp three-way handshake
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Author : tintinweb@oststrom.com <github.com/tintinweb>
'''
A simple TCP three-way handshake example
#> python scapy_tcp_handshake.py
DEBUG:__main__:init: ('oststrom.com', 80)
DEBUG:__main__:start
DEBUG:__main__:SND: SYN
DEBUG:__main__:RCV: SYN+ACK
DEBUG:__main__:SND: SYN+ACK -> ACK
DEBUG:__main__:RCV: None
DEBUG:__main__:RCV: None
None
DEBUG:__main__:SND: FIN
DEBUG:__main__:RCV: None
Note: the Linux kernel knows nothing about the forged connection and may answer the peer's SYN+ACK with an RST. Suppress it by executing:
#> iptables -A OUTPUT -p tcp --tcp-flags RST RST -s <src_ip> -j DROP
'''
import logging
import random

from scapy.all import *
logger = logging.getLogger(__name__)
class TcpHandshake(object):
    """Minimal client-side TCP conversation driven by hand-crafted scapy packets.

    One template packet (``self.l4``) is reused for every segment; its flags,
    sequence number and acknowledgement number are updated in place before
    each transmission.  ``self.seq_next`` holds the acknowledgement number
    the peer is expected to send back and is what ``match_packet`` filters on.

    Sending and sniffing raw packets normally requires elevated privileges.
    ``_sr1`` relies on the ``started_callback`` argument of scapy's
    ``sniff`` (present in recent scapy releases -- verify against the
    installed version).
    """

    # TCP sequence/acknowledgement numbers are 32-bit and wrap around.
    _SEQ_MODULUS = 2 ** 32

    def __init__(self, target):
        """Prepare the packet template for ``target``.

        :param target: ``(host, port)`` tuple of the peer to connect to.
        """
        self.seq = 0
        self.seq_next = 0
        self.target = target
        # next(iter(...)) works on Python 2 and 3; ``.next()`` is Python 2 only.
        self.dst = next(iter(Net(target[0])))
        self.dport = target[1]
        # Stay out of the well-known range and never pick the invalid port 0.
        self.sport = random.randrange(1024, 2 ** 16)
        self.l4 = IP(dst=target[0]) / TCP(
            sport=self.sport,
            dport=self.dport,
            flags=0,
            seq=random.randrange(0, 2 ** 32),
        )
        self.src = self.l4.src
        self.swin = self.l4[TCP].window
        self.dwin = 1
        logger.debug("init: %r", target)

    @staticmethod
    def _seq_add(value, delta):
        """Return ``value + delta`` wrapped to the 32-bit sequence space."""
        return (value + delta) % TcpHandshake._SEQ_MODULUS

    @staticmethod
    def _payload_length(pkt):
        """Return the number of TCP payload bytes carried by ``pkt``.

        Uses the IP/TCP header length fields so that trailing link-layer
        padding is not counted; falls back to the dissected payload length
        when those fields are unset (packets that were built, not sniffed).
        """
        ip_layer = pkt[IP]
        tcp_layer = pkt[TCP]
        if ip_layer.len is None or ip_layer.ihl is None or tcp_layer.dataofs is None:
            return len(tcp_layer.payload)
        return max(ip_layer.len - 4 * ip_layer.ihl - 4 * tcp_layer.dataofs, 0)

    def start(self):
        """Begin the handshake by sending the SYN; returns ``send_syn()``'s result."""
        logger.debug("start")
        return self.send_syn()

    def match_packet(self, pkt):
        """Return True if ``pkt`` is addressed to us and acknowledges ``seq_next``."""
        if pkt.haslayer(IP) and pkt[IP].dst == self.l4[IP].src \
                and pkt.haslayer(TCP) and pkt[TCP].dport == self.sport \
                and pkt[TCP].ack == self.seq_next:
            return True
        return False

    def _sr1(self, pkt):
        """Send ``pkt`` and return the first matching reply, or None on timeout.

        The packet is transmitted from the sniffer's ``started_callback`` so
        the capture is already running when it leaves; sending first and
        sniffing afterwards loses replies that arrive in between.
        """
        ans = sniff(
            filter="tcp port %s" % self.target[1],
            lfilter=self.match_packet,
            count=1,
            timeout=1,
            started_callback=lambda: send(pkt),
        )
        return ans[0] if ans else None

    def handle_recv(self, pkt):
        """Dispatch on the TCP flags of a received packet.

        :param pkt: packet returned by ``_sr1`` (may be None on timeout).
        :returns: the result of the follow-up send, or None.
        :raises Exception: ``"RST"`` when the peer resets the connection;
            ``"FIN+ACK"`` (via ``send_finack``) when the peer sends a FIN.
        """
        if pkt is not None and pkt.haslayer(IP) and pkt.haslayer(TCP):
            # Low six bits: FIN=0x01 SYN=0x02 RST=0x04 PSH=0x08 ACK=0x10 URG=0x20
            flags = int(pkt[TCP].flags) & 0x3f
            if flags == 0x12:  # SYN+ACK
                logger.debug("RCV: SYN+ACK")
                return self.send_synack_ack(pkt)
            elif (flags & 0x04) != 0:  # RST
                logger.debug("RCV: RST")
                raise Exception("RST")
            elif (flags & 0x01) != 0:  # FIN (with or without ACK)
                logger.debug("RCV: FIN")
                return self.send_finack(pkt)
            elif (flags & 0x17) == 0x10:  # ACK without SYN/RST/FIN (PSH/URG allowed)
                logger.debug("RCV: ACK")
                # Only segments carrying data need to be acknowledged; a bare
                # ACK must not be answered with another ACK.
                if self._payload_length(pkt) > 0:
                    return self.send_ack(pkt)
                return None
        logger.debug("RCV: %s" % repr(pkt))
        return None

    def send_syn(self):
        """Send the SYN and process the reply (expected: SYN+ACK)."""
        logger.debug("SND: SYN")
        self.l4[TCP].flags = "S"
        # A SYN occupies one sequence number.
        self.seq_next = self._seq_add(self.l4[TCP].seq, 1)
        response = self._sr1(self.l4)
        self.l4[TCP].seq = self.seq_next
        return self.handle_recv(response)

    def send_synack_ack(self, pkt):
        """Acknowledge the peer's SYN+ACK, completing the three-way handshake."""
        logger.debug("SND: SYN+ACK -> ACK")
        self.l4[TCP].ack = self._seq_add(pkt[TCP].seq, 1)
        self.l4[TCP].flags = "A"
        # A bare ACK consumes no sequence space.
        self.seq_next = self.l4[TCP].seq
        response = self._sr1(self.l4)
        return self.handle_recv(response)

    def send_data(self, d):
        """Send payload ``d`` in a PSH+ACK segment and process the reply."""
        self.l4[TCP].flags = "PA"
        # Must be set before sending: the peer acknowledges seq + len(d), and
        # match_packet filters replies on seq_next.
        self.seq_next = self._seq_add(self.l4[TCP].seq, len(d))
        response = self._sr1(self.l4 / d)
        self.l4[TCP].seq = self.seq_next
        return self.handle_recv(response)

    def send_fin(self):
        """Send a FIN and process the reply."""
        logger.debug("SND: FIN")
        # NOTE(review): the segment carries FIN without ACK; peers normally
        # ignore segments lacking ACK on an established connection -- confirm
        # whether "FA" was intended.
        self.l4[TCP].flags = "F"
        # A FIN occupies one sequence number.
        self.seq_next = self._seq_add(self.l4[TCP].seq, 1)
        response = self._sr1(self.l4)
        self.l4[TCP].seq = self.seq_next
        return self.handle_recv(response)

    def send_finack(self, pkt):
        """Answer the peer's FIN with FIN+ACK, then signal the close.

        :raises Exception: always, with message ``"FIN+ACK"``, after the
            segment has been sent.
        """
        logger.debug("SND: FIN+ACK")
        self.l4[TCP].flags = "FA"
        # Acknowledge any data in the segment plus the FIN itself.
        self.l4[TCP].ack = self._seq_add(pkt[TCP].seq, self._payload_length(pkt) + 1)
        self.seq_next = self._seq_add(self.l4[TCP].seq, 1)
        send(self.l4)
        self.l4[TCP].seq = self.seq_next
        raise Exception("FIN+ACK")

    def send_ack(self, pkt):
        """Acknowledge the data carried by ``pkt`` with a bare ACK.

        A bare ACK consumes no sequence space, so our own sequence number is
        left untouched, and no reply is awaited.  Returns None.
        """
        logger.debug("SND: ACK")
        self.l4[TCP].flags = "A"
        self.l4[TCP].ack = self._seq_add(pkt[TCP].seq, self._payload_length(pkt))
        self.seq_next = self.l4[TCP].seq
        send(self.l4)
# Demo: handshake with a public web server, send a deliberately malformed
# request, then close the connection.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)
    conf.verb = 0  # silence scapy's own per-packet output
    tcp_hs = TcpHandshake(("oststrom.com", 80))
    try:
        tcp_hs.start()
        # print() with a single argument behaves the same on Python 2 and 3.
        print(repr(tcp_hs.send_data("INTENTIONAL BAD REQUEST\r\n\r\n\r\n")))
        tcp_hs.send_fin()
    except Exception as exc:
        # TcpHandshake signals a peer reset/close with a plain Exception whose
        # message is "RST" or "FIN+ACK"; anything else is a real error.
        if str(exc) not in ("RST", "FIN+ACK"):
            raise
        logger.debug("connection terminated by peer: %s", exc)
@Kash-A
Copy link

Kash-A commented Aug 11, 2017

Can you please provide an example of how your file could be used with an application layer built in scapy? The purpose is to connect to a server and receive data. In my case, the new protocol layer has already been implemented; I just need to add the three-way handshake and the sequence/ack handling at the TCP level so that it functions properly.

I am new to all of this, so please take that into consideration when answering. FYI, the new layer is S/IP, the sercos interface protocol.

@mrbus
Copy link

mrbus commented Oct 10, 2020

If an incoming packet arrives in a time gap between send and sniff (lines 54-55), it will be lost, because sniff does not receive packets that arrived before it was called.

@kthomas8
Copy link

kthomas8 commented Nov 3, 2022

I am interested in testing how many TCP sessions a device can handle, such as a stateful firewall or NAT device. My thought is to have 2 hosts running scapy connected to the device being tested. I could use a script like this (with many iterations, changing the IP address and/or port) to act as a TCP client. Can something similar be done to emulate a TCP server? Can Scapy receive TCP SYN packets and manually process them with a similar script set to receive SYN-ACK and respond with ACK, or will the host machine's kernel always try to respond to the TCP session? My goal is for Scapy to be stateless in the process, so that the test is not limited by the kernel of the hosts running the handshake emulation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment