-
-
Save digitalnomad91/cb70ad43878b091aef567238e5c6db2d to your computer and use it in GitHub Desktop.
Check SPF records for expired Domains and DNS lookup depth
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# modified version of this snippet | |
# https://gist.github.com/TheRook/95f2b872bdc81bac2371 | |
from urllib.parse import urlparse

import dns.exception
import dns.name
import dns.resolver
# Message prefixes: an ANSI bold escape plus a colour escape, a bracketed tag,
# then a reset escape and a trailing space.
INFO = "\033[1m\033[36m[*]\033[0m "  # SGR 36 (cyan)
WARN = "\033[1m\033[31m[!]\033[0m "  # SGR 31 (red)
LOW = "\033[1m\033[34m[-]\033[0m "  # SGR 34 (blue)
MAYBE = "\033[1m\033[35m[?]\033[0m "  # SGR 35 (magenta)
# NOTE(review): SGR 38 normally expects extra colour parameters; confirm the
# colour that was intended here.
MONEY = "\033[1m\033[38m[$]\033[0m "
# Not referenced elsewhere in the visible part of this file.
MAX_LOOKUPS = 100
# Threshold that check_spf() compares its running counter against before
# printing the "SPF permerror" warning.
MAX_DEPTH = 10
# Not referenced elsewhere in the visible part of this file.
CURRENT_LOOKUPS = 0
# When true, print_quiet() prints nothing; reassigned from --quiet when the
# file is run as a script.
QUIET = False
class SPFRecord(object):
    """SPF data extracted from the TXT records of one domain.

    Attributes:
        version: SPF version string (``spf1``), or None when no SPF record
            was found among the TXT records.
        includes: targets of ``include:`` mechanisms and ``redirect=``
            modifiers, in the order they appear.
        ip4: values of ``ip4:`` mechanisms.
        ip6: values of ``ip6:`` mechanisms.
        txt_records: text form of every TXT record returned for the domain.
    """

    # Qualifier characters that may prefix a mechanism (RFC 7208).
    _QUALIFIERS = '+-~?'

    def __init__(self, domain):
        """Query the TXT records of *domain* and parse each of them.

        Any exception raised by ``dns.resolver.query`` propagates to the
        caller.
        """
        self.version = None
        self.includes = []
        self.ip4 = []
        self.ip6 = []
        self._dns_response = dns.resolver.query(domain, 'TXT')
        self.txt_records = [txt.to_text() for txt in self._dns_response]
        for txt in self.txt_records:
            self._parse_txt(txt)

    def _parse_txt(self, txt):
        """Parse one TXT record given in text form; ignore non-SPF records."""
        record = txt.strip()
        if record.startswith('"'):
            # Quoted presentation form: one or more "..." character-strings.
            # They are joined without a separator before the quotes go away,
            # otherwise the first term would read '"v=spf1' and the last one
            # would keep a trailing quote.
            record = record.replace('" "', '').strip('"')
        terms = record.split()
        # Only records whose first term is exactly v=spf1 are SPF records;
        # DKIM/DMARC/verification TXT records must not be interpreted.
        if not terms or terms[0].lower() != 'v=spf1':
            return
        self._add_version(terms[0])
        for term in terms[1:]:
            # A mechanism may carry a qualifier prefix, e.g. "?include:...".
            entry = term.lstrip(self._QUALIFIERS)
            name = entry.lower()
            if name.startswith('include:'):
                self._add_include(entry)
            elif name.startswith('redirect='):
                self._add_redirect(entry)
            elif name.startswith('ip4:'):
                self._add_ip4(entry)
            elif name.startswith('ip6:'):
                self._add_ip6(entry)

    @property
    def ips(self):
        """All collected addresses: IPv4 entries followed by IPv6 entries."""
        return self.ip4 + self.ip6

    def _add_version(self, entry):
        """Store the part after ``=`` of a ``v=...`` term as the version."""
        self.version = entry.split('=', 1)[1]

    def _add_include(self, entry):
        """Record the target domain of an ``include:`` mechanism."""
        self.includes.append(entry.split(':', 1)[1])

    def _add_redirect(self, entry):
        """Record the target domain of a ``redirect=`` modifier."""
        self.includes.append(entry.split('=', 1)[1].strip('"'))

    def _add_ip4(self, entry):
        """Record the value of an ``ip4:`` mechanism."""
        self.ip4.append(entry.split(':', 1)[1])

    def _add_ip6(self, entry):
        """Record the value of an ``ip6:`` mechanism."""
        # Split once only: the IPv6 address itself contains colons.
        self.ip6.append(entry.split(':', 1)[1])
def print_quiet(*args, end='\n'):
    """Print like the built-in ``print`` unless quiet mode is enabled.

    Every positional value is forwarded to ``print``, so callers may pass a
    message followed by further values. Previously a second positional
    value was silently used as the line terminator and a third one raised
    ``TypeError``.

    Args:
        *args: values to print, separated by a space.
        end: line terminator handed to ``print`` (keyword only).

    Returns:
        True when the module-level ``QUIET`` flag suppressed the output,
        otherwise None.
    """
    if QUIET:
        return True
    print(*args, end=end)
    return None
def is_expired(domain):
    """Report whether a lookup of *domain* fails with NXDOMAIN.

    Returns True only when ``dns.resolver.query`` raises ``NXDOMAIN``. A
    successful query or a ``NoAnswer`` response yields False. Any other
    resolver exception propagates to the caller.
    """
    expired = False
    try:
        dns.resolver.query(domain)
    except dns.resolver.NXDOMAIN:
        expired = True
    except dns.resolver.NoAnswer:
        # The name exists but has no record of the queried type.
        expired = False
    return expired
def get_spf_record(domain):
    """Return an ``SPFRecord`` for *domain*, or None when it cannot be built.

    The domain is first checked with ``is_expired``; if it does not resolve
    a warning is printed and None is returned. Resolver failures during
    either the existence check or the TXT lookup are reported through
    ``print_quiet`` and also result in None.
    """
    try:
        # The existence check sits inside the try block so that a timeout or
        # missing name servers are reported instead of aborting the run.
        if is_expired(domain):
            print_quiet(WARN + "%s does not resolve" % domain)
            return None
        print_quiet(INFO + domain + ' domain check passed')
        return SPFRecord(domain)
    except dns.resolver.NoAnswer:
        print_quiet('\t' + LOW + "No TXT record for %s" % domain)
    except dns.exception.Timeout:
        print_quiet('\t' + WARN + "DNS timeout for %s" % domain)
    except dns.resolver.NoNameservers:
        print_quiet('\t' + WARN + "No name servers were found for %s" % domain)
    except Exception:
        # Deliberately broad best-effort fallback, but no longer a bare
        # except: KeyboardInterrupt and SystemExit still propagate.
        print_quiet('\t' + WARN + "Something went wrong ...")
    return None
def check_spf(spf, domain, max_lookups=0, depth=0):
    """Follow the include/redirect targets of *spf* and report on each.

    For every target the parent domain (its last two labels) is checked
    with ``is_expired`` and the result is printed. While the running count
    is below *max_lookups*, the target's own SPF record is fetched and
    walked recursively.

    Args:
        spf: an ``SPFRecord`` (any object exposing ``includes``), or None.
        domain: the domain the walk started from; only passed through to
            the recursive calls.
        max_lookups: the walk stops following targets once the running
            count reaches this value.
        depth: running count of targets followed so far.

    Returns:
        The updated running count of targets followed.
    """
    includes = getattr(spf, 'includes', None)
    if includes is None:
        # get_spf_record() returns None when no record could be fetched.
        print_quiet('\t\t' + LOW + "No records to check!")
        return depth

    for inc_domain in includes:
        print_quiet(INFO + 'checking SPF: ' + inc_domain)
        url = urlparse("mail://%s" % inc_domain).netloc
        # NOTE: the last two labels are taken as the registrable domain, so
        # names under multi-label suffixes (e.g. co.uk) reduce to the suffix.
        parent = '.'.join(url.split('.')[-2:])
        try:
            if is_expired(parent):
                print_quiet('\t' + MONEY + "%s's parent domain \"%s\" is not registered" % (
                    inc_domain, parent))
            else:
                print_quiet('\t' + LOW + "%s's parent domain is registered" % inc_domain)
        except dns.resolver.NoAnswer:
            print_quiet('\t' + MAYBE + 'No answer for lookup of %s' % inc_domain)
        except dns.exception.Timeout:
            print_quiet('\t' + WARN + "DNS timeout for %s" % parent)
        except dns.resolver.NoNameservers:
            print_quiet('\t' + WARN + "No name servers were found for %s" % parent)

        # Compare instead of testing "max_lookups - depth" for truthiness:
        # the difference is also truthy once the count has passed the cap.
        if depth < max_lookups:
            # Following this target is lookup number depth + 1, so the limit
            # is exceeded as soon as depth has reached MAX_DEPTH.
            if depth >= MAX_DEPTH:
                print_quiet('\t' + WARN + "SPF permerror, more than %s lookups needed." % MAX_DEPTH)
            inc_spf = get_spf_record(inc_domain)
            depth = check_spf(inc_spf, domain, max_lookups, depth + 1)
    return depth
if __name__ == '__main__':
    import os
    import argparse
    import json

    def read_files(files):
        """Return the stripped, non-empty lines of every existing file in *files*.

        Paths that are not regular files are reported with a warning and
        skipped.
        """
        domains = []
        for f in files:
            if os.path.isfile(f):
                with open(f, 'r') as fp:
                    # Blank lines would otherwise become empty domain names.
                    domains += [line.strip() for line in fp if line.strip()]
            else:
                print_quiet(WARN + "File does not exist %s" % f)
        return domains

    parser = argparse.ArgumentParser(
        description='Check domains for expired SPF records',
    )
    parser.add_argument('--version',
                        action='version',
                        version='%(prog)s v0.0.1'
                        )
    parser.add_argument('--domains', '-d',
                        help='domains to check',
                        dest='domains',
                        nargs='*',
                        )
    parser.add_argument('--csv', '-c',
                        help='read domains from csv file(s)',
                        dest='files',
                        nargs='*',
                        )
    # A single integer: with nargs='*' and no type the value arrived as a
    # list of strings, which check_spf() cannot compare against its counter.
    parser.add_argument('--maxlookups', '-m',
                        help='Maximum number of DNS queries used to resolve an SPF record.',
                        dest='maxlookups',
                        type=int,
                        default=MAX_LOOKUPS,
                        )
    parser.add_argument('--quiet', '-q',
                        help='Don\'t print anything other than SPF records depth',
                        dest='quiet',
                        action='store_true',
                        )
    parser.add_argument('--summary', '-s',
                        help='Print summary for DNS query depth (for multiple domains)',
                        dest='summary',
                        action='store_true',
                        )
    parser.set_defaults(quiet=False, summary=False)
    args = parser.parse_args()

    # Module-level flag read by print_quiet().
    QUIET = args.quiet

    _domains = []
    sum_depth = 0
    if args.files:
        _domains += read_files(args.files)
    if args.domains:
        _domains += args.domains

    for domain in _domains:
        spf = get_spf_record(domain)
        if spf is None:
            # In quiet per-domain mode a failed lookup is reported as depth 0.
            if QUIET and not args.summary:
                print(json.dumps({'depth': '0'}), end='')
            continue
        depth = check_spf(spf, domain, args.maxlookups)
        if args.summary:
            sum_depth += depth
        elif QUIET:
            print(json.dumps({'depth': str(depth)}), end='')
        else:
            print('SPF DNS query depth:' + str(depth))

    if args.summary:
        if QUIET:
            if sum_depth > 0:
                print(json.dumps({'depth': str(sum_depth)}), end='')
        else:
            # Previously --summary without --quiet accumulated the total but
            # never printed it.
            print('SPF DNS query depth:' + str(sum_depth))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment