Last active
October 25, 2023 16:49
-
-
Save TheTechromancer/ed695d6d23e9f64d931cb1621076d0d8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import asyncio | |
from bbot.scanner import Scanner | |
try: | |
file = sys.argv[-1] | |
except KeyError: | |
print("Usage: ./clean_dns_records.py input.txt") | |
sys.exit(1) | |
num_threads = 10 | |
async def handle_dns_name(dns_name, bbot_scan): | |
try: | |
dns_event = bbot_scan.make_event(dns_name, "DNS_NAME", source=bbot_scan.root_event) | |
except Exception as e: | |
return | |
if not dns_event.type == "DNS_NAME": | |
return | |
( | |
dns_tags, | |
event_whitelisted_dns, | |
event_blacklisted_dns, | |
dns_children, | |
) = await bbot_scan.helpers.dns.resolve_event(dns_event) | |
for tag in dns_tags: | |
dns_event.add_tag(tag) | |
resolved_hosts = set() | |
for rdtype, ips in dns_children.items(): | |
if rdtype in ("A", "AAAA", "CNAME"): | |
for ip in ips: | |
resolved_hosts.add(ip) | |
# check for wildcards | |
if not "unresolved" in dns_event.tags: | |
if not bbot_scan.helpers.is_ip_type(dns_event.host): | |
await bbot_scan.helpers.dns.handle_wildcard_event(dns_event, dns_children) | |
print(f"{dns_event.data}\t{','.join(sorted(dns_event.tags))}") | |
async def main(): | |
with open(file) as f: | |
lines = f.read().splitlines() | |
lines = [l for l in set([l.strip().lower() for l in lines]) if l] | |
config = { | |
"dns_abort_threshold": 999999 | |
} | |
bbot_scan = Scanner() | |
semaphore = asyncio.Semaphore(num_threads) # Limit to 5 concurrent tasks | |
tasks = [] | |
for dns_name in lines: # If you want to run 10 tasks | |
tasks.append(controlled_execution(semaphore, dns_name, bbot_scan)) | |
await asyncio.gather(*tasks) | |
async def controlled_execution(semaphore, dns_name, bbot_scan): | |
async with semaphore: | |
await handle_dns_name(dns_name, bbot_scan) | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment