Skip to content

Instantly share code, notes, and snippets.

@theanti9
Last active August 29, 2015 14:02
Show Gist options
  • Save theanti9/bc50dab54ed2f4190c5a to your computer and use it in GitHub Desktop.
Save theanti9/bc50dab54ed2f4190c5a to your computer and use it in GitHub Desktop.
# A naive dns server with a Redis backend
# now with added resolution if not in redis!
# -Neil
# Set keys in Redis you want to be authoritative for (set google.com. 127.0.0.1)
# Tip: Use Redis's ttl functions to have temporary names
# Currently only does A records, feel free to fix that
#
# Licensed under the PSF License
# Thanks to: http://code.activestate.com/recipes/491264-mini-fake-dns-server/
# Author forked from: @Kaerast <alice@kaerast.info>
# added resolution and caching - github.com/neilwillgettoit
import socket
import dns.resolver
import redis # https://github.com/andymccurdy/redis-py
class DNSQuery:
query_types = {
1: 'A', 2: 'NS', 3: 'MD', 4: 'MF', 5: 'CNAME', 6: 'SOA', 7: 'MB', 8: 'MG',
9: 'MR', 10: 'NULL', 11: 'WKS', 12: 'PTR', 13: 'HINFO', 14: 'MINFO', 15: 'MX',
16: 'TXT', 17: 'RP', 18: 'AFSDB', 19: 'X25', 20: 'ISDN', 21: 'RT', 22: 'NSAP',
23: 'NSAP-PTR', 24: 'SIG', 25: 'KEY', 26: 'PX', 27: 'GPOS', 28: 'AAAA', 29: 'LOC',
30: 'NXT', 31: 'EID', 32: 'NIMLOC', 33: 'SRV', 34: 'ATMA', 35: 'NAPTR', 36: 'KX',
37: 'CERT', 38: 'A6', 39: 'DNAME', 40: 'SINK', 41: 'OPT', 42: 'APL', 43: 'DS',
44: 'SSHFP', 45: 'IPSECKEY', 46: 'RRSIG', 47: 'NSEC', 48: 'DNSKEY', 49: 'DHCID',
50: 'NSEC3', 51: 'NSEC3PARAM'
}
def __init__(self, data):
self.data = data
byte_data = bytearray(data)
self.domains = []
self.rq_id = data[:2]
self.int_rq_id = byte_data[0] << 8 | byte_data[1]
qr = True if byte_data[2] & 0x80 == 0x80 else False
opcode = byte_data[2] & 0x78
tc = True if byte_data[2] & 0x02 == 0x02 else False
rd = True if byte_data[2] & 0x01 == 0x01 else False
self.qdcount = byte_data[4] << 8 | byte_data[5]
self.ancount = byte_data[6] << 8 | byte_data[7]
self.nscount = byte_data[8] << 8 | byte_data[9]
self.arcount = byte_data[10] << 8 | byte_data[11]
tipo = (ord(data[2]) >> 3) & 15 # Opcode bits
if tipo == 0: # Standard query
ini = 12
for i in range(self.qdcount):
lon = ord(data[ini])
domain = ''
while lon != 0:
domain += data[ini + 1:ini + lon + 1] + '.'
ini += lon + 1
lon = ord(data[ini])
qtype = byte_data[ini+1] << 8 | byte_data[ini+2]
ini += 5
self.domains.append((domain, qtype))
def answer(self, domain, ip):
packet = []
if domain and ip:
packet += self.rq_id + '\x81\x80'
packet += data[4:6] + data[4:6] + '\x00\x00\x00\x00' # Questions and Answers Counts
packet += self.data[12:] # Original Domain Name Question
packet += '\xc0\x0c' # Pointer to domain name
packet += '\x00\x01\x00\x01\x00\x00\x00\x3c\x00\x04' # Response type, ttl and resource data length -> 4 bytes
packet += ''.join(map(lambda x: chr(int(x)), ip.split('.'))) # 4bytes of IP
if not ip:
packet += self.rq_id + '\x81\x80'
packet += self.data[4:6] + '\x00\x00' + '\x00\x00\x00\x00' # Questions and Answers Counts
packet += self.data[12:] # Original Domain Name Question
packet += '\xc0\x0c' # Pointer to domain name
return ''.join(packet)
if __name__ == '__main__':
r = redis.StrictRedis(host='localhost', port=6379, db=0)
udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udps.bind(('',53))
with open('query_log', 'a') as q_log:
try:
while True:
data, addr = udps.recvfrom(1024)
p = DNSQuery(data)
qtype = DNSQuery.query_types[p.domains[0][1]]
ip = r.get(qtype + ":" + p.domains[0][0])
if ip is not None:
udps.sendto(p.answer(p.domains[0][0], ip), addr)
print 'Had it in Redis: %s %s -> %s' % (qtype, p.domains[0][0], ip)
else:
try:
rq = dns.resolver.query(p.domains[0][0] , qtype)
if qtype == 'A':
ip = rq[0].address
r.set(qtype + ":" + p.domains[0][0], ip)
udps.sendto(p.answer(p.domains[0][0], ip), addr)
print 'Went Upstream: %s %s -> %s' % (qtype, p.domains[0][0], ip)
else:
print 'Not replying: unsupported type: %s %s ' % (qtype, p.domains[0][0])
except Exception as e:
print e
print 'They Dont Think It Be Like It Is But It Do'
q_log.write("RequestID=%s; From=%s Requests=%s; QuestionCount=%d; AnswerCount=%d; NSCount=%d; ResourceCount=%d\n" % (p.int_rq_id, addr[0] + ":" + str(addr[1]), ' '.join([DNSQuery.query_types[q[1]]+":"+q[0] for q in p.domains]), p.qdcount, p.ancount, p.nscount, p.arcount))
q_log.flush()
except KeyboardInterrupt:
print 'Finished.'
udps.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment