Last active
February 8, 2023 02:20
-
-
Save DOWRIGHTTV/7dfb9f639e3b1d02e347b15f100a4d1b to your computer and use it in GitHub Desktop.
utility to assist in identifying stale (no longer active) domain/hostname entries in large dns blocklists.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# This is free and unencumbered software released into the public domain. | |
# | |
# Anyone is free to copy, modify, publish, use, compile, sell, or | |
# distribute this software, either in source code form or as a compiled | |
# binary, for any purpose, commercial or non-commercial, and by any | |
# means. | |
# | |
# In jurisdictions that recognize copyright laws, the author or authors | |
# of this software dedicate any and all copyright interest in the | |
# software to the public domain. We make this dedication for the benefit | |
# of the public at large and to the detriment of our heirs and | |
# successors. We intend this dedication to be an overt act of | |
# relinquishment in perpetuity of all present and future rights to this | |
# software under copyright law. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR | |
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR | |
# OTHER DEALINGS IN THE SOFTWARE. | |
# | |
# For more information, please refer to <https://unlicense.org> | |
# ===================================================================== | |
# ABOUT | |
# ===================================================================== | |
# this utility was written to support signature curation for the open | |
# source next-generation firewall "dnxfirewall". | |
# https://dnxfirewall.com or https://github.com/DOWRIGHTTV/dnxfirewall | |
# collecting and keeping these large lists up to date in addition to | |
# research and development is a big undertaking so open-source and | |
# freely available lists are vital in ensuring broad coverage of domain | |
# categories as a cross-reference when proxying internet traffic. | |
# the "block list project" is a great resource and all lists are placed | |
# in the public domain. https://github.com/blocklistproject/Lists | |
# to give back, i tried to make this utility more user-friendly by | |
# adding support for cli arguments and directly usable from the | |
# cli, which [hopefully] removes the requirement of needing to know the | |
# python programming language. | |
# ===================================================================== | |
# DESCRIPTION | |
# ===================================================================== | |
# this utility can be used to validate domains lists on a large scale. | |
# it is intended to be used in conjunction with large domain block
# lists (for use in web filters or similar) and only provides details | |
# of whether a domain returned valid (A) or (NS) records. the absence | |
# of the former can indicate that a domain can be safely removed from | |
# the list as it will not resolve to an ip address and a connection to | |
# a remote server will not be made. | |
# notes: | |
# input files | |
# should specify the full path unless the file is in the same directory | |
# as the user when calling the utility. | |
# output files | |
# will be written to the current working directory (directory the user | |
# is in at execution). | |
# in/out formats | |
# input parser automatically detects if the line is in host file format | |
# so the file can contain either format or a combination of both. | |
# output of the results is written in CSV format. | |
# use argument "-c" to generate a "clean" file that contains only
# domains with at least 1 valid (A) record. if "-H" is used, then the | |
# clean output will be in host file format (0.0.0.0 domain.tld). | |
import os | |
import socket | |
import sys | |
import time | |
import datetime | |
import argparse | |
import threading | |
import multiprocessing as mp | |
from collections import deque | |
from ipaddress import IPv4Address | |
from socket import gethostbyname | |
from struct import Struct | |
# building a unique filename based on the current time so repeated runs
# do not overwrite earlier result files
current_time = datetime.datetime.now().strftime('%I-%H-%M.%S%p')
FILENAME_SEED = f'{current_time}-results'

# counters used to communicate status between processes:
# [0] total queries sent, [1] total responses received, [2] workers finished
SHARED_COUNTERS = [mp.Value('i', 0), mp.Value('i', 0), mp.Value('i', 0)]

# time used as a reference for determining how long the utility ran.
START_TIME = time.localtime()

# CloudFlare set to default because they will rarely throttle
DEFAULT_SERVER_1 = IPv4Address('1.1.1.1')
DEFAULT_SERVER_2 = IPv4Address('1.0.0.1')

# direct references and helper functions, bound once at module level for
# faster name lookups inside the hot send/parse loops
byte_join = b''.join
byte_pack = Struct('!B').pack                   # one unsigned byte (dns label length prefix)
dns_header_pack = Struct('!6H').pack            # 6 unsigned shorts -> 12 byte dns header
dns_header_unpack = Struct('!6H').unpack_from   # 12 byte dns header -> 6 unsigned shorts
def create_header(dns_id, arc=0, *, cd):
    '''return a packed 12 byte dns query header for a single-question query.

    dns_id -> query identifier echoed back by the resolver.
    arc    -> additional record count (0 unless extra records are appended).
    cd     -> "checking disabled" flag value (keyword-only).
    '''
    # bit 8 is RD (recursion desired, always requested); cd occupies bit 4.
    flag_bits = (1 << 8) | (cd << 4)

    # fields: id, flags, qdcount=1, ancount=0, nscount=0, arcount
    return Struct('!6H').pack(dns_id, flag_bits, 1, 0, 0, arc)
def progress(send_count: int, recv_count: int, file_ct: str) -> None:
    '''write a single carriage-return status line to stdout.

    shows the utility start time, the current time, per-process completion,
    the receive count, and send progress against PROGRESS_TOTAL_COUNT.
    '''
    started_at = time.strftime("%H:%M:%S", START_TIME)
    time_now = time.strftime("%H:%M:%S")

    print(f'{started_at} | {time_now} | ', end='')
    print(f'proc[{file_ct}] | recv[{recv_count}] | sent[{send_count}/{PROGRESS_TOTAL_COUNT}] \r', end='')
class ValidationProcessor:
    '''Primary worker and individual task manager.

    this class is designed to be self-managed with each instance running in a separate subprocess.
    only basic status will be reported back up to the main process.
    each instance will write its results to a partial file when its job queue is finished processing.
    '''

    __slots__ = (
        '_t_lock', '_send_count', '_recv_count', '_results', '_job_done', '_count_done', '_query_sock',
        'ttl_send_count', 'ttl_recv_count', 'notify_done', 'idx', 'file_idx', 't_count', 'jobs'
    )

    def __init__(self, idx: int, shared_counters: list[mp.Value], server_ip: IPv4Address, jobs: deque):
        # process-shared counters used by the main process for progress display
        self.ttl_send_count: mp.Value = shared_counters[0]
        self.ttl_recv_count: mp.Value = shared_counters[1]
        self.notify_done: mp.Value = shared_counters[2]

        self.idx: int = idx
        self.file_idx: str = f'{idx}'   # suffix for this worker's partial results file
        self.jobs: deque = jobs         # pre-encoded (dns label format) hostnames to query

        self._t_lock = threading.Lock()     # NOTE(review): allocated but never acquired in this class
        self._send_count = 0
        self._recv_count = 0
        self._results = deque()     # raw dns response bytes, parsed later in _write_to_disk
        self._job_done = False      # set once results are flushed to disk
        self._count_done = False    # set by the reporter thread after the final counter sync

        # create a UDP socket and call its "connect" method.
        # for UDP the "connect" method stores the server address, which allows us to use the "send" method
        self._query_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._query_sock.settimeout(5)
        self._query_sock.connect((f'{server_ip}', 53))

    @classmethod
    def run(cls, idx: int, shared_counters: list, server_ip: IPv4Address, jobs: deque):
        '''subprocess entry point: build an instance and block until its job queue is processed.'''
        self = cls(idx, shared_counters, server_ip, jobs)
        self._process_loop()

    def sender(self):
        '''drain the job queue, sending one dns (A record) query per encoded hostname.'''
        # bound to locals once to avoid repeated attribute lookups in the loop
        sock_send = self._query_sock.send
        jobs_pop = self.jobs.popleft

        # hostname termination byte added in q[2] for simplicity
        q_template = [create_header(dns_id=self.idx, cd=1), b'\x00', b'\x00\x00\x01\x00\x01']
        while self.jobs:
            q_template[1] = jobs_pop()

            # try block will ensure the sender loop exits when the responder thread is done or the socket is timed out.
            try:
                sock_send(byte_join(q_template))
            except OSError:
                break

            self._send_count += 1

            # small delay between sends — presumably to avoid resolver throttling
            time.sleep(.003)

    def receiver(self):
        '''collect raw dns responses until the send count is matched or the socket errors out.'''
        sock_recv = self._query_sock.recv
        results_append = self._results.append

        while True:
            try:
                results_append(sock_recv(1024))
            # raised on the 5 second recv timeout or a closed socket; remaining
            # queries are treated as unanswered.
            except OSError:
                break

            self._recv_count += 1
            # NOTE(review): exits when counts match — if a response arrives while the
            # sender is mid-queue this could terminate early; confirm against live runs.
            if (self._recv_count == self._send_count):
                break

        self._query_sock.close()

    def _report_thread(self):
        '''periodically fold local send/recv deltas into the process-shared counters.'''
        # used to ensure a thread completion count doesn't get reported more than once
        last_send_count = self._send_count
        last_recv_count = self._recv_count
        while True:
            # only the delta since the last pass is added to the shared value
            new_send_count = self._send_count
            if (new_send_count > last_send_count):
                with self.ttl_send_count.get_lock():
                    self.ttl_send_count.value += (new_send_count - last_send_count)

                last_send_count = new_send_count

            new_recv_count = self._recv_count
            if (new_recv_count > last_recv_count):
                with self.ttl_recv_count.get_lock():
                    self.ttl_recv_count.value += (new_recv_count - last_recv_count)

                last_recv_count = new_recv_count

            # prevents this thread from just spinning when the process is done
            if (self._job_done and (new_send_count == last_send_count and new_recv_count == last_recv_count)):
                self._count_done = True
                break

            time.sleep(.1)

    def _process_loop(self):
        '''coordinate the reporter/sender/receiver threads, then persist results and notify the main process.'''
        threading.Thread(target=self._report_thread).start()

        sender = threading.Thread(target=self.sender)
        receiver = threading.Thread(target=self.receiver)
        sender.start()
        receiver.start()

        threads = [sender, receiver]
        # waits for all threads to finish, which also means the job queue has been fully processed
        for t in threads:
            t.join()

        # each process will write its results separately
        try:
            self._write_to_disk()
        finally:
            # notifying the count reporter thread to wrap up
            self._job_done = True

            # waiting for the count reporter thread to finish counting (progress data accuracy)
            while not self._count_done:
                time.sleep(.1)

            # counter shared between all processes and used by the main process to know when all jobs are complete.
            with self.notify_done.get_lock():
                self.notify_done.value += 1

    def _write_to_disk(self):
        '''parse each raw dns response and write "host,resource,authority" csv rows to a partial file.'''
        processed = []
        processed_append = processed.append

        for data in self._results:
            # header fields: id, flags, qdcount, ancount, nscount, arcount
            dns_header = dns_header_unpack(data)

            # walk the question section's length-prefixed labels (starting just past the
            # 12 byte header) to rebuild the queried hostname; a 0 length byte terminates.
            temp_name, offset = [], 12
            while data := data[offset:]:
                if not (length := data[0]):
                    break

                temp_name.append(data[1:1 + length])

                offset = 1 + length

            name = b'.'.join(temp_name).decode('utf-8')

            # dns_header[3] -> answer count (A records), dns_header[4] -> authority count (NS records)
            processed_append(f'{name},{dns_header[3]},{dns_header[4]}')

        with open(f'{FILENAME_SEED}-{self.file_idx}.txt', 'w') as _signatures:
            # only the first worker writes the csv column header so the merged file has exactly one
            if (self.idx == 1):
                _signatures.write('host,resource,authority\n')

            _signatures.write('\n'.join(processed))
if (__name__ == '__main__'):
    parser = argparse.ArgumentParser(description='bulk domain name [(A) record] verification.')
    parser.add_argument('filename', help='input file with list of hostnames', type=argparse.FileType('r'))
    parser.add_argument('-c', help='output separate "clean" file', action='store_true')
    parser.add_argument('-H', help='use host file format for output', action='store_true')
    parser.add_argument('--proc', metavar='{1 - 4}', help='number of parallel workloads (evenly split)',
        type=int, choices=range(1, 5), default=1)
    parser.add_argument('--iter-limit', metavar='{1 - 999999}', help='limits total processed names (FIFO)',
        type=int, choices=range(1, 1000000), default=999999)
    parser.add_argument('--servers', metavar='ip_address', help='List of IP Addresses to use for resolving DNS',
        type=IPv4Address, nargs='+', default=[DEFAULT_SERVER_1, DEFAULT_SERVER_2])

    args = parser.parse_args(sys.argv[1:])

    CLEAN_OUTPUT = args.c
    HOST_FILE_FORMAT = args.H
    SIGNATURES = args.filename
    PROCESS_COUNT: int = args.proc          # parallel processing
    ITER_COUNT: int = args.iter_limit       # clamp amount of names to check
    RESOLVERS: list[IPv4Address] = args.servers

    # filtering out any comments, then clamping to the iteration limit (FIFO)
    _sigs = [x for x in SIGNATURES.read().splitlines() if '#' not in x]
    if (len(_sigs) > ITER_COUNT):
        _sigs = _sigs[:ITER_COUNT]

    sigs = []
    sigs_append = sigs.append

    # encoding hostname to bytes (dns label format: length byte + label) as a preprocessor.
    # automatically detects if the line is in host file format ("0.0.0.0 domain.tld").
    for sig in _sigs:
        try:
            if (' ' in sig):
                sig = sig.split()[1]

            encoded_sig = byte_join([byte_pack(len(s)) + s.encode('utf-8') for s in sig.split('.')])
        # struct.error on labels > 255 bytes, UnicodeError on bad characters, etc.
        except Exception:
            print(f'hostname format error->{sig}')
        else:
            sigs_append(encoded_sig)

    # successfully encoded names only
    PROGRESS_TOTAL_COUNT = len(sigs)
    if (not PROGRESS_TOTAL_COUNT):
        # fix: a zero count previously produced STEP=0 and crashed range() below
        sys.exit('no valid hostnames found in input file. nothing to do.')

    # total number of valid hostnames divided by the process count specified in cli argument.
    # adding the remainder will ensure that odd out data doesn't roll into a new chunk.
    STEP = PROGRESS_TOTAL_COUNT // PROCESS_COUNT + (PROGRESS_TOTAL_COUNT % PROCESS_COUNT > 0)

    process_id, processes = 1, []
    for i in range(0, PROGRESS_TOTAL_COUNT, STEP):
        dataset = deque(sigs[i:i + STEP])

        # round-robin over however many resolvers were supplied.
        # fix: was hardcoded "% 2" which raises IndexError with a single --servers address.
        resolver = RESOLVERS[(process_id - 1) % len(RESOLVERS)]

        pr = mp.Process(target=ValidationProcessor.run, args=(process_id, SHARED_COUNTERS, resolver, dataset))
        pr.start()

        processes.append(pr)
        process_id += 1

    # fewer chunks than --proc may be spawned for small lists, so completion must be
    # measured against the actual process count or the wait loop would never finish.
    SPAWNED_COUNT = len(processes)

    ttl_send_count = SHARED_COUNTERS[0]
    ttl_recv_count = SHARED_COUNTERS[1]
    notify_done = SHARED_COUNTERS[2]

    try:
        while notify_done.value < SPAWNED_COUNT:
            progress(ttl_send_count.value, ttl_recv_count.value, f'{notify_done.value}/{SPAWNED_COUNT}')
            time.sleep(.5)
    except KeyboardInterrupt:
        # user abort -> fall through and merge whatever the workers finished
        pass

    # final status line before cleanup
    progress(ttl_send_count.value, ttl_recv_count.value, f'{notify_done.value}/{SPAWNED_COUNT}')

    for p in processes:
        p.terminate()

    print(f'\ndone. ({SPAWNED_COUNT}) processes terminated. merging files...')

    # ==================
    # MERGING FILES
    # ==================
    # lazily grabbing all the files that were created by sub processes
    result_files = [f for f in sorted(os.listdir()) if FILENAME_SEED in f]

    i, merge_failed = 0, []
    # creating file without process id to aggregate into
    with open(f'{FILENAME_SEED}.csv', 'w') as combined_results:
        # copying over each partial file, removing it from disk only if successfully merged
        for i, file in enumerate(result_files, 1):
            try:
                with open(file, 'r') as partial_results:
                    combined_results.write(partial_results.read())
                    combined_results.write('\n')
            except OSError:
                merge_failed.append(file)
            else:
                os.remove(file)

    print(f'merged: {i - len(merge_failed)}, failed: {len(merge_failed)}. see {FILENAME_SEED}.csv for results.')

    if (CLEAN_OUTPUT):
        print('generating clean output.')

        ext = 'host' if HOST_FILE_FORMAT else 'txt'
        pre = '0.0.0.0 ' if HOST_FILE_FORMAT else ''

        with open(f'{FILENAME_SEED}.csv', 'r') as combined_results:
            with open(f'{FILENAME_SEED}.{ext}', 'w') as clean_output:
                for line in combined_results:
                    line = line.split(',')
                    # the csv header row and blank separator lines will not parse; skip them
                    try:
                        domain = line[0]
                        a_r = int(line[1])
                        ns_r = int(line[2])
                    except (ValueError, IndexError):
                        continue

                    # at least one (A) record means the domain still resolves -> keep it
                    if (a_r > 0):
                        clean_output.write(f'{pre}{domain}\n')

        print(f'done. see {FILENAME_SEED}.{ext}.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment