Skip to content

Instantly share code, notes, and snippets.

@kaichiachen
Created September 19, 2020 15:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kaichiachen/9409e9aa327827da4df36f553389c4e0 to your computer and use it in GitHub Desktop.
Save kaichiachen/9409e9aa327827da4df36f553389c4e0 to your computer and use it in GitHub Desktop.
Show how to build a simple DNS server based on UDP
import logging
import argparse
import socket
import sys
import binascii
import struct
import array
import time
logging.basicConfig(level=logging.INFO,format='%(asctime)s %(levelname)-5s %(message)s', datefmt='%Y-%m-%d %I:%M:%S %p')
ETH_HEADER_LEN = 14
IP_HEADER_LEN = 20
UDP_HEADER_LEN = 8
DNS_TABLE = {
'abc.com': '1.2.3.4',
'def.com': '2.3.4.5',
}
def getIPChecksum(data):
sum = 0
for index in range(0,len(data),2):
word = (data[index] << 8) + (data[index+1])
sum = sum + word
sum = (sum >> 16) + (sum & 0xffff);
sum = ~sum & 0xffff
return sum
def getUDPChecksum(data):
sum = 0
data_len = len(data)
if (data_len % 2):
data_len += 1
data += struct.pack('!B', 0)
for i in range(0, data_len, 2):
w = (data[i] << 8) + (data[i + 1])
sum += w
sum = (sum >> 16) + (sum & 0xFFFF)
sum = ~sum & 0xFFFF
return sum
class DNSServer():
def __init__(self, port):
self.ip, self.port = socket.gethostbyname(socket.gethostname()), port
with open('/sys/class/net/eth0/address') as f:
mac = f.readline()
self.mac = binascii.unhexlify(str.encode(''.join((mac.split(':'))))[:-1])
self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
self.sock.bind(('eth0',0))
def _buildEthHeader(self, dMAC):
eth_header = struct.pack('!6s6sH' , dMAC, self.mac, socket.htons(8))
return eth_header
def _buildIPHeader(self, dip):
pktID = 123
IHL_VERSION, TYPE_OF_SERVICE, total_len, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, src_IP,dest_IP = \
69, 0, 40, 16384, 64, 17, 0, self.ip, dip
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, socket.inet_aton(self.ip), socket.inet_aton(dip))
check_sum_of_hdr = getIPChecksum(ip_header)
ip_header = struct.pack('!BBHHHBBH4s4s', IHL_VERSION, TYPE_OF_SERVICE, total_len, pktID, FRAGMENT_STATUS, TIME_TO_LIVE, PROTOCOL, check_sum_of_hdr, socket.inet_aton(self.ip),socket.inet_aton(dip))
return ip_header
def _buildUDPHeader(self, dip, dest_port, udp_len, data):
udp_header = struct.pack('!4H', self.port, dest_port, udp_len, 0)
pseudo_header = struct.pack('!4s4sBBH', socket.inet_aton(self.ip), socket.inet_aton(dip), 0 , socket.IPPROTO_UDP, udp_len)
check_sum_of_udp = getUDPChecksum(pseudo_header + udp_header + data)
udp_header = struct.pack('!4H', self.port, dest_port, udp_len, check_sum_of_udp)
return udp_header
def _isIPv4(self, packet):
eth_header = packet[: ETH_HEADER_LEN ]
eth = struct.unpack('!6s6sH' , eth_header)
eth_protocol = socket.ntohs(eth[2])
return eth[0], eth[1], eth_protocol==8
def _isUDP(self, packet):
ip_header = packet[ ETH_HEADER_LEN: ETH_HEADER_LEN + IP_HEADER_LEN ]
_, _, _, _, _, _, PROTOCOL, _, src_IP, dest_IP = struct.unpack('!BBHHHBBH4s4s' , ip_header)
return socket.inet_ntoa(src_IP), socket.inet_ntoa(dest_IP), PROTOCOL==17
def _decoupleUDP(self, packet):
udp_header = packet[ ETH_HEADER_LEN + IP_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN + UDP_HEADER_LEN]
src_port, _, udp_len, _ = struct.unpack('!4H', udp_header)
udp_data = packet[ ETH_HEADER_LEN + IP_HEADER_LEN + UDP_HEADER_LEN : ETH_HEADER_LEN + IP_HEADER_LEN + UDP_HEADER_LEN + udp_len - 8]
return src_port, udp_data
def start(self):
logging.info('Listen on Port: %d' % self.port)
while True:
packet, _ = self.sock.recvfrom(65565)
_, dMAC, isIPv4 = self._isIPv4(packet)
if not isIPv4: continue
req_IP, myip, isUDP = self._isUDP(packet)
if not isUDP or myip != self.ip: continue
src_port, udp_data = self._decoupleUDP(packet)
dns, ips = self.getIPFromData(udp_data)
data = ','.join(ips)
packet = self._buildEthHeader(dMAC) + self._buildIPHeader(req_IP) + self._buildUDPHeader(req_IP, src_port, 8 + len(data), str.encode(data)) + str.encode(data)
for d, i in zip(dns,ips):
logging.info('From %s ====> %s > %s' %(req_IP, d, i))
self.sock.send(packet)
def close(self):
self.sock.close()
def getIPFromData(self, udp_data):
dns = [ d.strip() for d in udp_data.decode().split(',')]
ips = []
for i in range(len(dns)):
ips.append(DNS_TABLE.get(dns[i],'Unexisting dns name'))
return dns, ips
parser = argparse.ArgumentParser(description='UDP Server Demo')
parser.add_argument('--port', action="store", help='port')
args = parser.parse_args()
conn = DNSServer((int(args.port)))
conn.start()
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment