Skip to content

Instantly share code, notes, and snippets.

Created October 11, 2022 20:39
Show Gist options
  • Save TheTechromancer/d69e3b9e4d8a659c845bb38081d636d1 to your computer and use it in GitHub Desktop.
Save TheTechromancer/d69e3b9e4d8a659c845bb38081d636d1 to your computer and use it in GitHub Desktop.
A simple python script to filter out unresolved/wildcard DNS records.
#!/usr/bin/env python3
import sys
import string
import random
import dns.resolver
import threading
import tldextract
import concurrent.futures
_cache = dict()
__wildcard_lock = threading.Lock()
wildcard_locks = dict()
wildcards = dict()
rand_pool = string.ascii_lowercase + string.digits
def rand_string(length=10):
return "".join([random.choice(rand_pool) for _ in range(int(length))])
def is_domain(d):
extracted = tldextract.extract(d)
if extracted.domain and not extracted.subdomain:
return True
return False
def is_subdomain(d):
extracted = tldextract.extract(d)
if extracted.domain and extracted.subdomain:
return True
return False
def parent_domain(d):
if is_domain(d):
return d
split_domain = str(d).split(".")
if len(split_domain) == 1:
return "."
return ".".join(split_domain[1:])
def domain_parents(d):
Returns all parents of a subdomain --> [,]
parent = str(d)
while 1:
parent = parent_domain(parent)
if is_subdomain(parent):
yield parent
elif is_domain(parent):
yield parent
def resolve(query, rdtypes=None):
if rdtypes is None:
rdtypes = ("A", "AAAA", "TXT", "NS", "SOA", "MX", "CNAME", "SRV")
answers = set()
for rdtype in rdtypes:
for answer in list(dns.resolver.resolve(query, rdtype=rdtype)):
answers.add((rdtype, str(answer)))
except Exception as e:
return answers
def wildcard_lock( domain):
with __wildcard_lock:
return wildcard_locks[domain]
except KeyError:
lock = threading.Lock()
wildcard_locks[domain] = lock
return lock
def is_wildcard(query):
if is_domain(query):
return is_wildcard(f"lskdgahgasldfl.{query}")
parent = parent_domain(query)
with wildcard_lock(parent):
orig_results = set(a[-1] for a in resolve(query, rdtypes=("A", "AAAA")))
parents = set(domain_parents(query))
is_wildcard_bool = False
if parent in _cache:
return _cache[parent]
for parent in parents:
if parent in wildcards:
return True
wildcard_ips = set()
for parent in parents:
ips = set()
for _ in range(5):
rand_query = f"{rand_string(length=10)}.{parent}"
ips.update(set(a[-1] for a in resolve(rand_query, rdtypes=("A", "AAAA"))))
if ips:
except KeyError:
wildcards[parent] = ips
if orig_results and wildcard_ips and all([ip in wildcard_ips for ip in orig_results]):
is_wildcard_bool = True
_cache.update({parent: is_wildcard_bool})
return is_wildcard_bool
def clean(d):
is_wildcard_bool = False
ips = resolve(d)
if ips:
is_wildcard_bool = is_wildcard(d)
return ips, is_wildcard_bool
futures = dict()
with concurrent.futures.ThreadPoolExecutor(max_workers=250) as executor:
for dnsname in map(str.rstrip, sys.stdin):
future = executor.submit(clean, dnsname)
futures[future] = dnsname
for future in concurrent.futures.as_completed(futures):
dnsname = futures[future]
answers, is_wildcard_bool = future.result()
if answers and not is_wildcard_bool:
answers = [":".join(a) for a in answers]
except KeyboardInterrupt:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment