Skip to content

Instantly share code, notes, and snippets.

@TheTechromancer
Last active October 25, 2023 16:49
Show Gist options
  • Save TheTechromancer/ed695d6d23e9f64d931cb1621076d0d8 to your computer and use it in GitHub Desktop.
Save TheTechromancer/ed695d6d23e9f64d931cb1621076d0d8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import asyncio
from bbot.scanner import Scanner
try:
file = sys.argv[-1]
except KeyError:
print("Usage: ./clean_dns_records.py input.txt")
sys.exit(1)
num_threads = 10
async def handle_dns_name(dns_name, bbot_scan):
try:
dns_event = bbot_scan.make_event(dns_name, "DNS_NAME", source=bbot_scan.root_event)
except Exception as e:
return
if not dns_event.type == "DNS_NAME":
return
(
dns_tags,
event_whitelisted_dns,
event_blacklisted_dns,
dns_children,
) = await bbot_scan.helpers.dns.resolve_event(dns_event)
for tag in dns_tags:
dns_event.add_tag(tag)
resolved_hosts = set()
for rdtype, ips in dns_children.items():
if rdtype in ("A", "AAAA", "CNAME"):
for ip in ips:
resolved_hosts.add(ip)
# check for wildcards
if not "unresolved" in dns_event.tags:
if not bbot_scan.helpers.is_ip_type(dns_event.host):
await bbot_scan.helpers.dns.handle_wildcard_event(dns_event, dns_children)
print(f"{dns_event.data}\t{','.join(sorted(dns_event.tags))}")
async def main():
with open(file) as f:
lines = f.read().splitlines()
lines = [l for l in set([l.strip().lower() for l in lines]) if l]
config = {
"dns_abort_threshold": 999999
}
bbot_scan = Scanner()
semaphore = asyncio.Semaphore(num_threads) # Limit to 5 concurrent tasks
tasks = []
for dns_name in lines: # If you want to run 10 tasks
tasks.append(controlled_execution(semaphore, dns_name, bbot_scan))
await asyncio.gather(*tasks)
async def controlled_execution(semaphore, dns_name, bbot_scan):
async with semaphore:
await handle_dns_name(dns_name, bbot_scan)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment