Skip to content

Instantly share code, notes, and snippets.

@s3rgeym
Created April 5, 2024 02:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save s3rgeym/f0eb9aad8c5d0bf1e250f7747120665d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# pylint: disable=C,R,W
import argparse
import logging
import multiprocessing as mp
import re
import sys
import threading
from typing import Sequence, TextIO
from urllib.parse import urljoin
import requests
__version__ = "0.1.0"
# Module-wide logger shared with the worker processes; the log format in
# ColorHandler includes %(processName)s to tell workers apart.
logger = mp.get_logger()
class ANSI:
    """ANSI SGR escape sequences used for colored terminal output."""

    CSI = "\x1b["  # Control Sequence Introducer
    RESET = f"{CSI}m"  # SGR with no parameters resets all attributes
    CLEAR_LINE = f"{CSI}2K\r"  # erase entire line, return carriage to col 0
    # Standard foreground colors (SGR 30-37).
    BLACK = f"{CSI}30m"
    RED = f"{CSI}31m"
    GREEN = f"{CSI}32m"
    YELLOW = f"{CSI}33m"
    BLUE = f"{CSI}34m"
    MAGENTA = f"{CSI}35m"
    CYAN = f"{CSI}36m"
    WHITE = f"{CSI}37m"
    # Bright foreground colors (SGR 90-97).
    GREY = f"{CSI}90m"
    BRIGHT_RED = f"{CSI}91m"
    BRIGHT_GREEN = f"{CSI}92m"
    # Fixed: was 99m, which is not a valid SGR color; bright yellow is 93.
    BRIGHT_YELLOW = f"{CSI}93m"
    BRIGHT_BLUE = f"{CSI}94m"
    BRIGHT_MAGENTA = f"{CSI}95m"
    BRIGHT_CYAN = f"{CSI}96m"
    BRIGHT_WHITE = f"{CSI}97m"
class ColorHandler(logging.StreamHandler):
    """Stream handler that wraps each record in a per-level ANSI color."""

    # Colors for the five standard logging levels.
    _log_colors: dict[int, str] = {
        logging.DEBUG: ANSI.BLUE,
        logging.INFO: ANSI.YELLOW,
        logging.WARNING: ANSI.MAGENTA,
        logging.ERROR: ANSI.RED,
        logging.CRITICAL: ANSI.BRIGHT_RED,
    }
    _fmt = logging.Formatter(
        "[%(levelname).1s] %(processName)-16s - %(message)s"
    )

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* and wrap it in the level's color codes.

        Uses ``dict.get`` so a record logged at a custom level (anything
        outside the five standard values) falls back to no color instead
        of raising ``KeyError``.
        """
        message = self._fmt.format(record)
        color = self._log_colors.get(record.levelno, "")
        return f"{color}{message}{ANSI.RESET}"
class Worker(mp.Process):
    """Daemon process that crawls open "Index of /" directory listings.

    Pulls directory URLs from ``in_q``, fetches each page, pushes URLs of
    downloadable files to ``out_q``, and re-queues sub-directory URLs for
    further crawling. The process starts itself on construction.
    """

    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"

    # File extensions worth reporting (matched case-insensitively).
    downloadable_exts = (
        # archives
        ".7z",
        ".rar",
        ".tar",
        ".tar.bz2",
        ".tar.gz",
        ".tar.xz",
        ".tar.zst",
        ".tgz",
        ".txz",
        ".zip",
        # dumps
        ".sql",
        ".dump",
        # other
        ".bk",
        ".bak",
    )

    def __init__(
        self,
        in_q: mp.JoinableQueue,
        out_q: mp.Queue,
        seen: dict,
        session: requests.Session | None = None,
        timeout: float = 30.0,
    ) -> None:
        """Create and immediately start the worker.

        :param in_q: joinable queue of directory URLs to crawl; ``None`` is
            the shutdown sentinel.
        :param out_q: queue receiving URLs of downloadable files.
        :param seen: shared mapping used to de-duplicate visited URLs.
        :param session: optional pre-configured HTTP session.
        :param timeout: per-request timeout in seconds. Without one,
            ``requests`` can block forever on a stalled server, wedging this
            worker and the ``in_q.join()`` in ``main()``.
        """
        super().__init__(daemon=True)
        self.in_q = in_q
        self.out_q = out_q
        self.seen = seen
        self.session = session or self.default_session()
        self.timeout = timeout
        self.start()

    def default_session(self) -> requests.Session:
        """Return a session carrying the browser-like User-Agent."""
        s = requests.session()
        s.headers.update({"User-Agent": self.user_agent})
        return s

    def extract_links(self, s: str) -> list[str]:
        """Return all ``<a href="...">`` targets found in HTML string *s*."""
        return re.findall('<a href="([^"]+)', s)

    def run(self) -> None:
        """Crawl loop: process URLs until the ``None`` sentinel arrives."""
        while True:
            try:
                if (url := self.in_q.get()) is None:
                    break  # shutdown sentinel from main()
                if url in self.seen:
                    logger.debug("already seen: %s", url)
                    continue
                # Redirects are not followed: a listing that redirects away
                # is not an open directory. timeout prevents hangs.
                response = self.session.get(
                    url, allow_redirects=False, timeout=self.timeout
                )
                if response.status_code != 200:
                    logger.warning("%d - %s", response.status_code, url)
                    continue
                self.seen[url] = True
                html = response.text
                if "<title>Index of /" not in html:
                    logger.warning("directory listing not found: %s", url)
                    continue
                links = self.extract_links(html)
                for link in links:
                    # Skip sort-order links such as ?C=N;O=D
                    if "?" in link:
                        continue
                    target_url = urljoin(url, link)
                    if target_url.lower().endswith(self.downloadable_exts):
                        self.out_q.put(target_url)
                        continue
                    if not target_url.endswith("/"):
                        continue
                    self.in_q.put(target_url)
            except BaseException as ex:
                # Deliberately broad: the worker must stay alive, otherwise
                # remaining queue items are never task_done()'d and
                # in_q.join() in main() would hang.
                logger.exception(ex)
            finally:
                # Every get() must be balanced by task_done() for in_q.join().
                self.in_q.task_done()
def normalize_url(s: str) -> str:
    """Return *s* trimmed of surrounding whitespace with a scheme ensured.

    Lines iterated from the input file keep their trailing newline; the
    original implementation left it inside the URL. A bare host gets an
    ``https://`` prefix; a string already containing ``://`` is returned
    as-is (minus whitespace).
    """
    s = s.strip()
    return s if "://" in s else f"https://{s}"
class OutputThread(threading.Thread):
    """Thread that drains URLs from a queue and writes them to a stream.

    One URL per line, flushed after each write so results appear
    immediately. Stops when it receives the ``None`` sentinel.
    """

    def __init__(self, stream: TextIO, queue: mp.Queue) -> None:
        super().__init__()
        self.stream = stream
        self.queue = queue

    def run(self) -> None:
        # iter(callable, sentinel) keeps calling queue.get() until it
        # returns None, which terminates the loop.
        for found_url in iter(self.queue.get, None):
            print(found_url, file=self.stream, flush=True)
class NameSpace(argparse.Namespace):
    # Typed view of the parsed command-line arguments.
    input: TextIO  # -i/--input: file with one site per line (default stdin)
    output: TextIO  # -o/--output: where found file URLs are written
    workers_num: int  # -w/--workers-num: number of crawler processes
    debug: bool  # -d/--debug: enable DEBUG-level logging
def main(argv: Sequence[str] | None = None) -> None:
    """CLI entry point: scan sites for open directory listings.

    Reads one site per line from ``--input``, seeds the work queue with a
    few likely directory paths, fans out to worker processes, and streams
    every downloadable-file URL found to ``--output``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=argparse.FileType(), default="-")
    parser.add_argument(
        "-o", "--output", type=argparse.FileType("w+"), default="-"
    )
    parser.add_argument("-w", "--workers-num", type=int, default=mp.cpu_count())
    parser.add_argument("-d", "--debug", action="store_true", default=False)
    args: NameSpace = parser.parse_args(args=argv)
    logger.addHandler(ColorHandler())
    if args.debug:
        logger.setLevel(logging.DEBUG)
    # in_q carries directory URLs to crawl (joinable so we can wait for
    # completion); out_q carries found file URLs to the output thread.
    in_q = mp.JoinableQueue()
    out_q = mp.Queue()
    # Manager dict is shared across worker processes for de-duplication.
    seen = mp.Manager().dict()
    for site in map(normalize_url, filter(None, args.input)):
        # Seed each site with paths that commonly expose open listings.
        for path in [
            "/wordpress/",
            "/wordpress/wp-content/",
            "/backup/",
            "/backups/",
            "/dump/",
            "/dumps/",
        ]:
            in_q.put_nowait(urljoin(site, path))
    logger.info("Directory scanning started")
    # Workers start themselves in __init__ (daemon processes).
    worker_tasks = [
        Worker(in_q=in_q, out_q=out_q, seen=seen)
        for _ in range(args.workers_num)
    ]
    out_t = OutputThread(queue=out_q, stream=args.output)
    out_t.start()
    # Block until every queued URL has been matched by a task_done() ...
    in_q.join()
    # ... then shut workers down with one None sentinel each.
    for _ in range(args.workers_num):
        in_q.put(None)
    for x in worker_tasks:
        x.join()
    # Finally stop the output thread with its own sentinel.
    out_q.put(None)
    out_t.join()
    logger.info("Finished!")


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment