Skip to content

Instantly share code, notes, and snippets.

@digitalnomad91
Forked from LAKostis/spf.py
Created October 1, 2022 03:49
Show Gist options
  • Save digitalnomad91/cb70ad43878b091aef567238e5c6db2d to your computer and use it in GitHub Desktop.
Save digitalnomad91/cb70ad43878b091aef567238e5c6db2d to your computer and use it in GitHub Desktop.
Check SPF records for expired Domains and DNS lookup depth
#!/usr/bin/env python3
# modified version of this snippet
# https://gist.github.com/TheRook/95f2b872bdc81bac2371
import dns.resolver
import dns.name
from urllib.parse import urlparse
INFO = "\033[1m\033[36m[*]\033[0m "
WARN = "\033[1m\033[31m[!]\033[0m "
LOW = "\033[1m\033[34m[-]\033[0m "
MAYBE = "\033[1m\033[35m[?]\033[0m "
MONEY = "\033[1m\033[38m[$]\033[0m "
MAX_LOOKUPS = 100
MAX_DEPTH = 10
CURRENT_LOOKUPS = 0
QUIET = False
class SPFRecord(object):
def __init__(self, domain):
self.version = None
self.includes = []
self.ip4 = []
self.ip6 = []
self._dns_response = dns.resolver.query(domain, 'TXT')
self.txt_records = [txt.to_text() for txt in self._dns_response]
for txt in self.txt_records:
self._parse_txt(txt)
def _parse_txt(self, txt):
''' Parses a raw txt record '''
for entry in txt.split(' '):
if entry.startswith('v') and '=' in entry:
self._add_version(entry)
elif entry.startswith('include') and ':' in entry:
self._add_include(entry)
elif entry.startswith('redirect') and '=' in entry:
self._add_redirect(entry)
elif entry.startswith('ip4') and ':' in entry:
self._add_ip4(entry)
elif entry.startswith('ip6') and ':' in entry:
self._add_ip6(entry)
@property
def ips(self):
return self.ip4 + self.ip6
def _add_version(self, entry):
self.version = entry.split('=')[1]
def _add_include(self, entry):
self.includes.append(entry.split(':')[1])
def _add_redirect(self, entry):
self.includes.append(entry.split('=')[1].strip('"'))
def _add_ip4(self, entry):
ip = entry.split(':')[1]
self.ip4.append(ip)
def _add_ip6(self, entry):
ip = entry.split(':')[1]
self.ip6.append(ip)
def print_quiet(args,end='\n'):
if QUIET:
return True
else :
print(args,end=end)
def is_expired(domain):
try:
dns.resolver.query(domain)
return False
except dns.resolver.NXDOMAIN:
return True
except dns.resolver.NoAnswer:
return False
def get_spf_record(domain):
if is_expired(domain):
print_quiet (WARN + "%s does not resolve" % domain)
return None
print_quiet (INFO + domain + ' domain check passed')
try:
return SPFRecord(domain)
except dns.resolver.NoAnswer:
print_quiet ('\t' + LOW + "No TXT record for %s" % domain)
except dns.exception.Timeout:
print_quiet ('\t' + WARN + "DNS timeout for %s" % domain)
except dns.resolver.NoNameservers:
print_quiet ('\t' + WARN + "No name servers were found for %s" % domain)
except:
print_quiet ('\t' + WARN + "Something went wrong ...")
return None
def check_spf(spf, domain, max_lookups = 0, depth = 0):
try:
for inc_domain in spf.includes:
try:
print_quiet(INFO + 'checking SPF: ' + inc_domain)
url = urlparse("mail://%s" % inc_domain).netloc
parent = '.'.join(url.split('.')[-2:])
if is_expired(parent) :
print_quiet ('\t' + MONEY,)
print_quiet ("%s's parent domain \"%s\" is not registered" % (
inc_domain, parent))
else:
print_quiet ('\t' + LOW + \
"%s's parent domain is registered" % inc_domain)
except dns.resolver.NoAnswer:
print_quiet ('\t' + MAYBE + 'No answer for lookup of', inc_domain)
except dns.exception.Timeout:
print_quiet ('\t' + WARN + "DNS timeout for", parent)
except dns.resolver.NoNameservers:
print_quiet ('\t' + WARN + "No name servers were found for", parent)
if max_lookups - depth:
if depth > MAX_DEPTH:
print_quiet ('\t' + WARN + "SPF permerror, more than %s lookups needed.", MAX_DEPTH, parent)
inc_spf = get_spf_record(inc_domain)
depth = check_spf(inc_spf, domain, max_lookups, depth + 1)
except AttributeError:
print_quiet ('\t\t' + LOW + "No records to check!")
pass
return depth
if __name__ == '__main__':
import os
import argparse
import json
def read_files(files):
domains = []
for f in files:
if os.path.exists(f) and os.path.isfile(f):
with open(f, 'r') as fp:
domains += [line.strip() for line in fp.readlines()]
else:
print_quiet (WARN + "File does not exist", f)
return domains
parser = argparse.ArgumentParser(
description='Check domains for expired SPF records',
)
parser.add_argument('--version',
action='version',
version='%(prog)s v0.0.1'
)
parser.add_argument('--domains', '-d',
help='domains to check',
dest='domains',
nargs='*',
)
parser.add_argument('--csv', '-c',
help='read domains from csv file(s)',
dest='files',
nargs='*',
)
parser.add_argument('--maxlookups', '-m',
help='Maximum number of DNS queries used to resolve an SPF record.',
dest='maxlookups',
default=100,
nargs='*',
)
parser.add_argument('--quiet', '-q',
help='Don\'t print anything other than SPF records depth',
dest='quiet',
action='store_true',
)
parser.add_argument('--summary', '-s',
help='Print summary for DNS query depth (for multiple domains)',
dest='summary',
action='store_true',
)
parser.set_defaults(quiet=False,summary=False)
args = parser.parse_args()
QUIET = args.quiet
_domains = []
data = {}
sum_depth = 0
if args.files:
_domains += read_files(args.files)
if args.domains:
_domains += args.domains
for domain in _domains:
spf = get_spf_record(domain)
if spf is not None:
if QUIET and not args.summary:
data = {'depth':str(check_spf(spf, domain, args.maxlookups))}
print(json.dumps(data),end='')
elif args.summary:
sum_depth += check_spf(spf, domain, args.maxlookups)
else :
print ('SPF DNS query depth:' + str(check_spf(spf, domain, args.maxlookups)))
elif QUIET and not args.summary:
data = {'depth':'0'}
print(json.dumps(data),end='')
else:
continue
if QUIET and sum_depth > 0 :
print(json.dumps({'depth':str(sum_depth)}),end='')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment