@Chubek
Created May 17, 2023 00:22
dnsrezulf.py Request A, AAAA, TXT and CNAME Records from any DNS Resolver

I originally wrote dnsrezulf.py to prototype the Assembly + C code for the DNS resolver in Panah. That being said, I decided to package it separately as a Gist because I reckon it merits that. You may find a very simple, yet fast, DNS Resolver in C + Assembly, only for Linux, in the afore-linked repository.

What is dnsrezulf.py?

Basically, this Python script is the Walter Blanco to dig's Walter White. If dig is partying at the club, dnsrezulf.py is sitting at home, popping ritty after ritty, and writing useless code whilst he runs out of money. If dig is Daniel J. Bernstein, dnsrezulf.py is Terry A. Davis. If dig is Linux, dnsrezulf.py is a magnetic tape which has to be manually handled to read the next punch card. Basically, you get the gist (pun intended).

That being said, dnsrezulf.py contains an implementation of the DNS protocol from scratch, based on RFC 1035. It is also commented to hell and back, and reading the entire script from top to bottom, bottom to top, and then top to bottom again will be educational for understanding how DNS works.
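
To give a feel for what "from scratch" means here, the following is a minimal sketch of the query wire format described in RFC 1035: a 12-byte header followed by one question. It is not the script's own code; `build_query` and its fixed `xid` default are names assumed purely for illustration, and it uses `struct` instead of the manual bit shifting the script does.

```python
import struct

def build_query(name: str, qtype: int = 1, xid: int = 0x1234) -> bytes:
    """Build a single-question DNS query (hypothetical helper, for illustration)."""
    # Header: ID, flags (0x0100 = only the RD bit set), QDCOUNT=1,
    # then ANCOUNT, NSCOUNT and ARCOUNT all zero. "!" means network byte order.
    header = struct.pack("!HHHHHH", xid, 0x0100, 1, 0, 0, 0)
    # QNAME: every label is prefixed with its length, and a zero byte ends the name
    labels = name.rstrip(".").split(".")
    qname = b"".join(bytes([len(l)]) + l.encode("ascii") for l in labels) + b"\x00"
    # QTYPE (1 = A by default) and QCLASS (1 = IN, the Internet class)
    return header + qname + struct.pack("!HH", qtype, 1)

packet = build_query("example.com")
print(len(packet))  # 12-byte header + 13-byte QNAME + 4 bytes of QTYPE/QCLASS
```

For `example.com` the encoded name is `\x07example\x03com\x00`, which is the same length-prefixed form the script's `encode_dns_addr` produces.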

dnsrezulf.py uses plain system sockets (UDP datagrams through Python's socket module), and although I use a Linux machine, I did test it on my brother's Windows machine, and it does indeed run on Windows as well. Ipso facto, I assume it will run on any Unix-like, POSIX-compliant, and various other operating systems with a CPython implementation.
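
As a rough sketch of the socket side, a single UDP request/response exchange can look like the snippet below. `udp_exchange` is a hypothetical helper, not a function from the script, and the timeout is an addition of this sketch (the script itself blocks until a reply arrives).

```python
import socket

def udp_exchange(payload: bytes, server: tuple[str, int],
                 timeout: float = 2.0, bufsize: int = 1024) -> bytes:
    """Send one datagram and wait for one reply (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout)        # raise socket.timeout instead of hanging forever
        sock.sendto(payload, server)    # UDP is connectionless, so no connect() is required
        data, _addr = sock.recvfrom(bufsize)  # one datagram holds one whole DNS reply
        return data
```

Calling it with an encoded query and `("8.8.8.8", 53)` would return the resolver's raw reply bytes, which still have to be parsed.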

How do I use dnsrezulf.py?

Just wget the script and pass --help to see a list of arguments you may pass to it. By default, it uses 8.8.8.8:53 as the resolver, and example.com as the address to resolve. The default record type is A, which is IPv4. For IPv6 pass AAAA. For CNAME and TXT, pass them respectively.
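
The script parses its arguments by hand. Purely as an illustration of the same flags and defaults, here is how they might be declared with the standard `argparse` module; `parse_cli` and `RECORD_TYPES` are names assumed for this sketch and do not exist in the script.

```python
import argparse

# numeric record type codes the script sends on the wire
RECORD_TYPES = {"A": 1, "CNAME": 5, "TXT": 16, "AAAA": 28}

def parse_cli(argv: list[str]) -> argparse.Namespace:
    """Hypothetical argparse front end mirroring the script's flags."""
    parser = argparse.ArgumentParser(description="illustrative front end only")
    parser.add_argument("-rs", "--resolver", default="8.8.8.8")
    parser.add_argument("-p", "--port", type=int, default=53)
    parser.add_argument("-ad", "--address", default="example.com")
    parser.add_argument("-rr", "--rectype", choices=sorted(RECORD_TYPES), default="A")
    return parser.parse_args(argv)

args = parse_cli(["-ad", "google.com", "-rr", "AAAA"])
print(args.address, RECORD_TYPES[args.rectype])  # google.com 28
```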

An example:

python3 dnsrezulf.py -ad google.com -rr AAAA

This will return:

google.com | AAAA | ttl=300 | 2607:f8b0:400c:c38:0:0:0:66
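
The address above is printed group by group without zero compression. If the shorter canonical form is preferred, the standard `ipaddress` module can produce it from the same 16 bytes of record data. This is a small sketch, with `format_aaaa` being a hypothetical helper rather than part of the script.

```python
import ipaddress

def format_aaaa(rdata: bytes) -> str:
    """Render 16 bytes of AAAA record data as a compressed IPv6 string."""
    # AAAA RDATA is exactly 16 bytes: the address in network byte order
    return str(ipaddress.IPv6Address(bytes(rdata)))

# the same address as in the sample output, written out as raw bytes
rdata = bytes.fromhex("2607f8b0400c0c380000000000000066")
print(format_aaaa(rdata))  # 2607:f8b0:400c:c38::66
```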

(Note: this script has a shebang, so on Unix-like systems you can mark it as executable and run it directly without specifying the Python executable.)

Please See

Please see my GitHub profile where I host a lot of cool stuff. Trust me, you won't be disappointed if you visit my profile. Since we mentioned DJB, what I'm currently working on is a modification of DJB's LZY compression algorithm, this time with blocks, like Deflate. Stay tuned.

#!/usr/bin/python3
# python3 <scriptname> --help for help
# Part of VASSER Virtual Autonomous System
# Released under MIT License, see github/chubekVASSER/LICENSE for more info
# this script contains a simple, limited implementation of the
# DNS query system in Python as a part of Panah application
# notice that this script is not a complete implementation
# and has only been written for prototyping Panah DNS resolver
# please only use for educational/prototyping purposes
# based on RFC 1035: https://datatracker.ietf.org/doc/html/rfc1035
# also RFC 3596 for AAAA: https://datatracker.ietf.org/doc/html/rfc3596
from ctypes import c_ushort
from socket import AF_INET, SOCK_DGRAM, socket
# Part A: Globals & Utils
MAXU16 = 65535 # maximum of u16
ASCII_PERIOD = 46 # ascii for period
LEN_HEADER = 12 # length of DNS query header
RECURSE_DESIRED = 1 # setting this flag means we want recursive lookup
RECURSE_UNDESIRED = 0 # and this means recurse is undesired
RCODE_FORMATERROR = -1 # these are the 5 error return codes of a dns query:
RCODE_SERVERFAIL = -2 # format error, server failure, name error, not implemented
RCODE_NAMEERROR = -3 # and refused; you can view all of these in RFC 1035, page 27
RCODE_NOTIMPLEMENTED = -4 # notice that these are not signed in the protocol, however, we negate them
RCODE_REFUSED = -5 # because we wish to return them as negative values in case of error
ERROR_XIDMISMATCH = -6 # these two are the additional errors we wish to add:
ERROR_NORECURSION = -7 # xid mismatch, and no recursion being available in resolver
CLASS_INTERNET = 1 # this is the resource class for internet, we will set it as default for obvious reasons
RECORD_A = 1 # marks A record type
RECORD_CNAME = 5 # marks CNAME record type
RECORD_TXT = 16 # marks TXT record type
RECORD_AAAA = 28 # marks AAAA record type
ENCODINGS_PY = [
'ascii', # this is a list of all the Python text codecs
'big5', # we will use them at the end to decode CNAME and TXT records
'big5hkscs',
'cp037',
'cp273',
'cp424',
'cp437',
'cp500',
'cp720',
'cp737',
'cp775',
'cp850',
'cp852',
'cp855',
'cp856',
'cp857',
'cp858',
'cp860',
'cp861',
'cp862',
'cp863',
'cp864',
'cp865',
'cp866',
'cp869',
'cp874',
'cp875',
'cp932',
'cp949',
'cp950',
'cp1006',
'cp1026',
'cp1125',
'cp1140',
'cp1250',
'cp1251',
'cp1252',
'cp1253',
'cp1254',
'cp1255',
'cp1256',
'cp1257',
'cp1258',
'euc_jp',
'euc_jis_2004',
'euc_jisx0213',
'euc_kr',
'gb2312',
'gbk',
'gb18030',
'hz',
'iso2022_jp',
'iso2022_jp_1',
'iso2022_jp_2',
'iso2022_jp_2004',
'iso2022_jp_3',
'iso2022_jp_ext',
'iso2022_kr',
'latin_1',
'iso8859_2',
'iso8859_3',
'iso8859_4',
'iso8859_5',
'iso8859_6',
'iso8859_7',
'iso8859_8',
'iso8859_9',
'iso8859_10',
'iso8859_11',
'iso8859_13',
'iso8859_14',
'iso8859_15',
'iso8859_16',
'johab',
'koi8_r',
'koi8_t',
'koi8_u',
'kz1048',
'mac_cyrillic',
'mac_greek',
'mac_iceland',
'mac_latin2',
'mac_roman',
'mac_turkish',
'ptcp154',
'shift_jis',
'shift_jis_2004',
'shift_jisx0213',
'utf_32',
'utf_32_be',
'utf_32_le',
'utf_16',
'utf_16_be',
'utf_16_le',
'utf_7',
'utf_8',
'utf_8_sig',
'utf8',
'utf16',
'utf32',
'utf-8',
'utf-16',
'utf-32'
]
errors_explain = {
str(RCODE_FORMATERROR): "This is an rcode error, as specified by the RFC. This means there was a format error in the query.",
str(RCODE_SERVERFAIL): "This is an rcode error, as specified by the RFC. This means there was a server failure on the resolver's part, but sometimes it just means the formatting was wrong.",
str(RCODE_NAMEERROR): "This is an rcode error, as specified by the RFC. This means there was a problem with the DNS address, maybe it is not registered, or the resolver does not support it.",
str(RCODE_NOTIMPLEMENTED): "This is an rcode error, as specified by the RFC. This means the resolver does not support the query type.",
str(RCODE_REFUSED): "This is an rcode error, as specified by the RFC. This means the resolver has refused to honor this query",
str(ERROR_XIDMISMATCH): "This is a DNSRezulf error, This means the resolver sent an XID that did not match the XID we sent to it.",
str(ERROR_NORECURSION): "This is a DNSRezulf error, this means although we specified recursion, the resolver does not support it.",
}
def dataclass(cls: type) -> type:
    # a tiny stand-in for dataclasses.dataclass: the class attributes act as
    # defaults, and keyword arguments override them on the instance
    def init(self, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)

    def to_string(self):
        string = ""
        for k in self.__class__.__dict__:
            if not k.startswith("__"):
                string += f"{k} -> {getattr(self, k)}\n"
        return string

    cls.__init__ = init
    cls.__str__ = to_string
    return cls
def generate_random_ushort() -> int:  # this function generates a random unsigned 16-bit integer
    from time import time_ns  # we use it for XID generation
    seed = time_ns()
    return c_ushort(((seed << 5) + seed) % MAXU16).value


def error_out(message: str):
    print("\033[1;31mError occurred\033[0m")
    print(message)
    exit(1)


def get_executable_name() -> str:
    from sys import executable
    from pathlib import Path
    return Path(executable).name


def get_script_name() -> str:
    from sys import argv
    return argv[0]
# this function decodes TXT and CNAME data based on the given codec in CMD
def decode_record_rdata(codec: str, rdata: bytearray) -> str:
    execname = get_executable_name()
    scriptname = get_script_name()
    raw = "[ " + ', '.join([str(int(b)) for b in rdata]) + "]"
    if codec[:3] == "raw":
        return "bytearray" + raw
    elif codec == 'brute':
        for enc in ENCODINGS_PY:
            try:
                return rdata.decode(enc)  # the first codec that works wins
            except (UnicodeError, LookupError):
                continue
        # we only get here when every single codec has failed
        print("None of Python's codecs could decode the result, printing it raw...")
        return raw
    else:
        if codec not in ENCODINGS_PY:
            error_out(f"Encoding {codec} is not present in Python's list of available codecs\nPlease pass `{execname} {scriptname} --encoding list` to see a full list of available codecs")
        try:
            return rdata.decode(codec)
        except (UnicodeError, LookupError):
            print("Your selected codec could not decode the record data, returning raw...")
            return raw
# Part B: Types
# this is the header for both response and question
@dataclass
class DNSQueryHeader:
    xid = 0  # a random 16-bit unsigned ID
    qr = 0  # question or response? One bit only
    opcode = 0  # four bits. What type of query is it? We always set it to 0, a standard query (QUERY)
    aa = 0  # Authoritative Answer or not --- one bit
    tc = 0  # has it been truncated? one bit
    rd = 0  # set in question, is recursion desired? --- one bit
    ra = 0  # only in response, is recursion available? --- one bit
    z = 0  # always set to 0 --- three bits
    rcode = 0  # response code, 4 bits
    qdcount = 1  # 16-bit unsigned, question count
    ancount = 0  # 16-bit unsigned, answer count
    nscount = 0  # 16-bit unsigned, name server (authority) record count
    arcount = 0  # 16-bit unsigned, additional record count


# this is format of a question, there can be several, but we'll just send one
@dataclass
class DNSQueryQuestion:
    qname = b""  # variable, name of the desired domain, null-terminated
    qtype = 0  # type of the question, 16-bit unsigned
    qclass = CLASS_INTERNET  # class of the question, 16-bit unsigned


# this is format of a response resource record, there can be several, but we'll just read the first one
@dataclass
class DNSResourceRecord:
    name = b""  # variable, name of the domain this record belongs to
    rtype = 0  # unsigned 16-bit integer, type of the resource
    rclass = CLASS_INTERNET  # unsigned 16-bit integer, class of the resource
    ttl = 0  # 32-bit integer, time interval (in seconds) for which the record may be cached
    rdlength = 0  # unsigned 16-bit integer, length of rdata
    rdata = b""  # variable, the data, for A and AAAA it's the IPv4 and IPv6 address
# Part C: DNS Query Protocol
# this function will encode the dns address to (len, section, len, section..., NULL) form as specified by the RFC
# basically every section is separated by period (46 ascii) and they must come separated with their length before them
# we then must null-terminate the bytestring
def encode_dns_addr(addr: bytearray) -> bytearray:
    qname = bytearray([0])
    idxcntr = 0
    for i, c in enumerate(addr):
        if c == ASCII_PERIOD:
            qname.append(0)
            idxcntr = i + 1  # in case we hit a period, we append a zero which will be the length of the new
            continue  # section and we set the index of counter to it, then we continue
        qname[idxcntr] += 1  # otherwise, we increase the length byte of the current section
        qname.append(c)  # and append the byte
    qname.append(0)  # we must null-terminate
    return qname
# simple, make a new question object
def new_dns_query_question(addr: bytes, qtype=RECORD_A) -> DNSQueryQuestion:
    qname = encode_dns_addr(addr)
    return DNSQueryQuestion(qname=qname, qtype=qtype)


# make a new header object
def new_dns_query_header(rd=RECURSE_DESIRED) -> DNSQueryHeader:
    return DNSQueryHeader(xid=generate_random_ushort(), rd=rd)
# encode the query header, as specified by the RFC
# the options are 16-bits, and as the net byte order goes, big-endian
def encode_dns_query_header(header: DNSQueryHeader) -> bytearray:
    xid = header.xid.to_bytes(2, byteorder="big", signed=False)
    qr = (header.qr & 1) << 15
    opcode = (header.opcode & 15) << 11
    aa = (header.aa & 1) << 10
    tc = (header.tc & 1) << 9
    rd = (header.rd & 1) << 8
    ra = (header.ra & 1) << 7
    z = (header.z & 7) << 4
    rcode = header.rcode & 15
    flags = (qr | opcode | aa | tc | rd | ra | z | rcode).to_bytes(2, byteorder='big')
    qdcount = header.qdcount.to_bytes(2, byteorder='big', signed=False)
    ancount = header.ancount.to_bytes(2, byteorder='big', signed=False)
    nscount = header.nscount.to_bytes(2, byteorder='big', signed=False)
    arcount = header.arcount.to_bytes(2, byteorder='big', signed=False)
    return xid + flags + qdcount + ancount + nscount + arcount


def encode_dns_query_question(question: DNSQueryQuestion):
    qtype = question.qtype.to_bytes(2, byteorder="big")
    qclass = question.qclass.to_bytes(2, byteorder="big", signed=False)
    return question.qname + qtype + qclass


# encode the final packet for request
def encode_dns_query_packet(header: DNSQueryHeader, question: DNSQueryQuestion) -> bytearray:
    return encode_dns_query_header(header) + encode_dns_query_question(question)
# decoding the header is similar to encoding the header, we just have to reverse the flag and turn bytes into integers instead
def decode_dns_query_header(response: bytes) -> DNSQueryHeader:
    response = response[1:]  # drop the placeholder byte that DNSResolver puts at the start of its receive buffer
    xid = c_ushort(int.from_bytes(response[:2], byteorder="big", signed=False)).value
    flags = int.from_bytes(response[2:4], byteorder="big", signed=False)
    qr = (flags & 32768) >> 15
    opcode = (flags & 30720) >> 11
    aa = (flags & 1024) >> 10
    tc = (flags & 512) >> 9
    rd = (flags & 256) >> 8
    ra = (flags & 128) >> 7
    z = (flags & 112) >> 4  # Z occupies bits 4 to 6, so the mask is 0b1110000
    rcode = flags & 15
    qdcount = int.from_bytes(response[4:6], byteorder="big", signed=False)
    ancount = int.from_bytes(response[6:8], byteorder="big", signed=False)
    nscount = int.from_bytes(response[8:10], byteorder="big", signed=False)
    arcount = int.from_bytes(response[10:12], byteorder="big", signed=False)
    return DNSQueryHeader(xid=xid, qr=qr, opcode=opcode, aa=aa, tc=tc, rd=rd, ra=ra, z=z, rcode=rcode, qdcount=qdcount, ancount=ancount, nscount=nscount, arcount=arcount)
# decoding the dns query record is partly similar to the encoding of address
# we must start at byte 12, and add the bytes to our address name until we hit zero
# we then decode type, class and ttl
# after that, we get the length, and grab from that point on, plus rdlength
# that will give us our data
# notice that only the first answer is read, and its name is assumed to be a 2-byte compression pointer
def decode_dns_query_resource_record(response: bytearray) -> DNSResourceRecord:
    idx = LEN_HEADER  # we add length of the header to our cursor
    byte = 255
    name = bytearray([])
    while True:
        if byte == 0:
            break
        idx += 1
        name.append(byte)
        byte = response[idx]
    # we add the question offset to our cursor: 1 byte to step past the terminating zero,
    # 4 bytes for QTYPE and QCLASS, and 2 bytes for the answer's compressed name pointer
    idx += 7
    response = response[idx:]
    rtype = int.from_bytes(response[:2], byteorder="big", signed=False)
    rclass = int.from_bytes(response[2:4], byteorder="big", signed=False)
    ttl = int.from_bytes(response[4:8], byteorder="big", signed=True)
    rdlength = int.from_bytes(response[8:10], byteorder="big", signed=False)
    rdata = response[10:10 + rdlength]
    return DNSResourceRecord(name=name, rtype=rtype, rclass=rclass, ttl=ttl, rdlength=rdlength, rdata=rdata)
def generate_and_compose_query(address: bytes, rectype=RECORD_A, recursive=RECURSE_DESIRED) -> tuple[bytes, int]:
    header = new_dns_query_header(recursive)
    question = new_dns_query_question(address, rectype)
    return encode_dns_query_packet(header, question), header.xid


# returns a DNSResourceRecord on success, or one of the negative error codes on failure
def parse_server_response(response: bytearray, xid: int, recursion=RECURSE_DESIRED, record=None):
    header = decode_dns_query_header(response)
    # check for errors in response
    if header.xid != xid:
        return ERROR_XIDMISMATCH  # the XID given does not match with XID returned
    elif recursion and not header.ra:
        return ERROR_NORECURSION  # if we have set recursion to true, and server does not support it
    elif header.rcode:
        return -header.rcode  # we negate the rcode if it is non-zero, so it matches the RCODE_* constants
    record = decode_dns_query_resource_record(response)
    return record
# Part D: Resolver
# the resolver interface puts everything we made prior together
class DNSResolver:
    def __init__(self, resolver="8.8.8.8", port=53, bufsize=1024):
        self.resolver = resolver
        self.port = port
        self.bufsize = bufsize
        self.socket = socket(AF_INET, SOCK_DGRAM)  # open a UDP socket for talking to the resolver server

    def connect_to_resolver(self):
        self.socket.connect((self.resolver, self.port))  # set the resolver as the default peer of the socket

    def send_and_receive_query_and_parse_results(self, addr: bytes, rectype=RECORD_A, recursion=RECURSE_DESIRED, retries=3, record=None):
        packet, xid = generate_and_compose_query(addr, rectype, recursion)  # generate the packet
        lenpacket = len(packet)
        sent = self.socket.send(packet)  # send the packet
        while sent != lenpacket:  # retry if send fails
            if retries < 0:
                break
            sent = self.socket.send(packet)
            retries -= 1
        response = bytearray([0])  # placeholder byte, the decoders account for it
        while True:
            received_data = self.socket.recv(self.bufsize)  # get the response
            response += received_data
            if len(received_data) < self.bufsize:
                break
        return parse_server_response(response, xid, recursion)  # parse the response

    def close_connection(self):
        self.socket.close()
if __name__ == "__main__":
    from sys import argv
    execname = get_executable_name()
    scriptname = get_script_name()
    argv = argv[1:]
    params = {
        "resolver": ["--resolver", "-rs"],
        "port": ["--port", "-p"],
        "address": ["--address", "-ad"],
        "rectype": ["--rectype", "-rr"],
        "recursion": ["--recursion", "-re"],
        "codec": ["--encoding", "-en"],
        "errors": ["--errors", "-er"]
    }
    if "--help" in argv or "-h" in argv:
        print("\033[1;33mDNSRezulf by Chubak Bidpaa\033[0m")
        print("Released under MIT License")
        print(f"DNSRezulf ({scriptname}) is a very simple DNS Resolver. Think, a watered down version of dig.")
        print("You may request A, AAAA, CNAME and TXT records with it. You may specify whether the search is recursive or not.")
        print("The default resolver is 8.8.8.8, which is Google's resolver. But you may select a different one.")
        print("If you pass `list` to --encoding, it will print all the available codecs, and exit.")
        print("If you pass an error code to --errors, it will explain the error code, and exit.")
        print("Only standard queries are available. No inverse query and whatnot.")
        print()
        print("\033[1mArguments:")
        print("[Long/Short]; Purpose; Default")
        print("[--resolver/-rs]; DNS Resolver Server; 8.8.8.8")
        print("[--port/-p]; DNS Resolver Port; 53")
        print("[--address/-ad]; Address to Resolve; example.com")
        print("[--rectype/-rr]; R Type; A")
        print("[--recursion/-re]; Recursive Search; 1")
        print("[--encoding/-en]; CNAME/TXT Codec; raw")
        print("[--errors/-er]; Error code explain; None")
        print("\033[0m")
        print(f"Example: {execname} {scriptname} -ad google.com --rectype AAAA")
        print(f"On Unix-like systems, this script can be run via the Shebang: {scriptname} -rs 1.1.1.1 -ad reddit.com")
        exit(0)
    args = {
        "resolver": "8.8.8.8",
        "port": "53",
        "address": "example.com",
        "rectype": "A",
        "recursion": "1",
        "codec": "raw",
        "errors": None,
    }
    all_params = sum(params.values(), [])
    skip = False
    for arg in argv:
        if skip:  # the previous argument was a flag, so this one is its value
            args[skip] = arg
            skip = False
            continue
        if arg in all_params:
            for k, v in params.items():
                if arg in v:
                    skip = k
        else:
            error_out(f"Illegal parameter: {arg}")
    if args['errors'] is not None:
        errcode = args['errors']
        if errcode not in errors_explain:
            error_out("Wrong error code passed to --errors")
        print(errors_explain.get(errcode))
        exit(0)
    if args['codec'] == 'list':
        print("\n".join(ENCODINGS_PY))
        exit(0)
    if not args["port"].isdigit():
        error_out("Port must be digits")
    port = int(args["port"])
    if port < 0 or port > MAXU16:
        error_out("Port must be between 0 and 65535")
    resolver = args["resolver"]
    address = args["address"].encode("ascii")
    rectype = args["rectype"]
    if rectype == "A":
        rectype = RECORD_A
    elif rectype == "AAAA":
        rectype = RECORD_AAAA
    elif rectype == "CNAME":
        rectype = RECORD_CNAME
    elif rectype == "TXT":
        rectype = RECORD_TXT
    else:
        error_out("Wrong record type")
    recursion = args["recursion"]
    if recursion not in ["1", "0"]:
        error_out("Recursion must be 0 or 1")
    recursion = int(recursion)  # int, not bool: bool("0") would be True
    dnsresolver = DNSResolver(resolver, port)
    dnsresolver.connect_to_resolver()
    queryresult = dnsresolver.send_and_receive_query_and_parse_results(addr=address, rectype=rectype, recursion=recursion)
    dnsresolver.close_connection()
    if isinstance(queryresult, int) and queryresult < 0:
        error_out(f"Error occurred, code: {queryresult}\nPlease type in {execname} {scriptname} --errors {queryresult} to inspect")
    data = queryresult.rdata
    ttl = queryresult.ttl
    address = address.decode()
    if rectype in [RECORD_TXT, RECORD_CNAME]:
        data = decode_record_rdata(args['codec'], data)
    if rectype == RECORD_A:
        print(f"\t{address} | A | ttl={ttl} | {'.'.join([str(c) for c in data])}")
    elif rectype == RECORD_AAAA:
        print(f"\t{address} | AAAA | ttl={ttl} | {':'.join([format(int.from_bytes([c1, c2], byteorder='big', signed=False), 'x') for c1, c2 in zip(data[::2], data[1::2])])}")
    elif rectype == RECORD_TXT:
        print(f"\t{address} | TXT | ttl={ttl} | `{data}`")
    else:
        print(f"\t{address} | CNAME | ttl={ttl} | `{data}`")