utility to assist in identifying stale (no longer active) domain/hostname entries in large dns blocklists.
#!/usr/bin/env python3
# This is free and unencumbered software released into the public domain.
#
# Anyone is free to copy, modify, publish, use, compile, sell, or
# distribute this software, either in source code form or as a compiled
# binary, for any purpose, commercial or non-commercial, and by any
# means.
#
# In jurisdictions that recognize copyright laws, the author or authors
# of this software dedicate any and all copyright interest in the
# software to the public domain. We make this dedication for the benefit
# of the public at large and to the detriment of our heirs and
# successors. We intend this dedication to be an overt act of
# relinquishment in perpetuity of all present and future rights to this
# software under copyright law.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# For more information, please refer to <https://unlicense.org>
# =====================================================================
# ABOUT
# =====================================================================
# this utility was written to support signature curation for the open
# source next-generation firewall "dnxfirewall".
# https://dnxfirewall.com or https://github.com/DOWRIGHTTV/dnxfirewall
# collecting and keeping these large lists up to date, on top of the
# research and development work, is a big undertaking, so open-source
# and freely available lists are vital for ensuring broad coverage of
# domain categories as a cross-reference when proxying internet traffic.
# the "block list project" is a great resource and all of its lists are
# placed in the public domain. https://github.com/blocklistproject/Lists
# to give back, i tried to make this utility more user-friendly by
# adding cli argument support so it can be run directly from the cli,
# which [hopefully] removes the requirement of knowing the python
# programming language.
# =====================================================================
# DESCRIPTION
# =====================================================================
# this utility can be used to validate domain lists at a large scale.
# it is intended to be used with large domain block lists (for web
# filters or similar) and only reports whether a domain returned valid
# (A) or (NS) records. the absence of the former indicates that the
# domain can be safely removed from the list, since it will not resolve
# to an ip address and a connection to a remote server will not be made.
# notes:
# input files
# specify the full path unless the file is in the same directory the
# utility is being called from.
# output files
# will be written to the current working directory (the directory the
# user is in at execution).
# in/out formats
# the input parser automatically detects whether a line is in host file
# format, so the file can contain either format or a combination of both.
# output of the results is written in CSV format.
# use argument "-c" to generate a "clean" file containing only domains
# with at least 1 valid (A) record. if "-H" is also used, the clean
# output will be in host file format (0.0.0.0 domain.tld).
# an example invocation is shown below.
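# =====================================================================
# USAGE (example)
# =====================================================================
# a minimal example invocation (the script name and list filename below
# are purely illustrative):
#   python3 dns_validate.py blocklist.txt --proc 4 --servers 1.1.1.1 1.0.0.1 -c -H
# this would check the names in blocklist.txt across 4 worker processes,
# alternating between the two resolvers, then write a timestamped CSV of
# results plus a "clean" list in host file format.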
import os
import socket
import sys
import time
import datetime
import argparse
import threading
import multiprocessing as mp
from collections import deque
from ipaddress import IPv4Address
from socket import gethostbyname
from struct import Struct
# building a unique filename based on the current time
current_time = datetime.datetime.now().strftime('%I-%H-%M.%S%p')
FILENAME_SEED = f'{current_time}-results'
# counters used to communicate status between processes
SHARED_COUNTERS = [mp.Value('i', 0), mp.Value('i', 0), mp.Value('i', 0)]
# time used as a reference for determining how long the utility ran.
START_TIME = time.localtime()
# CloudFlare set to default because they will rarely throttle
DEFAULT_SERVER_1 = IPv4Address('1.1.1.1')
DEFAULT_SERVER_2 = IPv4Address('1.0.0.1')
# direct references and helper functions
byte_join = b''.join
byte_pack = Struct('!B').pack
dns_header_pack = Struct('!6H').pack
dns_header_unpack = Struct('!6H').unpack_from


def create_header(dns_id, arc=0, *, cd):
    # bit fields: RD (recursion desired) is set; CD (checking disabled) is set per argument
    bit_fields = (1 << 8) | (cd << 4)

    return dns_header_pack(dns_id, bit_fields, 1, 0, 0, arc)
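
# example (illustrative): the full query sent for "example.com" with dns_id=1 is
#   create_header(1, cd=1) + b'\x07example\x03com' + b'\x00\x00\x01\x00\x01'
# i.e. 12 byte header, length-prefixed labels, null terminator, QTYPE=A, QCLASS=IN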

def progress(send_count: int, recv_count: int, file_ct: str) -> None:
    print(f'{time.strftime("%H:%M:%S", START_TIME)} | {time.strftime("%H:%M:%S")} | ', end='')
    print(f'proc[{file_ct}] | recv[{recv_count}] | sent[{send_count}/{PROGRESS_TOTAL_COUNT}] \r', end='')


class ValidationProcessor:
    '''Primary worker and individual task manager.

    this class is designed to be self-managed with each instance running in a separate subprocess.
    only basic status will be reported back up to the main process.
    each instance will write its results to a partial file when its job queue is finished processing.
    '''

    __slots__ = (
        '_t_lock', '_send_count', '_recv_count', '_results', '_job_done', '_count_done', '_query_sock',
        'ttl_send_count', 'ttl_recv_count', 'notify_done', 'idx', 'file_idx', 't_count', 'jobs'
    )

    def __init__(self, idx: int, shared_counters: list[mp.Value], server_ip: IPv4Address, jobs: deque):
        self.ttl_send_count: mp.Value = shared_counters[0]
        self.ttl_recv_count: mp.Value = shared_counters[1]
        self.notify_done: mp.Value = shared_counters[2]

        self.idx: int = idx
        self.file_idx: str = f'{idx}'
        self.jobs: deque = jobs

        self._t_lock = threading.Lock()
        self._send_count = 0
        self._recv_count = 0
        self._results = deque()

        self._job_done = False
        self._count_done = False

        # create a UDP socket and call its "connect" method.
        # for UDP the "connect" method stores the server address, which allows us to use the "send" method
        self._query_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._query_sock.settimeout(5)
        self._query_sock.connect((f'{server_ip}', 53))

    @classmethod
    def run(cls, idx: int, shared_counters: list[mp.Value], server_ip: IPv4Address, jobs: deque):
        self = cls(idx, shared_counters, server_ip, jobs)
        self._process_loop()

    def sender(self):
        sock_send = self._query_sock.send
        jobs_pop = self.jobs.popleft

        # hostname termination byte added in q[2] for simplicity
        q_template = [create_header(dns_id=self.idx, cd=1), b'\x00', b'\x00\x00\x01\x00\x01']
        while self.jobs:
            q_template[1] = jobs_pop()

            # try block will ensure the sender loop exits when the responder thread is done or the socket has timed out.
            try:
                sock_send(byte_join(q_template))
            except OSError:
                break

            self._send_count += 1

            # brief pause between queries (~300 per second per worker) to avoid hammering the resolver
            time.sleep(.003)

    def receiver(self):
        sock_recv = self._query_sock.recv
        results_append = self._results.append
        while True:
            try:
                results_append(sock_recv(1024))
            except OSError:
                break

            self._recv_count += 1
            if (self._recv_count == self._send_count):
                break

        self._query_sock.close()

    def _report_thread(self):
        # used to ensure a thread completion count doesn't get reported more than once
        last_send_count = self._send_count
        last_recv_count = self._recv_count
        while True:
            new_send_count = self._send_count
            if (new_send_count > last_send_count):
                with self.ttl_send_count.get_lock():
                    self.ttl_send_count.value += (new_send_count - last_send_count)

                last_send_count = new_send_count

            new_recv_count = self._recv_count
            if (new_recv_count > last_recv_count):
                with self.ttl_recv_count.get_lock():
                    self.ttl_recv_count.value += (new_recv_count - last_recv_count)

                last_recv_count = new_recv_count

            # prevents this thread from just spinning when the process is done
            if (self._job_done and (new_send_count == last_send_count and new_recv_count == last_recv_count)):
                self._count_done = True
                break

            time.sleep(.1)

    def _process_loop(self):
        threading.Thread(target=self._report_thread).start()

        sender = threading.Thread(target=self.sender)
        receiver = threading.Thread(target=self.receiver)

        sender.start()
        receiver.start()

        threads = [sender, receiver]
        # waits for all threads to finish, which also means the job queue has been fully processed
        for t in threads:
            t.join()

        # each process will write its results separately
        try:
            self._write_to_disk()
        finally:
            # notifying the count reporter thread to wrap up
            self._job_done = True

            # waiting for the count reporter thread to finish counting (progress data accuracy)
            while not self._count_done:
                time.sleep(.1)

            # counter shared between all processes and used by the main process to know when all jobs are complete.
            with self.notify_done.get_lock():
                self.notify_done.value += 1

    def _write_to_disk(self):
        processed = []
        processed_append = processed.append
        for data in self._results:
            dns_header = dns_header_unpack(data)

            # walking the length-prefixed labels of the question name (starts after the 12 byte header)
            temp_name, offset = [], 12
            while data := data[offset:]:
                if not (length := data[0]):
                    break

                temp_name.append(data[1:1 + length])
                offset = 1 + length

            name = b'.'.join(temp_name).decode('utf-8')

            processed_append(f'{name},{dns_header[3]},{dns_header[4]}')

        with open(f'{FILENAME_SEED}-{self.file_idx}.txt', 'w') as _signatures:
            if (self.idx == 1):
                _signatures.write('host,resource,authority\n')

            _signatures.write('\n'.join(processed))
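
# output format note (values illustrative): each row written by _write_to_disk is
# "hostname,ANCOUNT,NSCOUNT", e.g. "example.com,1,0" for a name with one (A) record
# and no authority records.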

if (__name__ == '__main__'):
    parser = argparse.ArgumentParser(description='bulk domain name [(A) record] verification.')
    parser.add_argument('filename', help='input file with list of hostnames', type=argparse.FileType('r'))
    parser.add_argument('-c', help='output separate "clean" file', action='store_true')
    parser.add_argument('-H', help='use host file format for output', action='store_true')
    parser.add_argument('--proc', metavar='{1 - 4}', help='number of parallel workloads (evenly split)',
                        type=int, choices=range(1, 5), default=1)
    parser.add_argument('--iter-limit', metavar='{1 - 999999}', help='limits total processed names (FIFO)',
                        type=int, choices=range(1, 1000000), default=999999)
    parser.add_argument('--servers', metavar='ip_address', help='List of IP Addresses to use for resolving DNS',
                        type=IPv4Address, nargs='+', default=[DEFAULT_SERVER_1, DEFAULT_SERVER_2])

    args = parser.parse_args(sys.argv[1:])

    CLEAN_OUTPUT = args.c
    HOST_FILE_FORMAT = args.H
    SIGNATURES = args.filename
    PROCESS_COUNT: int = args.proc  # parallel processing
    ITER_COUNT: int = args.iter_limit  # clamp amount of names to check
    RESOLVERS: list[IPv4Address] = args.servers

    # filtering out blank lines and any comments
    _sigs = [x for x in SIGNATURES.read().splitlines() if x and '#' not in x]

    # PROGRESS_TOTAL_COUNT = ITER_COUNT if ITER_COUNT < len(sigs) else len(sigs)
    if (len(_sigs) > ITER_COUNT):
        _sigs = _sigs[:ITER_COUNT]

    sigs = []
    sigs_append = sigs.append

    # encoding hostname to bytes as a preprocessor - automatically detects if the line is in host file format
    for sig in _sigs:
        try:
            if (' ' in sig):
                sig = sig.split()[1]

            encoded_sig = byte_join([byte_pack(len(s)) + s.encode('utf-8') for s in sig.split('.')])
        except:
            print(f'hostname format error->{sig}')
        else:
            sigs_append(encoded_sig)
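
    # e.g. (illustrative) the host file line "0.0.0.0 example.com" is reduced to "example.com",
    # then encoded to b'\x07example\x03com' (each label is prefixed with its length).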

    # successfully encoded names only
    PROGRESS_TOTAL_COUNT = len(sigs)

    # ceiling division: total valid hostnames divided by the process count from the cli argument,
    # rounded up so leftover names fold into the last chunk instead of spawning an extra process.
    STEP = PROGRESS_TOTAL_COUNT // PROCESS_COUNT + (PROGRESS_TOTAL_COUNT % PROCESS_COUNT > 0)
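    # e.g. (illustrative) 10 names across 3 processes -> STEP = 4 -> chunks of 4, 4, and 2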

    process_id, processes = 1, []
    for i in range(0, PROGRESS_TOTAL_COUNT, STEP):
        dataset = deque(sigs[i:i + STEP])

        # round-robin assignment of resolvers to worker processes
        resolver = RESOLVERS[(process_id - 1) % len(RESOLVERS)]

        pr = mp.Process(target=ValidationProcessor.run, args=(process_id, SHARED_COUNTERS, resolver, dataset))
        pr.start()

        processes.append(pr)
        process_id += 1

    ttl_send_count = SHARED_COUNTERS[0]
    ttl_recv_count = SHARED_COUNTERS[1]
    notify_done = SHARED_COUNTERS[2]

    try:
        while notify_done.value < PROCESS_COUNT:
            progress(ttl_send_count.value, ttl_recv_count.value, f'{notify_done.value}/{PROCESS_COUNT}')
            time.sleep(.5)
    except:
        # allows ctrl-c to break out early; termination and merging still run below
        pass

    progress(ttl_send_count.value, ttl_recv_count.value, f'{notify_done.value}/{PROCESS_COUNT}')

    for p in processes:
        p.terminate()

    print(f'\ndone. ({PROCESS_COUNT}) processes terminated. merging files...')

    # ==================
    # MERGING FILES
    # ==================
    # lazily grabbing all the files that were created by sub processes
    result_files = [f for f in sorted(os.listdir()) if FILENAME_SEED in f]

    i, merge_failed = 0, []

    # creating file without process id to aggregate into
    with open(f'{FILENAME_SEED}.csv', 'w') as combined_results:

        # copying over each file
        for i, file in enumerate(result_files, 1):

            # removing the partial from disk if it is successfully merged.
            try:
                with open(file, 'r') as partial_results:
                    combined_results.write(partial_results.read())
                    combined_results.write('\n')
            except:
                merge_failed.append(file)
            else:
                os.remove(file)

    print(f'merged: {i - len(merge_failed)}, failed: {len(merge_failed)}. see {FILENAME_SEED}.csv for results.')

    if (CLEAN_OUTPUT):
        print('generating clean output.')

        ext = 'host' if HOST_FILE_FORMAT else 'txt'
        pre = '0.0.0.0 ' if HOST_FILE_FORMAT else ''

        with open(f'{FILENAME_SEED}.csv', 'r') as combined_results:
            with open(f'{FILENAME_SEED}.{ext}', 'w') as clean_output:

                for line in combined_results:
                    line = line.split(',')
                    try:
                        domain = line[0]
                        a_r = int(line[1])
                        ns_r = int(line[2])
                    except:
                        continue

                    # at least one (A) record means the domain still resolves, so it is kept
                    if (a_r > 0):
                        clean_output.write(f'{pre}{domain}\n')

        print(f'done. see {FILENAME_SEED}.{ext}.')