Last active
March 31, 2023 21:17
-
-
Save firsov/0f8dab16c8251cd54a92485a3bcb09f5 to your computer and use it in GitHub Desktop.
Virtual host enumerator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests, socket, hashlib, argparse | |
from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
from threading import Thread, BoundedSemaphore, Lock | |
from tqdm import tqdm | |
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) | |
# pip install tqdm | |
# Virtual host enumerator that uses a wordlist to generate potential subdomains, | |
# it works like --resolve option in curl | |
# It compares the response headers and body of each request to a reference MD5 hash calculated | |
# from the initial request. If a subdomain's response is unique, meaning it has different response | |
# headers and/or body from the initial request, the script outputs the subdomain and its response | |
# to the console. The script is multithreaded and uses a progress bar to display real-time progress. | |
# Additionally, it supports command line arguments for the wordlist file, target domain, and IP address | |
# to use for target hostname resolution. | |
# Example: python vhost.py -t 100 -w subs100k.txt -i 8.8.8.8 -d google.com -b 502 | |
# It will enum virtual hosts on google.com with an IP address 8.8.8.8 and ignore 502 status code | |
# In curl it would be: curl -k https://test.google.com --resolve 'test.google.com:443:8.8.8.8' | |
# define function to handle each request | |
def make_request(sub, hostname, ip_address, headers, initial_md5, unique, lock, semaphore): | |
oHostname = f"{sub}.{hostname}" | |
override_dns(oHostname, ip_address) | |
params = {'Host': oHostname} | |
try: | |
response = requests.get(f"https://{oHostname}", headers=headers, params=params, verify=False) | |
# check for blacklisted status codes | |
if response.status_code in blacklisted: | |
# release semaphore and return without printing response | |
semaphore.release() | |
return | |
# remove specified headers from response headers | |
headers = response.headers.copy() | |
for header in headers_to_remove: | |
headers.pop(header, None) | |
# calculate md5 hash of headers without specified headers and response body | |
body = response.content | |
md5 = hashlib.md5(str(headers).encode('utf-8') + body).hexdigest() | |
# compare to initial md5 hash | |
if md5 != initial_md5 and md5 not in unique: | |
with lock: | |
print('====== Unique: ' + oHostname) | |
unique.append(md5) | |
for header, value in response.headers.items(): | |
print(f"{header}: {value}") | |
print(body) # .text | |
except requests.exceptions.SSLError as e: | |
if "SSLError(1, '[SSL: SSLV3_ALERT_HANDSHAKE_FAILURE]" in str(e): | |
pass | |
else: | |
semaphore.release() | |
raise e | |
# release semaphore | |
semaphore.release() | |
# parse command line arguments for number of threads, wordlist, hostname, and IP address | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads to run') | |
parser.add_argument('-w', '--wordlist', required=True, help='Wordlist file to use') | |
parser.add_argument('-d', '--hostname', required=True, help='domain to enum') | |
parser.add_argument('-i', '--ip', required=True, help='IP address to use for target hostname') | |
parser.add_argument('-b', '--blacklisted', type=str, default='', help='Comma-separated list of blacklisted status codes') | |
args = parser.parse_args() | |
if not (args.wordlist and args.ip): | |
parser.error("Please provide required arguments") | |
blacklisted = [] | |
if args.blacklisted: | |
blacklisted = [int(x.strip()) for x in args.blacklisted.split(",")] | |
semaphore = BoundedSemaphore(args.threads) | |
dns_cache = {} | |
def override_dns(domain, ip): | |
domain = domain.lower() | |
dns_cache[domain] = ip | |
prv_getaddrinfo = socket.getaddrinfo | |
def new_getaddrinfo(*args): | |
if args[0] in dns_cache: | |
return prv_getaddrinfo(dns_cache[args[0]], *args[1:]) | |
else: | |
return prv_getaddrinfo(*args) | |
socket.getaddrinfo = new_getaddrinfo | |
hostname = args.hostname | |
ip_address = args.ip | |
initial_md5 = '' | |
headers_to_remove = ['Date', 'Connection', 'X-Amz-Cf-Id', 'ETag', 'Via', 'Last-Modified','Age'] | |
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:88.0) Gecko/20100101 Firefox/88.0'} | |
override_dns(hostname, ip_address) | |
try: | |
response = requests.get('https://' + hostname, headers=headers, params={}, verify=False) | |
initial_headers = response.headers.copy() | |
for header in headers_to_remove: | |
initial_headers.pop(header, None) | |
initial_body = response.content | |
initial_md5 = hashlib.md5(str(initial_headers).encode('utf-8') + initial_body).hexdigest() | |
print(initial_body) | |
except requests.exceptions.SSLError as e: | |
if "SSLError(1, '[SSL: SSLV3_ALERT_HANDSHAKE_FAILURE]" in str(e): | |
pass | |
else: | |
raise e | |
with open(args.wordlist, 'r') as f: | |
subs = f.read().splitlines() | |
unique = [] | |
lock = Lock() | |
# loop through subdomains and make requests with threads | |
for sub in tqdm(subs, desc='Checking virtual hosts'): | |
semaphore.acquire() | |
thread = Thread(target=make_request, args=(sub, hostname, ip_address, headers, initial_md5, unique, lock, semaphore)) | |
thread.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment