#!/usr/bin/env python3

import argparse
import asyncio
import datetime
from enum import Enum
import functools
import itertools
import logging
import os
from pathlib import Path
import re
from typing import (Any, AsyncIterator, Coroutine, Dict, Iterator, List, Optional,
                    Tuple, Type, TypeVar)
from urllib.parse import urljoin

import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from asyncio_pool import AioPool
from bs4 import BeautifulSoup
from bs4.element import Tag


logger = logging.getLogger(__name__)
# Overridden from --cache-dir in the __main__ block below.
cache_dir = Path("cache")


@functools.lru_cache
def get_image_client() -> aiohttp.ClientSession:
    """Shared session for asset downloads, with a generous read timeout."""
    return aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(sock_connect=10, sock_read=30),
    )


@functools.lru_cache
def get_page_client() -> aiohttp.ClientSession:
    """Shared session for page fetches."""
    return aiohttp.ClientSession(
        # time out faster but with more overall retries
        timeout=aiohttp.ClientTimeout(sock_connect=5, sock_read=5),
    )


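# functools.lru_cache turns these factories into lazy singletons: each session
# is created on first call (which happens inside a coroutine, so an event loop
# is already running, as aiohttp expects) and shared afterwards; main() closes
# both sessions on shutdown.

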
class MaxRetryError(Exception):
    pass


def aretry(attempts=15, min_wait=0.2, max_wait=30, wait_factor=1.5):
    """Retry an async function with exponential backoff.

    With the defaults, the delay reaches the 30 s cap after 13 attempts.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Note: `attempts` is shared by every call of the decorated
            # function, so lowering it below sticks for the rest of the run.
            nonlocal attempts
            for i in itertools.count(1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if i >= attempts:
                        raise MaxRetryError("Maximum number of retries reached", e) from None
                    logger.debug("Retrying after attempt %d failed because of %r", i, e)
                    if (
                        isinstance(e, ClientConnectorError)
                        and ("Name or service not known" in str(e.args[1])
                             or "No address associated with hostname" in str(e.args[1])
                             or "No route to host" in str(e.args[1])
                             or "getaddrinfo failed" in str(e.args[1]))
                        # or (isinstance(e, SSLError)
                        #     and "SSLCertVerificationError" in str(e.args))
                    ):
                        # DNS/routing failures rarely fix themselves: give up earlier.
                        attempts = 8
                    delay = min(min_wait * (wait_factor ** i), max_wait)
                    await asyncio.sleep(delay)

        return wrapper

    return decorator


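# A minimal usage sketch (hypothetical function name):
#
#     @aretry(attempts=3, min_wait=0.1)
#     async def fetch_flaky(url: str) -> str:
#         ...
#
# The delay before retry i is min(min_wait * wait_factor**i, max_wait), i.e.
# with the defaults roughly 0.3 s, 0.45 s, 0.68 s, ... capped at 30 s.

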
class PostKind(str, Enum):
    REGULAR = 'post_regular'
    LINK = 'post_link'
    IMAGE = 'post_image'
    VIDEO = 'post_video'
    QUOTE = 'post_quote'
    EVENT = 'post_event'
    REVIEW = 'post_review'
    FILE = 'post_file'
    REACTION = 'post_reaction'

    TUMBLR = 'tumblr'  # should be covered by the others
    UNKNOWN = 'UNKNOWN'


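# Enum lookup by value means PostKind('post_image') is PostKind.IMAGE, while
# any other string raises ValueError; Post.kind below relies on that to pick
# the kind out of a tag's CSS classes.

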
class Post:
    """A single post, wrapping its <div> tag and content subtree."""

    def __init__(self, tag: Tag, content: Tag) -> None:
        self.tag = tag
        self.content = content

    @classmethod
    def from_tag(cls, tag: Tag) -> Optional['Post']:
        if content := tag.select_one("div.content"):
            return cls(tag, content)
        else:
            logger.error("No content found for post:\n%s", tag)
            return None

    @property
    def id(self) -> Optional[int]:
        if id_ := self.tag.get('id'):
            if id_.startswith("post"):
                return int(id_[len("post"):])
            elif id_.startswith("multipost"):
                return int(id_[len("multipost"):])
            else:
                logger.error("Unknown id format: %s", id_)
        else:
            logger.error("No id for post")
        return None

    @property
    def is_repost(self) -> bool:
        return 'post_repost' in self.tag['class']

    @property
    def is_friend(self) -> bool:
        return 'author-friend' in self.tag['class']

    @property
    def kind(self) -> PostKind:
        for c in self.tag['class']:
            try:
                return PostKind(c)
            except ValueError:
                pass
        return PostKind.UNKNOWN

    @property
    def tags(self) -> List[str]:
        return [c[len("tag-"):] for c in self.content['class'] if c.startswith("tag-")]

    @property
    def time(self) -> Optional[datetime.datetime]:
        if abbr := self.tag.select_one("span.time abbr"):
            # e.g. "Jul 05 2015 00:21:50 UTC"
            return datetime.datetime.strptime(abbr['title'], "%b %d %Y %H:%M:%S %Z")
        return None

    @property
    def source(self) -> Optional[str]:
        if a := self.content.select_one("div.caption a"):
            return a["href"]
        else:
            return None

    @staticmethod
    def strip_res(url):
        """Strip a resolution suffix like "_960" from an asset URL."""
        return re.sub(r"(asset/[0-9a-zA-Z]+/[0-9a-zA-Z]{4}_[0-9a-zA-Z]{4})(?:_[0-9]+)?(?=\.)",
                      r"\1", url)

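    # A hypothetical example of what strip_res normalizes:
    #     ".../asset/1234/ab12_cd34_960.jpeg" -> ".../asset/1234/ab12_cd34.jpeg"
    # so the same asset served at different resolutions deduplicates.
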
    def get_content_urls(self) -> List[str]:
        urls: Dict[str, Any] = {}  # a dict preserves order and deduplicates
        if lightboxes := self.content.select("a.lightbox"):
            urls.update({self.strip_res(a["href"]): None for a in lightboxes})
        elif img_tags := self.content.select("div.imagecontainer img"):
            urls.update({self.strip_res(img['src']): None for img in img_tags})
        elif video_tags := self.content.select("video"):
            for video in video_tags:
                if src := video.get('src'):
                    urls[src] = None
                elif sources := video.select("source"):
                    urls.update({src['src']: None for src in sources})
                else:
                    logger.warning("Video tag without src or source")
        return list(urls.keys())

    def get_file_url(self) -> Optional[str]:
        if a := self.content.select_one("h3 a"):
            return a['href']
        return None

    def __repr__(self) -> str:
        props = ['id', 'is_repost', 'is_friend', 'kind', 'tags', 'time']
        return ("<Post("
                + ", ".join(f"{p}={getattr(self, p)!r}" for p in props)
                + ")>")


# Pre-PEP 673 stand-in for typing.Self, so Page.load and get_next_page
# return the subclass they are called on.
Self = TypeVar('Self', bound='Page')


class Page:
    def __init__(self, url: str, html: str) -> None:
        self.url = url
        self.html = html
        self.bs = BeautifulSoup(html, 'html.parser')

    @classmethod
    async def load(cls: Type[Self], url: str) -> Self:
        return cls(url, await cls.cache(url))

    @classmethod
    @aretry(attempts=60)
    async def cache(cls, url: str) -> str:
        """Fetch a page, reading from and writing to the on-disk cache."""
        cache_path = None
        if m := re.search(r"//([^?]+)", url):
            page_name = m.group(1).replace("/", "_")
            cache_path = cache_dir / f'{page_name}.html'
            if cache_path.exists():
                logger.debug("Found %s in cache", url)
                with cache_path.open(encoding='utf-8') as f:
                    return f.read()

        logger.info("Fetching %s …", url)
        async with get_page_client().get(url) as resp:
            resp.raise_for_status()
            logger.debug("resp: %r", resp)
            html = await resp.text()

        if cache_path:
            logger.debug("Caching %s at %s", url, cache_path)
            cache_path.parent.mkdir(exist_ok=True, parents=True)
            with cache_path.open('w', encoding='utf-8') as f:
                f.write(html)

        return html

    async def get_next_page(self: Self) -> Optional[Self]:
        if next_page_link_tag := self.bs.select_one("a.more.keephash"):
            next_url = urljoin(self.url, next_page_link_tag["href"])
            return await self.__class__.load(next_url)
        else:
            return None

    def __iter__(self) -> Iterator[Post]:
        for post_tag in self.bs.select("#first_batch > div.post"):
            if post := Post.from_tag(post_tag):
                yield post


class Soup:
    """Async iterator over all pages of a soup, starting from start_url."""

    def __init__(self, start_url: str, pool: AioPool) -> None:
        self.start_url = start_url
        self.pool = pool

    async def __aiter__(self) -> AsyncIterator[Page]:
        page: Optional[Page] = await self.pool.exec(Page.load(self.start_url))
        while page:
            yield page
            page = await self.pool.exec(page.get_next_page())


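# Illustrative only (the URL is made up):
#
#     async with AioPool(size=10) as pool:
#         async for page in Soup("https://example.soup.io", pool):
#             for post in page:
#                 ...

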
def set_mtime(post: Post, path: Path) -> None:
    if dtime := post.time:
        stat = os.stat(path)
        os.utime(path, times=(stat.st_atime, dtime.timestamp()))


@aretry()
async def download(post: Post, url: str, target_name: Path) -> None:
    url = urljoin("https://", url)  # ensure a protocol (handles scheme-relative //host/… URLs)
    if m := re.search(r"\.[^.?/]+(?=\?|$)", url):
        ext = m.group(0)
    else:
        logger.warning("Unable to determine extension of %s", url)
        ext = ''
    target_path = target_name.with_suffix(ext)
    if target_path.exists():
        logger.info("Skipping %s: already exists", target_path)
        return

    target_path.parent.mkdir(exist_ok=True, parents=True)
    partial_path = target_path.with_name(target_path.name + ".part")

    try:
        async with get_image_client().get(url) as resp:
            if resp.status == 404:
                logger.warning("404 Not Found: %s", url)
                return
            resp.raise_for_status()  # or if in {429, 500, 502, 503, 504}
            with partial_path.open(mode='wb') as f:
                # stream the body in 4 KiB chunks
                while chunk := await resp.content.read(4 * 2**10):
                    f.write(chunk)
    except MaxRetryError as e:
        logger.warning("Failed to download %s; error: %r", url, e.args[1])
        return
    except asyncio.CancelledError:
        # Swallow the cancellation so the partial file below gets removed;
        # the surrounding pool is shutting down anyway.
        pass
    else:
        partial_path.rename(target_path)
        logger.info("Downloaded %s", target_path)
        set_mtime(post, target_path)
        return

    partial_path.unlink(missing_ok=True)


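# Downloads stream into a "<name>.part" file and are renamed into place only
# after the body was read completely, so an interrupted run never leaves a
# truncated file that a later run would mistake for a finished download.

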
def write_text(post: Post, target_path: Path) -> None:
    if target_path.exists():
        logger.info("Skipping %s: already exists", target_path)
        return

    target_path.parent.mkdir(exist_ok=True, parents=True)
    with target_path.open('w', encoding='utf-8') as f:
        f.write(str(post.content))
    logger.info("Saved %s", target_path)
    set_mtime(post, target_path)


def handle_post(post: Post, target_dir: Path, skip_source: bool) \
        -> Iterator[Tuple[Coroutine, str]]:
    logger.debug("%r", post)
    if post.is_friend:
        logger.info("Skipping ad post %s", post.id)
        return

    tags_str = f'[{" ".join(sorted(post.tags))}]' if post.tags else ""
    basename = f"{post.id}{tags_str}"

    if post.kind in {PostKind.IMAGE, PostKind.VIDEO, PostKind.TUMBLR, PostKind.REVIEW}:
        if (
            not skip_source
            and (source := post.source)
            and re.search(r"\.(png|gif|webm|mp4|jpe?g|mkv)(\?|$)", source)
        ):
            yield (
                download(post, source, target_dir / f"images/{basename}_source"),
                source,
            )

        urls = post.get_content_urls()
        if not urls:
            if not post.source and post.kind is not PostKind.REVIEW:
                logger.error("Could not find content url for %r, falling back to text", post.id)
                write_text(post, target_dir / f"posts/{basename}.html")
        elif len(urls) == 1:
            yield (
                download(post, urls[0], target_dir / f"images/{basename}"),
                urls[0],
            )
        else:
            for i, url in enumerate(urls):
                yield (
                    download(post, url, target_dir / f"images/{basename}_{i}"),
                    url,
                )

    if post.kind in {PostKind.REGULAR, PostKind.LINK, PostKind.QUOTE, PostKind.EVENT,
                     PostKind.REVIEW}:
        write_text(post, target_dir / f"posts/{basename}.html")

    if post.kind is PostKind.FILE:
        if file_url := post.get_file_url():
            # `download` derives the real extension from the URL.
            yield (
                download(post, file_url, target_dir / f"files/{basename}"),
                file_url,
            )
        else:
            logger.warning("Didn't find file in post %r", post)

    if post.kind is PostKind.REACTION:
        write_text(post, target_dir / f"reactions/{basename}.html")

    if post.kind is PostKind.UNKNOWN:
        logger.warning("Unknown post type: %r\nclasses: %r", post, post.tag['class'])


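# handle_post yields (coroutine, url) pairs instead of awaiting the downloads
# itself, so main() can schedule them on the shared AioPool and attach a
# per-URL error callback; text posts are written synchronously on the spot.

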
async def main(params: argparse.Namespace) -> None:

    def on_done(fut: asyncio.Future, url: str):
        # Log download failures as they complete; results are discarded.
        if not fut.cancelled() and (exc := fut.exception()):
            if isinstance(exc, MaxRetryError):
                logger.error("Error while downloading %s: %r", url, exc)
            else:
                logger.exception("Exception while downloading", exc_info=exc)

    async with AioPool(size=params.parallel) as pool:
        i = 0  # `enumerate` doesn't work with asynchronous iterators
        try:
            async for page in Soup(params.url, pool):
                i += 1
                logger.info("### Current page url: %s (%d) ###", page.url, i)
                for post in page:
                    if post.id is not None and post.id < params.until:
                        logger.info("Reached first post before %d: %d", params.until, post.id)
                        break
                    for coro, url in handle_post(post, params.target_dir, params.skip_source):
                        fut = await pool.spawn(coro)
                        fut.add_done_callback(functools.partial(on_done, url=url))
                else:
                    continue
                break  # only reached via the inner break: stop crawling older pages
            else:
                logger.info("Reached the end")

            logger.info("Waiting for %d jobs to complete", len(pool))
            await pool.join()

        except asyncio.CancelledError:
            if 'page' in locals():
                logger.warning("[!] Interrupted on %s (%d); %d jobs active",
                               page.url, i, pool.n_active)
            else:
                logger.warning("[!] Interrupted during first page load")

        except Exception:
            if 'page' in locals():
                logger.exception("[!] Aborted on %s (%d)", page.url, i)
            else:
                logger.exception("[!] Failed getting the first page")

        finally:
            await pool.cancel()
            await get_image_client().close()
            await get_page_client().close()


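# Example invocation (URL, paths, and script name are made up):
#
#     python3 soup_backup.py -p 20 --until 660000000 https://example.soup.io ./backup

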
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--parallel", type=int, default=10, metavar="N",
                        help="Number of parallel downloads.")
    parser.add_argument("--skip-source", action='store_true',
                        help="Don't attempt to download the source URL.")
    parser.add_argument("--until", type=int, default=0,
                        help="Stop crawling once post IDs drop below this value.")
    parser.add_argument("-v", "--verbose", action='store_true',
                        help="Show debug messages.")
    parser.add_argument("-c", "--cache-dir", type=Path, default=None,
                        help="Folder used to cache the downloaded pages"
                             " (for faster re-runs or archival) (default: target_dir/cache).")
    parser.add_argument("url", help="Link to the soup. Can be to the root or a /since/ URL.")
    parser.add_argument("target_dir", nargs='?', type=Path, default=Path(),
                        help="Target dir to store downloads in (default: working directory).")
    params = parser.parse_args()

    cache_dir = params.cache_dir or params.target_dir / "cache"

    handlers: List[Any]
    try:
        from rich.logging import RichHandler
        handlers = [RichHandler()]
    except ImportError:
        handlers = [logging.StreamHandler()]

    filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.log")
    file_handler = logging.FileHandler(filename)
    file_handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    handlers.append(file_handler)

    logging.basicConfig(
        level=logging.DEBUG if params.verbose else logging.INFO,
        format="%(message)s",
        datefmt="[%X]",
        handlers=handlers,
    )

    # asyncio.get_event_loop() without a running loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    if params.verbose:
        loop.set_debug(True)
    main_task = loop.create_task(main(params), name="main")
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        pass
    finally:
        # If the task was still running, cancel it and let it finish cleanup.
        if main_task.cancel():
            loop.run_until_complete(main_task)