Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A simple authoritative DNS server which returns the same answer for all questions
import select
import socket
import sqlite3
import time

import dns.exception
import dns.message
import dns.rdataclass
import dns.rdatatype
import dns.rrset
# UDP port that main_loop() binds both the IPv4 and the IPv6 socket to.
DNS_SERVER_PORT = 53
# Fixed addresses placed in every answer by handle_dns_query():
# IP4_ADDR answers A questions, IP6_ADDR answers AAAA questions.
IP4_ADDR = '0.0.0.0' # IPv4 address to return
IP6_ADDR = '::' # IPv6 address to return (if you want to support IPv6)
def handle_dns_query(raw_query):
    """
    Return the queried name and the wire-formatted DNS response.

    Only the first question of the query is considered. A questions are
    answered with IP4_ADDR and AAAA questions with IP6_ADDR, using a TTL
    of 10 seconds.

    Args:
        raw_query: the query datagram, as bytes in DNS wire format.

    Returns:
        A ``(name, wire_resp)`` tuple: the queried name as absolute text
        (e.g. ``'42.example.com.'``) and the response as wire-format bytes.

    Raises:
        NotImplementedError: the query has no question, or the question
            is not class IN with type A or AAAA.
        dns.exception.DNSException: raised by dnspython when the datagram
            cannot be parsed as a DNS message.
    """
    query = dns.message.from_wire(raw_query)
    if not query.question:
        print('Query has no question section')
        raise NotImplementedError
    question = query.question[0]

    # Read class and type from the parsed question instead of splitting its
    # text form on 'IN': a name that itself contains "IN" (for example
    # "INFO.example.com.") would otherwise break the split.
    if question.rdclass != dns.rdataclass.IN:
        print('Only class IN is supported')
        raise NotImplementedError
    if question.rdtype == dns.rdatatype.A:
        ip_resp = IP4_ADDR
    elif question.rdtype == dns.rdatatype.AAAA:
        ip_resp = IP6_ADDR
    else:
        print('Only A and AAAA are supported')
        raise NotImplementedError

    response = dns.message.make_response(query)
    rrset = dns.rrset.from_text(
        question.name, 10, question.rdclass, question.rdtype, ip_resp
    )
    response.answer.append(rrset)
    return question.name.to_text(), response.to_wire()
def store(db, name, conn):
    """
    Record the client address that asked for a numbered name.

    Args:
        db: an open sqlite3 connection that has the ``ip`` table.
        name: the queried DNS name; its first label must be an integer
            (e.g. ``'42.example.com.'`` is stored under id 42).
        conn: the client address tuple as returned by ``recvfrom``; only
            its first element (the IP address) is stored.

    Raises:
        ValueError: the first label of ``name`` is not an integer.
        sqlite3.Error: the insert failed, e.g. ``sqlite3.IntegrityError``
            when the id has already been recorded.
    """
    identifier = int(name.split('.')[0])
    ip_addr, *_ = conn
    # The connection's context manager commits on success and rolls back on
    # failure, so a rejected insert does not leave a transaction open on
    # the connection that is reused for every later request.
    with db:
        db.execute(
            'insert into ip (id, ip_addr) values (?, ?)',
            (identifier, ip_addr),
        )
def main_loop():
    """
    Serve DNS queries over UDP forever and log who asked for which id.

    Opens (and if necessary creates) ``store.db``, binds one IPv6 and one
    IPv4 UDP socket on DNS_SERVER_PORT, then for every datagram:
    builds the answer, records the client address via store(), and only
    then sends the response. Clients whose query is unsupported,
    unparsable, or cannot be stored receive no response.

    This function only returns by raising; the sockets and the database
    connection are closed on the way out so that a caller can safely
    invoke it again.
    """
    db = sqlite3.connect('store.db')
    try:
        db.execute('PRAGMA journal_mode=WAL;')
        db.execute('''
            create table if not exists ip (
                id integer primary key not null,
                ip_addr text not null
            );
        ''')
        port = DNS_SERVER_PORT
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as sock6, \
                socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock4:
            # Restrict the IPv6 socket to IPv6 traffic. Where IPv6 sockets
            # are dual-stack by default (e.g. Linux), the '::' bind would
            # otherwise also claim the IPv4 port and make the IPv4 bind
            # below fail with "Address already in use".
            sock6.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
            sock6.bind(('::', port))
            sock4.bind(('0.0.0.0', port))  # listen on public interfaces
            socks = [sock6, sock4]
            while True:
                rlist, _, _ = select.select(socks, [], [])
                for conn in rlist:
                    dgram, addr = conn.recvfrom(4096)
                    # Resolve
                    try:
                        name, wire_resp = handle_dns_query(dgram)
                    except NotImplementedError:
                        continue  # ignore the client
                    except (dns.exception.DNSException,
                            IndexError, ValueError) as e:
                        # The datagram comes from an untrusted client; if
                        # it cannot be parsed, drop it instead of letting
                        # the error tear down the whole server loop.
                        print(e)
                        continue
                    # Store
                    try:
                        store(db, name, addr)
                    except (sqlite3.Error, ValueError) as e:
                        print(e)
                        continue
                    # Respond
                    conn.sendto(wire_resp, addr)
    finally:
        db.close()
if __name__ == '__main__':
    # Supervisor: main_loop() only comes back by raising, so report the
    # failure, wait three seconds, and start the server again.
    while True:
        try:
            main_loop()
        except Exception as err:
            print(err)
            time.sleep(3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.