Created
April 29, 2015 03:11
-
-
Save gerry/67cc89ab69b7a0d103eb to your computer and use it in GitHub Desktop.
Respond to and record dns lookups.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import random | |
import string | |
import datetime | |
import itertools | |
import threading | |
import traceback | |
import SocketServer | |
import operator as op | |
import dnslib | |
import flask | |
from flask import request | |
_response_ip = '8.8.8.8' # IP to respond with | |
_lookups_lock = threading.Lock() | |
_lookups = [] | |
app = flask.Flask(__name__) | |
app.config.from_object(__name__) | |
class BaseRequestHandler(SocketServer.BaseRequestHandler): | |
def get_data(self): | |
raise NotImplementedError | |
def send_data(self, data): | |
raise NotImplementedError | |
def handle(self): | |
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
(ip_addr, port) = self.client_address | |
try: | |
data = self.get_data() | |
if data is None: | |
return | |
request = dnslib.DNSRecord.parse(data) | |
domains = [".".join(q.qname.label) for q in request.questions] | |
domain = ".".join(request.q.qname.label) | |
print "[+] Got a request for '{}' from: {}".format(domain, ip_addr) | |
with _lookups_lock: | |
_lookups.append((now, domain, ip_addr,)) | |
reply = dnslib.DNSRecord(dnslib.DNSHeader(id=request.header.id), q=request.q) | |
reply.add_answer(dnslib.RR(rname=q.qname, | |
rtype=dnslib.QTYPE.A, rclass=1, ttl=60*5, rdata=dnslib.A(_response_ip))) | |
self.send_data(reply.pack()) | |
except Exception: | |
traceback.print_exc(file=sys.stderr) | |
class TCPRequestHandler(BaseRequestHandler): | |
def get_data(self): | |
data = self.request.recv(8192).strip() | |
sz = int(data[:2].encode('hex'), 16) | |
if sz < len(data) - 2: | |
return None | |
#raise Exception("Wrong size of TCP packet") | |
elif sz > len(data) - 2: | |
return None | |
#raise Exception("Too big TCP packet") | |
return data[2:] | |
def send_data(self, data): | |
sz = hex(len(data))[2:].zfill(4).decode('hex') | |
return self.request.sendall(sz + data) | |
class UDPRequestHandler(BaseRequestHandler): | |
def get_data(self): | |
return self.request[0].strip() | |
def send_data(self, data): | |
return self.request[1].sendto(data, self.client_address) | |
def html_encode(s): | |
for code in [("'", '''), ('"', '"'), ('>', '>'), ('<', '<'), ('&', '&')]: | |
s = s.replace(code[0], code[1]) | |
return s | |
@app.route('/s/<_id>/') | |
def status(_id): | |
key_func = op.itemgetter(1) | |
hits = [] | |
for k, grp in itertools.groupby(sorted(_lookups, key=key_func), key=key_func): | |
if k == _id: | |
hits = ["[{}] {}".format(_[0], html_encode(_[2])) for _ in grp] | |
return """<html><pre>{}</pre></html>""".format(hits and "\n".join(hits) or 'No Lookups') | |
@app.route('/') | |
def index(): | |
hits = ["[timestamp] - client ip - hostname"] | |
_sorted = sorted(_lookups, key=op.itemgetter(0)) | |
hits.extend(["[{}] - {} - {}".format(_[0], _[2], html_encode(_[1])) for _ in _sorted]) | |
return """<html><pre>Recent Lookups:\n{}</pre></html>""".format("\n".join(hits)) | |
if __name__ == "__main__": | |
print "Starting nameservers..." | |
servers = [ | |
SocketServer.ThreadingUDPServer(('', 53), UDPRequestHandler), | |
SocketServer.ThreadingTCPServer(('', 53), TCPRequestHandler)] | |
for s in servers: | |
thread = threading.Thread(target=s.serve_forever) | |
thread.daemon = True | |
thread.start() | |
print "Starting webserver..." | |
try: | |
app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False) | |
except KeyboardInterrupt: | |
pass | |
finally: | |
map(lambda s: s.shutdown(), servers) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment