Skip to content

Instantly share code, notes, and snippets.

@kaichiachen
Created September 17, 2020 01:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kaichiachen/283f1b57e517f9ed558de6c2d15daf62 to your computer and use it in GitHub Desktop.
Save kaichiachen/283f1b57e517f9ed558de6c2d15daf62 to your computer and use it in GitHub Desktop.
Show how three-way handshake(connection) and four-way handshake(termination) works from RAW packets
import logging
import argparse
import socket
import sys
import binascii
import struct
import array
import time
logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)-5s %(message)s', datefmt='%Y-%m-%d %I:%M:%S %p')
ETH_HEADER_LEN = 14
IP_HEADER_LEN = 20
TCP_HEADER_LEN = 20
def getIPChecksum(data):
sum = 0
for index in range(0,len(data),2):
word = (data[index] << 8) + (data[index+1])
sum = sum + word
sum = (sum >> 16) + (sum & 0xffff);
sum = ~sum & 0xffff
return sum
def getTCPChecksum(packet):
if len(packet) % 2 != 0:
packet += b'\0'
res = sum(array.array("H", packet))
res = (res >> 16) + (res & 0xffff)
res += res >> 16
return (~res) & 0xffff
class TcpHandshake():
def __init__(self, target):
self.dip, self.dport = target
self.sip, self.sport = socket.gethostbyname(socket.gethostname()), 20
with open('/sys/class/net/eth0/address') as f:
mac = f.readline()
self.mac = binascii.unhexlify(str.encode(''.join((mac.split(':'))))[:-1])
self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
self.sock.bind(('eth0',0))
def _buildEthHeader(self):
dMAC = '02:42:ac:11:00:05'
dMAC = binascii.unhexlify(str.encode(''.join((dMAC.split(':')))))
eth_header = struct.pack('!6s6sH' , dMAC, self.mac, socket.htons(8))
return eth_header
def _buildIPHeader(self):
pktID = 123
IHL_VERSION, TYPE_OF_SERVICE, total_len, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP = \
69, 0, 40, 16384, 64, 6, 0, socket.inet_aton(socket.gethostbyname(socket.gethostname())), socket.inet_aton(self.dip)
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP)
check_sum_of_hdr = getIPChecksum(ip_header)
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP)
return ip_header
def _buildTCPHeader(self, tcp_len, seq, ack_num, flags, window):
self.seq = seq
self.ack_num = ack_num
src_IP = socket.inet_aton(self.sip)
dest_IP = socket.inet_aton(self.dip)
src_port, dest_port, offset, checksum, urgent_ptr = \
self.sport, self.dport, tcp_len << 4, 0, 0
tcp_header = struct.pack('!HHIIBBHHH' , src_port, dest_port, self.seq, self.ack_num, offset, flags, window, checksum, urgent_ptr)
pseudo_hdr = struct.pack('!4s4sBBH', src_IP, dest_IP,0 , socket.IPPROTO_TCP, len(tcp_header))
checksum = getTCPChecksum(pseudo_hdr + tcp_header)
tcp_header = tcp_header[:16] + struct.pack('H', checksum) + tcp_header[18:]
return tcp_header
def start(self):
logging.info('Start three handshake')
recv = self.send_syn()
if not recv:
logging.error('Fail to receive ack and syn, TCP connection failed')
return
self.send_ack()
logging.info('Build connection sucessfully!')
time.sleep(2)
logging.info('Close tcp connection')
logging.info('do something...')
self.send_ack_fin()
def close(self):
self.sock.close()
def send_syn(self):
eth_header = self._buildEthHeader()
ip_header = self._buildIPHeader()
tcp_header = self._buildTCPHeader(tcp_len=5, seq=0, ack_num=0, flags=2, window=29200)
packet = eth_header + ip_header + tcp_header
self.sock.send(packet)
while True:
packet, _ = self.sock.recvfrom(65565)
eth_header = packet[: ETH_HEADER_LEN ]
eth = struct.unpack('!6s6sH' , eth_header)
eth_protocol = socket.ntohs(eth[2])
if eth_protocol != 8:
continue
ip_header = packet[ ETH_HEADER_LEN: ETH_HEADER_LEN + IP_HEADER_LEN ]
_, _, _, _, _, _, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP = struct.unpack('!BBHHHBBH4s4s' , ip_header)
if dest_IP != socket.inet_aton(self.sip) or src_IP != socket.inet_aton(self.dip):
continue
if PROTOCOL != 6:
continue
tcp_header = packet[ ETH_HEADER_LEN + IP_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN + TCP_HEADER_LEN]
src_port, dest_port, seq, ack_num, _, flags, _, checksum, _= struct.unpack('!HHLLBBHHH' , tcp_header)
if src_port != self.dport or dest_port != self.sport or flags != 18:
continue
self.seq = ack_num
self.ack_num = seq+1
return True
def send_ack(self):
eth_header = self._buildEthHeader()
ip_header = self._buildIPHeader()
tcp_header = self._buildTCPHeader(5, self.seq, self.ack_num, 16, 29200)
packet = eth_header + ip_header + tcp_header
self.sock.send(packet)
def send_ack_fin(self):
eth_header = self._buildEthHeader()
ip_header = self._buildIPHeader()
tcp_header = self._buildTCPHeader(5, self.seq, self.ack_num, 17, 29200)
packet = eth_header + ip_header + tcp_header
self.sock.send(packet)
while True:
packet, _ = self.sock.recvfrom(65565)
eth_header = packet[ : ETH_HEADER_LEN ]
eth = struct.unpack('!6s6sH' , eth_header)
eth_protocol = socket.ntohs(eth[2])
if eth_protocol != 8:
continue
ip_header = packet[ ETH_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN]
_, _, _, _, _, _, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP = struct.unpack('!BBHHHBBH4s4s' , ip_header)
if dest_IP != socket.inet_aton(self.sip) or src_IP != socket.inet_aton(self.dip):
continue
if PROTOCOL != 6:
continue
tcp_header = packet[ ETH_HEADER_LEN + IP_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN + TCP_HEADER_LEN ]
src_port, dest_port, seq, ack_num, _, flags, _, checksum, _= struct.unpack('!HHLLBBHHH' , tcp_header)
if src_port != self.dport or dest_port != self.sport or flags != 17:
continue
self.seq = ack_num
self.ack_num = seq+1
break
eth_header = self._buildEthHeader()
ip_header = self._buildIPHeader()
tcp_header = self._buildTCPHeader(5, self.seq, self.ack_num, 16, 29200)
packet = eth_header + ip_header + tcp_header
self.sock.send(packet)
parser = argparse.ArgumentParser(description='TCP Connection Demo')
parser.add_argument('--host', action="store", help='ip')
parser.add_argument('--port', action="store", help='port')
args = parser.parse_args()
conn = TcpHandshake((args.host, int(args.port)))
conn.start()
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment