Skip to content

Instantly share code, notes, and snippets.

@mwollenweber
Created April 7, 2011 03:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mwollenweber/906961 to your computer and use it in GitHub Desktop.
Save mwollenweber/906961 to your computer and use it in GitHub Desktop.
Quick hack to compare the DNS lookup performance of OpenDNS vs. Google vs. GW. It also checks the blocking accuracy of dnsbh.
#!/usr/bin/python
#Matthew Wollenweber
#mjw@cyberwart.com
import os,sys, ConfigParser
import time
import sqlite3
import urllib2
import urlparse
import csv
import getopt
import traceback
import signal
from scapy.all import *
class dnsPerfTester(object):
    """Time DNS lookups against name servers and log the answers to SQLite.

    Every A record returned for a query becomes one row of the ``dnsperf``
    table: name server, queried name, resolved IP, TTL, query time in
    milliseconds and a caller-supplied test-type tag.
    """

    _INSERT_SQL = ("INSERT INTO dnsperf (nameserver, target, ip, ttl, qtime, type) "
                   "values (?, ?, ?, ?, ?, ?)")

    def __init__(self, db_path="/tmp/dns.sqlite", iface=None):
        """Open the results database and make sure the ``dnsperf`` table exists.

        Args:
            db_path: SQLite database file (``":memory:"`` also works).
            iface: interface handed to scapy for sending queries; ``None``
                leaves the choice to scapy.  Callers may also assign
                ``self.iface`` after construction.

        Raises:
            sqlite3.Error: if the database itself cannot be opened.
        """
        print("initializing dnsPerfTester")
        self.os = "osx"
        self.targets = ["www.google.com", "www.cnn.com", "www.gwu.edu", "yuyu98.com",
                        "neoline-groupco.cc", "3dglases-panasonic-tv.com"]
        self.multi_ips = True
        self.target_count = 0
        self.is_verbose = False
        # query_target() reads self.iface, so it must always exist.
        self.iface = iface

        # A failed connect is raised here instead of being swallowed: without
        # a connection every later insert/commit would fail anyway.
        self.conn = sqlite3.connect(db_path)
        self.curs = self.conn.cursor()
        try:
            # IF NOT EXISTS keeps repeated runs against the same file quiet.
            self.curs.execute('''CREATE TABLE IF NOT EXISTS dnsperf
            (id INTEGER PRIMARY KEY AUTOINCREMENT, nameserver TEXT, target TEXT, ip TEXT, ttl INTEGER, qtime DOUBLE, type INTEGER) ''')
        except sqlite3.Error as err:
            print("ERROR creating sqlite table, Moving on: %s" % err)

    def _record_query(self, server, target, test_type):
        """Time one lookup of *target* on *server* and store its answers.

        Returns True when the lookup and inserts completed, False when an
        error occurred (the traceback is printed in verbose mode).
        """
        try:
            t_start = time.time()
            answers = self.query_target(server, target)
            elapsed_ms = round((time.time() - t_start) * 1000, 6)
            for name, ip, ttl in answers:
                blob = [server, name, ip, ttl, elapsed_ms, test_type]
                self.curs.execute(self._INSERT_SQL, blob)
                if self.is_verbose:
                    self.display_msg(blob)
                if not self.multi_ips:
                    # Only the first address is wanted.
                    break
            self.conn.commit()
        except Exception:
            # "except Exception" rather than a bare except so Ctrl-C
            # (KeyboardInterrupt) still stops a long run.
            if self.is_verbose:
                traceback.print_exc(file=sys.stdout)
            return False
        return True

    def query_targets_bf(self, server_list, test_type = 0):
        """Query breadth first: each target is sent to every server in turn.

        A failure on one server no longer skips the remaining servers for
        that target; every (server, target) pair is attempted.
        """
        print("Querying targets (breadth first):")
        server_count = len(server_list)
        self.target_count = len(self.targets)
        iterations = server_count * self.target_count
        count = 0
        print("I have %s servers and %s targets" % (server_count, self.target_count))
        for target in self.targets:
            print("ITERATION: %s of %s" % (count, iterations))
            for server in server_list:
                count += 1
                self._record_query(server, target, test_type)

    def display_msg(self, blob):
        """Print the server, target, ip, ttl and query time of one result row."""
        #im lazy. a pretty print would be nice
        print(blob[0:5])

    def query_targets(self, server, test_type = "0"):
        """Query depth first: every target is sent to the single *server*."""
        print("Querying targets (depth first)")
        self.target_count = len(self.targets)
        print("I have %s targets" % self.target_count)
        for count, target in enumerate(self.targets):
            print("ITERATION: %s of %s" % (count, self.target_count))
            self._record_query(server, target, test_type)

    def query_target(self, server, target):
        """Send one recursive DNS query for *target* to *server* via scapy.

        Returns a list of ``[target, ip, ttl]`` entries, one per A record in
        the answer; the list is empty when no DNS answer came back.
        """
        dns_ans = sr1(IP(dst=server)/UDP()/DNS(rd=1, qd=DNSQR(qname=target)),
                      verbose=int(self.is_verbose), inter=0.12, retry=1, timeout=1,
                      iface=self.iface)
        # sr1 yields None when nothing answered within the timeout; the reply
        # may also lack a DNS layer.  Either way there is nothing to record.
        if dns_ans is None or not dns_ans.haslayer(DNS):
            if self.is_verbose:
                print("no DNS answer from %s for %s" % (server, target))
            return []

        dns_ret = []
        for i in range(dns_ans[DNS].ancount):
            record = dns_ans[DNSRR][i]
            if record.type == 1:  # type 1 is an A record
                dns_ret.append([target, record.rdata, record.ttl])
        return dns_ret

    def get_mdl_hostnames(self, data):
        """Extract the unique hostnames from Malware Domain List CSV *data*.

        *data* is any iterable of CSV lines whose second column holds a URL
        without scheme.  Rows without a second column or with an unparsable
        URL are skipped.  A hostname is kept when at least one of its
        dot-separated labels is alphanumeric.  The result is sorted so runs
        are reproducible.
        """
        hostnames = set()
        for row in csv.reader(data, delimiter=",", quoting=csv.QUOTE_MINIMAL):
            try:
                hostname = urlparse.urlsplit("http://" + row[1]).hostname
            except (IndexError, ValueError):
                continue
            if hostname is None:
                continue
            if any(label.isalnum() for label in hostname.split(".")):
                hostnames.add(hostname)
        return sorted(hostnames)

    def get_mdl_targets(self):
        """Download the Malware Domain List update feed and return its hostnames."""
        # Full export, as an alternative: http://www.malwaredomainlist.com/export.csv
        mdlurl = "http://www.malwaredomainlist.com/updatescsv.php"
        f = urllib2.urlopen(mdlurl)
        try:
            return self.get_mdl_hostnames(f)
        finally:
            # Close the HTTP handle even when parsing fails.
            f.close()

    def set_targets(self, targets):
        """Replace the list of names to be queried."""
        self.targets = targets
def main():
    """Parse the command line and run the requested DNS timing passes.

    Options:
        -b        run the breadth-first pass (each target against every server)
        -d        run the depth-first pass (every target against one server at a time)
        -v        verbose output
        -m        use hostnames downloaded from the Malware Domain List as targets
        -f FILE   read targets from FILE, one name per line
        -t TYPE   test-type tag stored with every result row
        -e IFACE  network interface used for the queries (default "eth0")

    Exits with status -1 on unparsable arguments or when neither -b nor -d
    is given.
    """
    try:
        opts, args = getopt.getopt(sys.argv[1:], "bdvme:f:t:")
    except getopt.GetoptError as err:
        print("ERROR: unable to parse args")
        print(str(err))
        sys.exit(-1)

    infile = None
    is_verbose = use_mdl = depth_first = breadth_first = False
    test_type = 0
    iface = "eth0"
    for o, a in opts:
        if o == "-f":
            infile = a
        elif o == "-v":
            is_verbose = True
        elif o == "-m":
            use_mdl = True
        elif o == "-t":
            print("setting test type to %s" % a)
            test_type = a
        elif o == "-d":
            depth_first = True
        elif o == "-b":
            breadth_first = True
        elif o == "-e":
            iface = a
        else:
            print("Unknown option: '%s'" % o)

    # Validate before touching the database or the network so a bad
    # invocation fails immediately.
    if not (depth_first or breadth_first):
        print("ERROR: please specify breadth-first (-b) and/or depth-first (-d)")
        sys.exit(-1)

    print("Let loose the dogs of war. Starting DNS Testing")
    dpt = dnsPerfTester()
    try:
        if infile is not None:
            print("using infile")
            with open(infile, "r") as f:
                # Blank lines would otherwise turn into empty query names.
                data = [line.strip() for line in f if line.strip()]
            dpt.set_targets(data)
        elif use_mdl:
            print("laying it down with the mdl")
            dpt.set_targets(dpt.get_mdl_targets())

        if is_verbose:
            dpt.is_verbose = True
        dpt.iface = iface

        for t in dpt.targets:
            print("target = %s" % t)

        server_list = ["8.8.8.8", "208.67.222.222", "128.164.141.12",
                       "208.67.220.220", "128.164.141.11", "161.253.152.50"]

        #let loose the dogs of war
        if depth_first:
            for server in server_list:
                dpt.query_targets(server, test_type)
                #give everything a moment of calm
                # NOTE(review): the original indentation of this pause was
                # lost; it is assumed to run after each server - confirm.
                time.sleep(5)
                dpt.conn.commit()

        #alternative run breadth-first (bf)
        if breadth_first:
            dpt.query_targets_bf(server_list, test_type)
    finally:
        # Persist whatever was collected and release the database, even
        # when the run is interrupted.
        dpt.conn.commit()
        dpt.curs.close()
        dpt.conn.close()
# Run the tester only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment