Skip to content

Instantly share code, notes, and snippets.

@kaichiachen
Created October 2, 2020 08:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kaichiachen/9b84defbce02cd738cd5187424418a0b to your computer and use it in GitHub Desktop.
Save kaichiachen/9b84defbce02cd738cd5187424418a0b to your computer and use it in GitHub Desktop.
A ping server demo building from raw packet
import logging
import argparse
import socket
import sys
import binascii
import struct
import array
import time
logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)-5s %(message)s', datefmt='%Y-%m-%d %I:%M:%S %p')
ETH_HEADER_LEN = 14
IP_HEADER_LEN = 20
ICMP_HEADER_LEN = 8
def getIPChecksum(data):
sum = 0
for index in range(0,len(data),2):
word = (data[index] << 8) + (data[index+1])
sum = sum + word
sum = (sum >> 16) + (sum & 0xffff);
sum = ~sum & 0xffff
return sum
def getICMPChecksum(packet):
packetLen = len(packet)
sum = 0
remain = packetLen % 2
for i in range(0, packetLen-remain, 2):
sum += packet[i]+(packet[i+1]<<8)
if remain:
sum += packet[packetLen-1]
while (sum >> 16):
sum = (sum & 0xFFFF) + (sum >> 16)
return ~sum & 0xFFFF
class PingServer():
def __init__(self,):
self.ip = socket.gethostbyname(socket.gethostname())
with open('/sys/class/net/eth0/address') as f:
mac = f.readline()
self.mac = binascii.unhexlify(str.encode(''.join((mac.split(':'))))[:-1])
self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
self.sock.bind(('eth0',0))
def _buildEthHeader(self, dMAC):
eth_header = struct.pack('!6s6sH' , dMAC, self.mac, socket.htons(8))
return eth_header
def _buildIPHeader(self, dip):
pktID = 123
IHL_VERSION, TYPE_OF_SERVICE, total_len, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP = \
69, 0, 84, 0, 64, 1, 0, self.ip, dip
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, socket.inet_aton(self.ip), socket.inet_aton(dip))
check_sum_of_hdr = getIPChecksum(ip_header)
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, socket.inet_aton(self.ip),socket.inet_aton(dip))
return ip_header
def _buildICMPHeader(self, pktID, seq, data, icmp_type = 0, code = 0):
icmp_header = struct.pack('!BbHHh', icmp_type, code, 0, pktID, seq)
check_sum_of_icmp = getICMPChecksum(icmp_header + data)
icmp_header = struct.pack('!BbHHh', icmp_type, code, check_sum_of_icmp, pktID, seq)
return icmp_header
def _isIPv4(self, packet):
eth_header = packet[: ETH_HEADER_LEN ]
eth = struct.unpack('!6s6sH' , eth_header)
eth_protocol = socket.ntohs(eth[2])
return eth[0], eth[1], eth_protocol==8
def _isICMP(self, packet):
ip_header = packet[ ETH_HEADER_LEN: ETH_HEADER_LEN + IP_HEADER_LEN ]
_, _, _, _, _, _, PROTOCOL, _, src_IP, dest_IP = struct.unpack('!BBHHHBBH4s4s' , ip_header)
return socket.inet_ntoa(src_IP), socket.inet_ntoa(dest_IP), PROTOCOL==1
def _decoupleICMP(self, packet):
icmp_header = packet[ ETH_HEADER_LEN + IP_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN + ICMP_HEADER_LEN]
icmp_type, code, checksum, pktID, seq = struct.unpack('!BbHHh', icmp_header)
return icmp_type, code, checksum, pktID, seq
def start(self):
while True:
packet, _ = self.sock.recvfrom(65565)
_, dMAC, isIPv4 = self._isIPv4(packet)
if not isIPv4: continue
req_IP, des_ip, _isICMP = self._isICMP(packet)
if not _isICMP or des_ip != self.ip: continue
icmp_type, code, checksum, pktID, seq = self._decoupleICMP(packet)
if icmp_type != 8: continue
data = packet[ETH_HEADER_LEN + IP_HEADER_LEN + ICMP_HEADER_LEN: ]
packet = self._buildEthHeader(dMAC) + self._buildIPHeader(req_IP) + self._buildICMPHeader(pktID, seq, packet[ETH_HEADER_LEN + IP_HEADER_LEN + ICMP_HEADER_LEN: ]) + data
self.sock.send(packet)
def close(self):
self.sock.close()
conn = PingServer()
conn.start()
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment