-
-
Save nneonneo/1b371ac9da8703eda9c3a9b26d61a483 to your computer and use it in GitHub Desktop.
nahamcon "Contemporaneous Open" solution
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# sudo ufw disable | |
# sudo iptables -A INPUT -p tcp --destination-port 80 -j DROP | |
import random | |
from queue import Queue | |
recvq = Queue() | |
saddr = "10.138.0.11" | |
sport = 80 | |
caddr = None | |
cport = None | |
def my_send(pkt): | |
print("SEND", repr(pkt)) | |
send(pkt) | |
def recv_callback(pkt): | |
if IP not in pkt: | |
return | |
if pkt[IP].dst != saddr: | |
return | |
if TCP not in pkt: | |
return | |
if pkt[TCP].dport != sport: | |
return | |
print("RECV", repr(pkt)) | |
recvq.put(pkt) | |
def my_recv(): | |
while 1: | |
pkt = recvq.get() | |
if caddr is not None and pkt[IP].src != caddr: | |
continue | |
if cport is not None and pkt[TCP].sport != cport: | |
continue | |
return pkt | |
sniffer = AsyncSniffer(prn=recv_callback, lfilter=lambda pkt: TCP in pkt) | |
sniffer.start() | |
def wrap(x): | |
return x & 0xffffffff | |
try: | |
while 1: | |
# wait for SYN | |
p = my_recv() | |
if p[TCP].flags == "S": | |
break | |
caddr = p[IP].src | |
cport = p[TCP].sport | |
cseq = wrap(p[TCP].seq + 1) | |
sseq = random.getrandbits(32) | |
# send SYN | |
my_send(IP(src=saddr, dst=caddr, proto="tcp") / | |
TCP(sport=sport, dport=cport, flags="S", seq=sseq)) | |
sseq = wrap(sseq + 1) | |
# send ACK | |
my_send(IP(src=saddr, dst=caddr, proto="tcp") / | |
TCP(sport=sport, dport=cport, flags="A", seq=sseq, ack=cseq)) | |
# read request | |
data = bytearray() | |
while 1: | |
p = my_recv() | |
print(repr(p)) | |
if "S" in p[TCP].flags: | |
# ignore further SYNs | |
continue | |
cseq = p[TCP].seq + len(p[TCP].payload) | |
my_send(IP(src=saddr, dst=caddr, proto="tcp") / | |
TCP(sport=sport, dport=cport, flags="A", seq=sseq, ack=cseq)) | |
if p[TCP].payload: | |
data += p[TCP].payload.load | |
if data.endswith(b"\r\n\r\n"): | |
break | |
# reply | |
msg = b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: 6\r\nConnection: close\r\n\r\nHello!" | |
my_send(IP(src=saddr, dst=caddr, proto="tcp") / | |
TCP(sport=sport, dport=cport, flags="FPA", seq=sseq, ack=cseq) / | |
msg) | |
sseq = wrap(sseq + len(msg)) | |
# wait for client to FIN | |
while 1: | |
p = my_recv() | |
print(repr(p)) | |
if "F" in p[TCP].flags: | |
break | |
# ack FIN to complete TCP connection | |
my_send(IP(src=saddr, dst=caddr, proto="tcp") / | |
TCP(sport=sport, dport=cport, flags="A", seq=sseq, ack=cseq)) | |
finally: | |
sniffer.stop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment