Skip to content

Instantly share code, notes, and snippets.

@gerry
Created April 29, 2015 03:11
Show Gist options
  • Save gerry/67cc89ab69b7a0d103eb to your computer and use it in GitHub Desktop.
Save gerry/67cc89ab69b7a0d103eb to your computer and use it in GitHub Desktop.
Respond to and record DNS lookups.
#!/usr/bin/env python
import datetime
import itertools
import operator as op
import random
import SocketServer
import string
import struct
import sys
import threading
import traceback

import dnslib
import flask
from flask import request
# Address placed in the A record of every DNS reply this server sends.
_response_ip = '8.8.8.8' # IP to respond with
# Held by the DNS handlers while they append to _lookups.
_lookups_lock = threading.Lock()
# Recorded lookups, one (timestamp string, queried domain, client IP) tuple each.
_lookups = []
# Flask application that serves the lookup-history pages.
app = flask.Flask(__name__)
# Load Flask configuration from this module's own attributes.
app.config.from_object(__name__)
class BaseRequestHandler(SocketServer.BaseRequestHandler):
    """Transport-independent DNS handler.

    Records the queried name in ``_lookups`` and answers every query with a
    single A record pointing at ``_response_ip``.  Subclasses provide the
    transport-specific ``get_data`` / ``send_data``.
    """

    def get_data(self):
        """Return the raw DNS message for this request, or None to ignore it."""
        raise NotImplementedError

    def send_data(self, data):
        """Send the packed DNS reply ``data`` back to the client."""
        raise NotImplementedError

    def handle(self):
        """Parse one query, record it, and reply with the fixed A record.

        Any exception is printed to stderr rather than propagated, so one bad
        packet does not produce an unhandled error in the server thread.
        """
        now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Index instead of 2-tuple unpacking: only the address is needed.
        ip_addr = self.client_address[0]
        try:
            data = self.get_data()
            if data is None:
                return
            # Named ``query`` so the module-level ``flask.request`` import is
            # not shadowed inside this method.
            query = dnslib.DNSRecord.parse(data)
            # Use the first question for logging, for the echoed question
            # section and for the answer name, so all three always agree.
            question = query.q
            domain = ".".join(question.qname.label)
            print("[+] Got a request for '{}' from: {}".format(domain, ip_addr))
            with _lookups_lock:
                _lookups.append((now, domain, ip_addr))
            # qr=1 flags the message as a response; left unset, the header's
            # QR bit is 0 and the packet reads as another query.
            reply = dnslib.DNSRecord(
                dnslib.DNSHeader(id=query.header.id, qr=1), q=question)
            reply.add_answer(dnslib.RR(
                rname=question.qname,
                rtype=dnslib.QTYPE.A,
                rclass=1,
                ttl=60 * 5,
                rdata=dnslib.A(_response_ip)))
            self.send_data(reply.pack())
        except Exception:
            traceback.print_exc(file=sys.stderr)
class TCPRequestHandler(BaseRequestHandler):
    """DNS-over-TCP transport: each message has a 2-byte big-endian length prefix."""

    def get_data(self):
        """Read one length-prefixed DNS message from the socket.

        Returns the message without its prefix, or None when the data read
        is too short to hold a prefix or its length does not match the
        prefix (the request is then silently ignored).
        """
        # No .strip(): the payload is binary, and whitespace-valued bytes at
        # either end are real data.
        data = self.request.recv(8192)
        if len(data) < 2:
            return None
        (sz,) = struct.unpack('>H', data[:2])
        if sz != len(data) - 2:
            return None
        return data[2:]

    def send_data(self, data):
        """Send ``data`` preceded by its 2-byte big-endian length."""
        return self.request.sendall(struct.pack('>H', len(data)) + data)
class UDPRequestHandler(BaseRequestHandler):
    """DNS-over-UDP transport: one datagram is one DNS message."""

    def get_data(self):
        """Return the received datagram unchanged.

        The datagram must not be stripped: it is binary, and a transaction ID
        or trailing byte that happens to be ASCII whitespace is real data.
        """
        return self.request[0]

    def send_data(self, data):
        """Send ``data`` to the client through the server's socket."""
        return self.request[1].sendto(data, self.client_address)
def html_encode(s):
    """Return ``s`` with the HTML-special characters & ' " > < replaced by entities."""
    # '&' has to be handled first; otherwise the ampersands introduced by the
    # other replacements would themselves be escaped ('<' -> '&amp;lt;').
    replacements = (
        ('&', '&amp;'),
        ("'", '&#39;'),
        ('"', '&quot;'),
        ('>', '&gt;'),
        ('<', '&lt;'),
    )
    for raw, entity in replacements:
        s = s.replace(raw, entity)
    return s
@app.route('/s/<_id>/')
def status(_id):
    """Render the recorded lookups whose queried domain equals ``_id``.

    Each hit is shown as ``[timestamp] client-ip`` in the order it was
    recorded; the page reads 'No Lookups' when nothing matches.
    """
    # Copy under the lock so the DNS threads cannot append mid-iteration.
    with _lookups_lock:
        snapshot = list(_lookups)
    hits = ["[{}] {}".format(stamp, html_encode(client))
            for stamp, domain, client in snapshot
            if domain == _id]
    body = "\n".join(hits) if hits else 'No Lookups'
    return """<html><pre>{}</pre></html>""".format(body)
@app.route('/')
def index():
    """Render every recorded lookup, ordered by timestamp, under a header row."""
    lines = ["[timestamp] - client ip - hostname"]
    for stamp, domain, client in sorted(_lookups, key=op.itemgetter(0)):
        # The hostname comes from the DNS query, so it is escaped for HTML.
        lines.append("[{}] - {} - {}".format(stamp, client, html_encode(domain)))
    listing = "\n".join(lines)
    return """<html><pre>Recent Lookups:\n{}</pre></html>""".format(listing)
if __name__ == "__main__":
    # Start a UDP and a TCP nameserver on port 53 in daemon threads, then run
    # the Flask UI in the foreground until interrupted.
    print("Starting nameservers...")
    servers = [
        SocketServer.ThreadingUDPServer(('', 53), UDPRequestHandler),
        SocketServer.ThreadingTCPServer(('', 53), TCPRequestHandler),
    ]
    for server in servers:
        worker = threading.Thread(target=server.serve_forever)
        # Daemon threads so a stuck nameserver never blocks interpreter exit.
        worker.daemon = True
        worker.start()
    print("Starting webserver...")
    try:
        # NOTE(review): debug=True together with host 0.0.0.0 makes Flask's
        # interactive debugger reachable from the network; confirm this is
        # intended before running on an exposed host.
        app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)
    except KeyboardInterrupt:
        pass
    finally:
        # Explicit loop rather than map(): the calls are wanted for their side
        # effects and must run even where map() is lazy.
        for server in servers:
            server.shutdown()
            server.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment