Last active
February 24, 2021 22:29
-
-
Save ewen-lbh/066db10f6017d7a556dfe4ecb3bc2e3c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum
import re
import sys
from pathlib import Path
from time import sleep
from typing import Callable, Iterator, NamedTuple, Optional, Union
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup
from rich import inspect, print, traceback
from urllib3.util.url import LocationParseError, Url, parse_url
traceback.install() | |
SUPERSECRET_DOMAIN = input("Enter the super secret domain avec les polys de maths: ") | |
class WithinWhat(Enum):
    """Crawl boundary: how far from the entrypoint followed links may stray."""

    internet = 0  # no restriction: follow any link anywhere
    domain = 1  # stay on the entrypoint's host
    path = 2  # FIXME stay under the entrypoint's parent path
def _get_absolute_href(parent: Url, child: str) -> Url:
    """
    Resolve `child` (a raw href value found in a page) against `parent`
    into an absolute URL.

    The original spelled out every case by hand (absolute URLs,
    protocol-relative `//host/...`, `./`/`../` relatives, bare relative
    paths) and had several defects doing so: one branch returned a plain
    `str` instead of a `Url`, bare relatives were appended after the
    parent's filename (".../index.html/child"), and protocol-relative
    hrefs had their host folded into the path. All of these cases are
    exactly RFC 3986 reference resolution, which `urllib.parse.urljoin`
    implements correctly — so defer to it.

    - parent: the URL of the page the href was found on
    - child: the raw href attribute value

    Raises LocationParseError if the resolved URL cannot be parsed.
    """
    return parse_url(urljoin(parent.url, child))
def _get_links(entrypoint: str, content: str, within: WithinWhat) -> Iterator[str]:
    """
    Yield hrefs of all <a> tags in the given page that stay within `within`.

    - entrypoint: The page's URL. Used to resolve relative hrefs and the
      `within` preference.
    - content: The page's HTML content.
    - within: boundary restriction (see WithinWhat).

    Fixes vs. the original: the return annotation said `list[str]` but the
    function is a generator; and the three `is_within` checks were built
    eagerly in a dict, so `Path(href.path)` raised TypeError on a URL with
    no path component even when `within` was `domain` or `internet`. The
    checks are now evaluated lazily, and missing paths default to "/".
    """
    root = BeautifulSoup(content, features="html.parser")
    base = parse_url(entrypoint)  # keep the str param intact; parse once
    for link in root("a"):
        raw_href = link.attrs.get("href")
        if not raw_href:
            continue
        try:
            href = _get_absolute_href(base, raw_href)
        except LocationParseError:
            continue
        if href is None:
            continue
        if within is WithinWhat.domain:
            is_within = href.host == base.host
        elif within is WithinWhat.path:
            # FIXME "same parent directory" is only an approximation of
            # "children paths of entrypoint" (cf. WithinWhat.path).
            is_within = Path(href.path or "/").parent == Path(base.path or "/").parent
        else:  # WithinWhat.internet: no restriction
            is_within = True
        if is_within:
            yield href.url
def crawl(
    entrypoint: str,
    do: Optional[Callable[[requests.Response, Url], Union[None, str]]] = None,
    within: WithinWhat = WithinWhat.domain,
    politeness: int = 0,
    pattern: "re.Pattern" = None,
    avoid: "re.Pattern" = None,
    only: "re.Pattern" = None,
    max_depth: Optional[int] = None,
    _depth: int = 0,
    _seen: Optional[set] = None,
) -> set[str]:
    """
    Return a set of all links mentioned in all pages linking from `entrypoint`, recursively.
    - do: Run a function on each URL that will be returned. The advantage is that you have access to the `requests.Response` object, no need to hit the network again. May return a (possibly multi-line) message to display, or None.
    - within: Restrict crawling to URLs within a certain range:
        - `WithinWhat.internet`: No restrictions
        - `WithinWhat.path`: Only crawl children paths of `entrypoint`
        - `WithinWhat.domain`: Only crawl URLs that have the same host (or "domain") as `entrypoint`
    - politeness: How many seconds to wait between each request
    - pattern: Return only links matching `pattern`
    - avoid: Avoid crawling links from pages matching `avoid`
    - only: Only crawl links from pages matching `only`
    - max_depth: Do not recurse deeper than this. Note that Python's own recursion limit will supersede this one.
    _depth & _seen are internal attributes used when calling the function recursively

    Raises ValueError when the top-level `entrypoint` cannot be parsed.
    """
    seen = _seen or set()
    # Indent log lines by recursion depth so the crawl tree is visible.
    p = lambda text: print(" " * _depth + text)
    try:
        parsed_entrypoint = parse_url(entrypoint)
    except LocationParseError:
        if _depth > 0:
            # A malformed link found while crawling is just skipped…
            return seen
        else:
            # …but a malformed *entrypoint* is a caller error.
            raise ValueError(f"Couldn't parse URL {entrypoint!r}")
    if (avoid and avoid.search(entrypoint)) or (only and not only.search(entrypoint)):
        p(f"[bold yellow]←[/] Avoiding {entrypoint}")
        return seen
    if _depth > 0 and politeness:
        p(f"[bold dim]…[/] Sleeping for {politeness} s")
        sleep(politeness)
    resp = requests.get(entrypoint)
    if resp.status_code >= 400:
        p(f"[bold red]⚠[/] Error {resp.status_code} while requesting {entrypoint}")
        return seen
    # Content-Type may be missing entirely: default to "" so the `in`
    # checks below don't raise TypeError on None.
    content_type = resp.headers.get("Content-Type", "")
    if pattern and not pattern.search(entrypoint) and "html" not in content_type:
        # Neither a wanted file nor an HTML page to crawl further.
        p(f"[bold dim]…[/] [dim]Ignoring {entrypoint}")
        return seen
    if not pattern or pattern.search(entrypoint):
        if not do:
            p(f"[bold cyan]→[/] Stashing {entrypoint}")
            seen.add(entrypoint)
        if do:
            p(
                f"[bold green]>[/] Executing [green]{do.__qualname__}[/green] on {entrypoint}"
            )
            msg = do(resp, parsed_entrypoint)
            # `do` may legitimately return None (no message) — guard it.
            for line in (msg or "").splitlines():
                p("[dim]│[/] " + line)
            return seen
    does_not_exceed_max_depth = _depth <= (max_depth or sys.getrecursionlimit())
    if "html" in content_type and does_not_exceed_max_depth:
        p(f"[bold magenta]↘️[/] Crawling {entrypoint}")
        for url in _get_links(entrypoint, resp.text, within=within):
            if url in seen:
                continue
            seen |= crawl(
                entrypoint=url,
                do=do,
                avoid=avoid,
                only=only,
                within=within,
                pattern=pattern,
                politeness=politeness,
                # Bug fix: max_depth was not forwarded, so recursive calls
                # ignored the limit entirely.
                max_depth=max_depth,
                _depth=_depth + 1,
                _seen=seen | {url},
            )
    return seen
def save(resp: requests.Response, url: Url) -> str:
    """
    `do` callback for `crawl`: archive the response body under ./archive/,
    mirroring the URL's path. For PDFs, additionally try to fetch the
    matching .tex source and every file it \\input{}s.

    Returns a (possibly multi-line) rich-formatted status message.
    """
    msg = ""
    path = Path("./archive/" + url.path)
    # PDFs are deliberately never skipped: even if the PDF file already
    # exists, we still want to look for its LaTeX source below.
    if path.exists() and path.suffix != ".pdf":
        return f"[bold dim]…[/] Skipping as file already exists"
    path.parent.mkdir(parents=True, exist_ok=True)
    # Known text formats are saved decoded; everything else as raw bytes.
    if path.suffix in (".tex", ".ml", ".py"):
        path.write_text(resp.text)
    else:
        path.write_bytes(resp.content)
    msg += f"[bold blue]↓[/] Saved as [cyan]{path}[/]\n"
    if path.suffix == ".pdf":
        # Guess the LaTeX source URL by swapping .pdf for .tex.
        tex_source = requests.get(url.url.removesuffix(".pdf") + ".tex")
        if tex_source.status_code < 400:
            msg += f"[bold red]♥[/] Found a LaTeX source\n"
            path.with_suffix(".tex").write_text(tex_source.text)
            # downloading all \inputs
            for dependency in re.finditer(r"\\input\{(.+)\}", tex_source.text):
                # NOTE(review): the server apparently serves "entetes" where
                # sources say "Entetes" — confirm this mapping still holds.
                extracted_path = dependency.group(1).replace("Entetes", "entetes")
                dep_save_path = path.parent / Path(extracted_path)
                # Resolve the \input path relative to the .tex file's URL
                # directory. NOTE(review): .resolve() resolves against the
                # local filesystem; it works here because url.path is
                # absolute, but it is fragile (e.g. symlinks, Windows).
                dep_url = f"http://{SUPERSECRET_DOMAIN}/" + str(
                    (Path(url.path).parent / Path(extracted_path)).resolve().absolute()
                )
                if (
                    (dep_source := requests.get(dep_url)).status_code < 400
                ):
                    dep_save_path_display = dep_save_path.resolve().relative_to(Path('.').resolve())
                    if not dep_save_path.exists():
                        dep_save_path.parent.mkdir(parents=True, exist_ok=True)
                        # Dependencies are LaTeX fragments: save as text.
                        dep_save_path.write_text(dep_source.text)
                        msg += f"[bold red]@[/] Saved a dependency as [cyan]{dep_save_path_display}[/]\n"
                    else:
                        msg += f"[bold red]@[/] Found a dependency as [cyan]{dep_save_path_display}[/]\n"
    return msg.strip()
# TODO: parse this:
# <html>
# <head>
# <title>Page de la MPSI du [DATA EXPUNGED]</title>
# <meta content="0; URL=./maths/index.html" http-equiv="refresh"/>
# </head>
# <body>
# <link href="./maths/images/icones/fav1.png" rel="shortcut icon" type="image/x-icon"/>
# <!-- <a href=./pyzo_distro-2013c.win64.exe>ici</a> -->
# </body>
# </html>
# as a redirect to http://{SUPERSECRET_DOMAIN}/maths/index.html

# Entry point: crawl the polys index, archiving every matching file via
# `save`. Ctrl-C aborts cleanly instead of dumping a traceback.
try:
    crawl(
        f"http://{SUPERSECRET_DOMAIN}/maths/polys/index.html",
        do=save,
        # Only stash/save downloadable course files…
        pattern=re.compile(r"\.(pdf|py|ml|tex|png|jpg)$"),
        # …only follow links inside the polys/ and exercices/ sections…
        only=re.compile(r"(polys|exercices)/"),
        # …and never the programming-class section.
        avoid=re.compile(r"programmes/"),
        politeness=3,
    )
except KeyboardInterrupt:
    print("\n\n[bold red]×[/] [red]Cancelled")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment