Skip to content

Instantly share code, notes, and snippets.

@inscapist
Created November 10, 2020 03:42
Show Gist options
  • Save inscapist/b235bdc39c824d289dea7ba7e023d133 to your computer and use it in GitHub Desktop.
Save inscapist/b235bdc39c824d289dea7ba7e023d133 to your computer and use it in GitHub Desktop.
Generic URLFinder with asyncio and semaphore. Python 3.8+ only
Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.3) Gecko/20090913 Firefox/3.5.3,
Mozilla/5.0 (Windows; U; Windows NT 6.1; en; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3 (.NET CLR 3.5.30729),
Mozilla/5.0 (Windows; U; Windows NT 5.2; en-US; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3 (.NET CLR 3.5.30729),
Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.1) Gecko/20090718 Firefox/3.5.1,
Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/532.1 (KHTML, like Gecko) Chrome/4.0.219.6 Safari/532.1,
Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; InfoPath.2),
Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.2; Win64; x64; Trident/4.0),
Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; SV1; .NET CLR 2.0.50727; InfoPath.2),
Mozilla/5.0 (Windows; U; MSIE 7.0; Windows NT 6.0; en-US),
Mozilla/4.0 (compatible; MSIE 6.1; Windows XP),
Opera/9.80 (Windows NT 5.2; U; ru) Presto/2.5.22 Version/10.51
import asyncio
import random
from typing import Callable, Generator, Optional
import aiohttp
from loguru import logger
# File from which random_agent() draws User-Agent strings, one entry per line.
# The path is relative, so it is resolved against the current working directory.
AGENT_FILE = "agents.ini"
async def fetch(
    session: aiohttp.ClientSession, url: str, handler: Callable
) -> Optional[str]:
    """GET *url* and hand the still-open response to *handler*.

    Args:
        session: aiohttp session used to issue the request.
        url: Address to request.
        handler: Awaitable callable invoked as ``handler(url, response)``
            inside the response context; its result is returned as-is.

    Returns:
        Whatever *handler* returns, or ``None`` when the request or the
        handler fails. Failures are logged and never propagated, so one bad
        URL cannot abort a whole batch.
    """
    # Explicit ClientTimeout instead of a bare number (the legacy form):
    # 10 seconds for the whole request.
    timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with session.get(url, timeout=timeout) as response:
            return await handler(url, response)
    except aiohttp.ClientResponseError as e:
        # ``status`` replaces the deprecated ``code`` alias; include the URL
        # so the warning can be attributed to a request.
        logger.warning(f"{url} responded with HTTP {e.status}")
    except asyncio.TimeoutError:
        logger.debug(f"{url} timeout")
    except Exception as e:
        # Deliberate best-effort boundary: log and carry on with other URLs.
        logger.warning(f"{url} failed: {e!r}")
    return None
async def url_finder(urls: Generator, handler: Callable, max_tasks: int = 20):
    """Fetch every URL from *urls* concurrently and collect handler results.

    URLs are pulled from *urls* lazily: a new request is only started once a
    concurrency slot is free, so a very large generator is never materialised
    up front.

    Args:
        urls: Iterable of URLs to request.
        handler: Awaitable callable invoked as ``handler(url, response)`` for
            each response (see ``fetch``).
        max_tasks: Maximum number of requests in flight at once. ``0`` is
            treated as ``1`` (one request at a time).

    Returns:
        A list of the non-``None`` handler results, in the order the URLs
        were produced.

    Raises:
        ValueError: If *max_tasks* is negative.
    """
    if max_tasks < 0:
        raise ValueError("max_tasks must be >= 0")
    # A limit of 0 previously behaved as "one request at a time"; keep that
    # rather than deadlocking on a semaphore that can never be acquired.
    sem = asyncio.Semaphore(max(1, max_tasks))
    tasks = []
    # One User-Agent is picked per call and shared by every request.
    headers = {"User-Agent": random_agent()}
    async with aiohttp.ClientSession(headers=headers) as session:  # type: ignore
        for url in urls:
            # Reserve a slot *before* spawning the task. Acquiring afterwards
            # allowed max_tasks + 1 requests to run concurrently.
            await sem.acquire()
            task = asyncio.create_task(fetch(session, url, handler))
            # Free the slot whenever the task finishes, however it finishes.
            task.add_done_callback(lambda _: sem.release())
            tasks.append(task)
        # Must stay inside the session context: the tasks use the session.
        responses = await asyncio.gather(*tasks)
    return [r for r in responses if r is not None]
def random_agent(path: Optional[str] = None) -> str:
    """Return a randomly chosen User-Agent string from an agents file.

    The file is expected to hold one User-Agent per line. Surrounding
    whitespace and a trailing list-separator comma are removed, and blank
    lines are ignored.

    Args:
        path: File to read. Defaults to the module-level ``AGENT_FILE``.

    Returns:
        One User-Agent string.

    Raises:
        OSError: If the file cannot be opened.
        IndexError: If the file contains no usable entries.
    """
    source = AGENT_FILE if path is None else path
    # Context manager so the handle is closed promptly instead of leaking.
    with open(source, encoding="utf-8") as agent_file:
        agents = [
            agent
            for agent in (line.strip().rstrip(",").rstrip() for line in agent_file)
            if agent  # a blank line must never become an empty User-Agent
        ]
    if not agents:
        raise IndexError(f"no user agents found in {source}")
    return random.choice(agents)
@inscapist
Copy link
Author

inscapist commented Nov 10, 2020

Sample use case:
Admin panel finder

import asyncio
from typing import Optional

import aiohttp
from app.util.matcher import regex
from app.util.url_finder import url_finder


class AdminUrlGenerator:
    """Generate candidate admin-panel URLs for a base URL.

    Candidates are the base URL combined with every path pattern listed in
    ``ADMIN_PATTERNS_FILE`` (one per line), followed by the base URL combined
    with every port in ``ADMIN_PORTS``.
    """

    ADMIN_PATTERNS_FILE = "data/admin_patterns.txt"
    ADMIN_PORTS = [
        "2082",
        "2083",
        "2086",
        "2087",
        "2095",
        "2096",
        "8880",
        "8443",
        "9001",
    ]  # https://www.hostgator.com/help/article/commonly-used-port-numbers

    def __init__(self, url: str):
        """Store the base URL and load the path patterns.

        Args:
            url: Base URL to probe; a single trailing ``/`` is dropped.

        Raises:
            OSError: If ``ADMIN_PATTERNS_FILE`` cannot be opened.
        """
        if url.endswith("/"):
            url = url[:-1]
        self.url = url
        # Copy so that editing one instance's ports never mutates the
        # class-level default shared by all instances.
        self.ports = list(self.ADMIN_PORTS)
        # Read eagerly inside a context manager: the file is closed right
        # away, and the patterns can be iterated more than once (a generator
        # over the open file was exhausted after the first pass).
        with open(self.ADMIN_PATTERNS_FILE, "r") as patterns_file:
            stripped = (line.strip() for line in patterns_file)
            # Skip blank lines; they would only yield the bare "<url>/".
            self.paths = [pattern for pattern in stripped if pattern]

    def generate(self):
        """Yield every path-based candidate, then every port-based one."""
        yield from self.with_paths()
        yield from self.with_ports()

    def with_paths(self):
        """Yield ``<url>/<pattern>`` for each loaded path pattern."""
        for p in self.paths:
            yield self.url + "/" + p

    def with_ports(self):
        """Yield ``<url>:<port>`` for each configured port.

        NOTE(review): the port is appended to the end of the URL, so this
        assumes the base URL has no path and no explicit port - confirm
        against callers.
        """
        for p in self.ports:
            yield self.url + ":" + p


async def is_admin(url: str, response: aiohttp.ClientResponse) -> Optional[str]:
    """Return *url* if the response body looks like a login form.

    A page qualifies when its status is in the 2xx/3xx range and its body
    matches both a credential keyword (``username``, ``password`` or
    ``login``) and a closing ``</form>`` tag.

    Args:
        url: The URL that was requested.
        response: The open aiohttp response for that URL.

    Returns:
        *url* when the page qualifies, otherwise ``None``.
    """
    if not 200 <= response.status < 400:
        return None
    bytecontent = await response.read()
    # Tolerate non-UTF-8 pages: a strict decode raises UnicodeDecodeError and
    # the page would be skipped, although the ASCII markers searched for
    # below survive replacement of undecodable bytes.
    content = bytecontent.decode("utf-8", errors="replace")
    # `regex` is a project helper; presumably truthy on a match - confirm.
    matched_fields = regex(content, "username|password|login")
    matched_form = regex(content, "</form>")
    if matched_fields and matched_form:
        return url
    return None


def main() -> None:
    """Probe a sample site for admin pages and print the URLs that matched."""
    target = "https://EXAMPLE.com"
    candidates = AdminUrlGenerator(target).generate()
    # asyncio.run drives the coroutine to completion on a fresh event loop.
    found = asyncio.run(url_finder(candidates, is_admin))
    print(found)


# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

@inscapist
Copy link
Author

pip install aiohttp loguru  (asyncio is part of the standard library and does not need to be installed)

@inscapist
Copy link
Author

inscapist commented Nov 10, 2020

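asyncio.run is like await for synchronous code: it creates a new event loop, runs the coroutine to completion, and closes the loop. Previously this was written as asyncio.get_event_loop().run_until_complete(...).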

asyncio.create_task is non-blocking. Comparable to ensure_future().
https://stackoverflow.com/questions/36342899/asyncio-ensure-future-vs-baseeventloop-create-task-vs-simple-coroutine

I keep coming back to this:
https://stackoverflow.com/questions/34753401/difference-between-coroutine-and-future-task-in-python-3-5

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment