@FichteFoll
Last active July 20, 2020 21:12
Soup Archiver - Small script to archive a page from soup.io
.gitignore

images/
posts/
reactions/
files/
*.html
*.log

soup archiver

Just a quick script to archive an entire soup.io profile page. I tried using a couple existing scripts, but none of them fit my feature criteria, so I built my own.

Features

  • Handles all post kinds (hopefully).

    • Downloads all images and videos in a post.
    • Stores html for text-based post kinds (normal, quote, review, etc.).
    • Downloads file posts.
  • Parallel downloads.

  • Uses the post's ID as the target filename and appends its tags (see the example below this list).

  • Additionally downloads the source URL if it points to an image or video file (naïve check based on the URL extension).

  • Verbose-ish logging that allows finding posts that couldn't be downloaded.

    • Creates a log file automatically.
  • Semi-smart retry algorithm that fails fast for likely unavailable websites (such as those without a DNS entry).

  • Can be re-run on any page and informs you which page it was interrupted on.

    • Caches the fetched soup pages to make re-runs faster (in target_dir/cache by default).
    • You can archive this directory if you want to do other metadata extraction later.
  • Sets the mtime of the created files to the timestamp of the post.
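
  For example (hypothetical post ID and tags): an image post with ID 4659912345 tagged
  "cats" and "gif" is saved as images/4659912345[cats gif].gif (the extension is taken
  from the asset URL), while a quote post with the same ID and tags ends up as
  posts/4659912345[cats gif].html.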

Screenshot

screenshot

Usage

$ poetry install
…
$ poetry run python soup_archiver.py --help
usage: soup_archiver.py [-h] [-p N] [--skip-source] [--until UNTIL] [-v] [-c CACHE_DIR] url [target_dir]

positional arguments:
  url                   Link to the soup. Can be to the root or a /since/ URL.
  target_dir            Target dir to store downloads in (default: working directory).

optional arguments:
  -h, --help            show this help message and exit
  -p N, --parallel N    Number of parallel downloads.
  --skip-source         Don't attempt to download the source URL.
  --until UNTIL         Stop crawling when going below this ID.
  -v, --verbose         Show debug messages.
  -c CACHE_DIR, --cache-dir CACHE_DIR
                        Folder used to cache the downloaded pages (for faster re-runs or archival) (default: target_dir/cache).
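
An example invocation (hypothetical soup URL and target directory), archiving with 20 parallel downloads:

$ poetry run python soup_archiver.py -p 20 https://example.soup.io ./example-soup

To resume an interrupted run, pass the /since/ URL reported for the page it was interrupted on instead of the root URL.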

Requirements

  • Python 3.8
  • Python poetry

Dependencies

  • aiohttp
  • asyncio-pool
  • beautifulsoup4
  • rich (optional)

pyproject.toml

[tool.poetry]
name = "soup_archiver"
version = "0.1.0"
description = ""
authors = ["FichteFoll <fichtefoll2@googlemail.com>"]

[tool.poetry.dependencies]
python = "^3.8"
aiohttp = "^3.6.2"
asyncio-pool = "^0.4.1"
beautifulsoup4 = "^4.9.1"
rich = "^3.3.2"

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry>=0.12"]
build-backend = "poetry.masonry.api"

soup_archiver.py

#!/bin/env python3
import asyncio
import argparse
import datetime
from enum import Enum
import functools
import itertools
import logging
import os
from pathlib import Path
import re
from typing import (Any, AsyncIterator, Coroutine, Dict, Iterator, List, Optional,
                    Tuple, Type, TypeVar)
from urllib.parse import urljoin

import aiohttp
from aiohttp.client_exceptions import ClientConnectorError
from asyncio_pool import AioPool
from bs4 import BeautifulSoup
from bs4.element import Tag

logger = logging.getLogger(__name__)

cache_dir = Path("cache")

@functools.lru_cache
def get_image_client():
    return aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(sock_connect=10, sock_read=30),
    )


@functools.lru_cache
def get_page_client():
    return aiohttp.ClientSession(
        # time out faster but with more overall retries
        timeout=aiohttp.ClientTimeout(sock_connect=5, sock_read=5),
    )

class MaxRetryError(Exception):
    pass


def aretry(attempts=15, min_wait=0.2, max_wait=30, wait_factor=1.5):
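    # Decorator factory: retries the wrapped coroutine on any exception with exponential
    # backoff (min_wait * wait_factor**attempt, capped at max_wait) and lowers the attempt
    # limit to 8 once the error looks like a dead host (no DNS entry, no route), so
    # unreachable sites fail comparatively fast.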
    # reaches 30 after 13 iterations
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal attempts
            for i in itertools.count(1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if i >= attempts:
                        raise MaxRetryError("Maximum number of retries reached", e) from None
                    logger.debug("Retrying after attempt %d failed because of %r", i, e)
                    if (
                        (isinstance(e, ClientConnectorError)
                         and ("Name or service not known" in str(e.args[1])
                              or "No address associated with hostname" in str(e.args[1])
                              or "No route to host" in str(e.args[1])
                              or "getaddrinfo failed" in str(e.args[1])))
                        # or (isinstance(e, SSLError)
                        #     and "SSLCertVerificationError" in str(e.args))
                    ):
                        attempts = 8
                    delay = min(min_wait * (wait_factor ** i), max_wait)
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

class PostKind(str, Enum):
    REGULAR = 'post_regular'
    LINK = 'post_link'
    IMAGE = 'post_image'
    VIDEO = 'post_video'
    QUOTE = 'post_quote'
    EVENT = 'post_event'
    REVIEW = 'post_review'
    FILE = 'post_file'
    REACTION = 'post_reaction'
    TUMBLR = 'tumblr'  # should be covered by the others
    UNKNOWN = 'UNKNOWN'

class Post:
    def __init__(self, tag: Tag, content: Tag) -> None:
        self.tag = tag
        self.content = content

    @classmethod
    def from_tag(cls, tag: Tag) -> Optional['Post']:
        if content := tag.select_one("div.content"):
            return Post(tag, content)
        else:
            logger.error("No content found for post:\n%s", tag)
            return None

    @property
    def id(self) -> Optional[int]:
        if id_ := self.tag.get('id'):
            if id_.startswith("post"):
                return int(id_[4:])
            elif id_.startswith("multipost"):
                return int(id_[9:])
            else:
                logger.error("Unknown id format: %s", id_)
        else:
            logger.error("No id for post")
        return None

    @property
    def is_repost(self) -> bool:
        return 'post_repost' in self.tag['class']

    @property
    def is_friend(self) -> bool:
        return 'author-friend' in self.tag['class']

    @property
    def kind(self) -> PostKind:
        for c in self.tag['class']:
            try:
                return PostKind(c)
            except ValueError:
                pass
        return PostKind.UNKNOWN

    @property
    def tags(self) -> List[str]:
        return [c[4:] for c in self.content['class'] if c.startswith("tag-")]

    @property
    def time(self) -> Optional[datetime.datetime]:
        if abbr := self.tag.select_one("span.time abbr"):
            # Jul 05 2015 00:21:50 UTC
            return datetime.datetime.strptime(abbr['title'], "%b %d %Y %H:%M:%S %Z")
        return None

    @property
    def source(self) -> Optional[str]:
        if a := self.content.select_one("div.caption a"):
            return a["href"]
        else:
            return None

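    # strip_res removes the trailing resolution suffix from asset URLs, presumably so the
    # full-size file is fetched; e.g. a (hypothetical) ".../asset/abc123/f00d_beef_960.jpeg"
    # becomes ".../asset/abc123/f00d_beef.jpeg".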
    @staticmethod
    def strip_res(url):
        return re.sub(r"(asset/[0-9a-zA-Z]+/[0-9a-zA-Z]{4}_[0-9a-zA-Z]{4})(?:_[0-9]+)?(?=\.)",
                      r"\1", url)

    def get_content_urls(self) -> List[str]:
        urls: Dict[str, Any] = {}  # use a dict to preserve order and deduplicate
        if lightboxes := self.content.select("a.lightbox"):
            urls.update({self.strip_res(a["href"]): None for a in lightboxes})
        elif img_tags := self.content.select("div.imagecontainer img"):
            urls.update({self.strip_res(img['src']): None for img in img_tags})
        elif video_tags := self.content.select("video"):
            for video in video_tags:
                if src := video.get('src'):
                    urls[src] = None
                elif sources := video.select("source"):
                    urls.update({src['src']: None for src in sources})
                else:
                    logger.warning("Video tag without src or source")
        return list(urls.keys())

    def get_file_url(self) -> Optional[str]:
        if a := self.content.select_one("h3 a"):
            return a['href']
        return None

    def __repr__(self) -> str:
        props = ['id', 'is_repost', 'is_friend', 'kind', 'tags', 'time']
        return ("<Post("
                + ", ".join(f"{p}={getattr(self, p)!r}" for p in props)
                + ")>")

Self = TypeVar('Self', bound='Page')


class Page:
    def __init__(self, url: str, html: str) -> None:
        self.url = url
        self.html = html
        self.bs = BeautifulSoup(html, 'html.parser')

    @classmethod
    async def load(cls: Type[Self], url: str) -> Self:
        return cls(url, await cls.cache(url))

    @classmethod
    @aretry(attempts=60)
    async def cache(cls, url: str) -> str:
        cache_path = None
        if m := re.search(r"//([^?]+)", url):
            page_name = m.group(1).replace("/", "_")
            cache_path = cache_dir / f'{page_name}.html'
            if cache_path.exists():
                logger.debug("Found %s in cache", url)
                with cache_path.open(encoding='utf-8') as f:
                    return f.read()
        logger.info(f"Fetching {url} …")
        async with get_page_client().get(url) as resp:
            resp.raise_for_status()
            logger.debug("resp: %r", resp)
            html = await resp.text()
        if cache_path:
            logger.debug("Caching %s at %s", url, cache_path)
            cache_path.parent.mkdir(exist_ok=True, parents=True)
            with cache_path.open('w', encoding='utf-8') as f:
                f.write(html)
        return html

    async def get_next_page(self: Self) -> Optional[Self]:
        if next_page_link_tag := self.bs.select_one("a.more.keephash"):
            next_url = urljoin(self.url, next_page_link_tag["href"])
            return await self.__class__.load(next_url)
        else:
            return None

    def __iter__(self) -> Iterator[Post]:
        for post_tag in self.bs.select("#first_batch > div.post"):
            if post := Post.from_tag(post_tag):
                yield post

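# Soup exposes the page chain as an async iterator; each page (including the first) is
# fetched through the shared AioPool, so page requests count against the same concurrency
# limit as the downloads.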
class Soup:
    def __init__(self, start_url: str, pool: AioPool) -> None:
        self.start_url = start_url
        self.pool = pool

    async def __aiter__(self) -> AsyncIterator[Page]:
        page: Optional[Page] = await self.pool.exec(Page.load(self.start_url))
        while page:
            yield page
            page = await self.pool.exec(page.get_next_page())

def set_mtime(post: Post, path: Path) -> None:
    if dtime := post.time:
        stat = os.stat(path)
        os.utime(path, times=(stat.st_atime, dtime.timestamp()))


@aretry()
async def download(post: Post, url: str, target_name: Path) -> None:
    url = urljoin("https://", url)  # ensure a protocol
    if m := re.search(r"\.[^.?/]+(?=\?|$)", url):
        ext = m.group(0)
    else:
        logger.warning("Unable to determine extension of %s", url)
        ext = ''
    target_path = target_name.with_suffix(ext)
    if target_path.exists():
        logger.info("Skipping %s: already exists", target_path)
        return
    target_path.parent.mkdir(exist_ok=True, parents=True)
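    # Stream the response into a ".part" file and only rename it to the final name after
    # a complete read; if the task is cancelled, the partial file is removed below.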
    partial_path = target_path.with_name(target_path.name + ".part")
    try:
        async with get_image_client().get(url) as resp:
            if resp.status == 404:
                logger.warning("404 Not Found: %s", url)
                return
            resp.raise_for_status()  # or if in {429, 500, 502, 503, 504}
            with partial_path.open(mode='wb') as f:
                while chunk := await resp.content.read(4 * 2**10):
                    f.write(chunk)
    except MaxRetryError as e:
        logger.warning("Failed to download %s; error: %r", url, e.args[1])
        return
    except asyncio.CancelledError:
        pass
    else:
        partial_path.rename(target_path)
        logger.info("Downloaded %s", target_path)
        set_mtime(post, target_path)
        return
    partial_path.unlink(missing_ok=True)

def write_text(post: Post, target_path: Path) -> None:
    target_path = Path(target_path)
    if target_path.exists():
        logger.info("Skipping %s: already exists", target_path)
        return
    target_path.parent.mkdir(exist_ok=True, parents=True)
    with target_path.open('w', encoding='utf-8') as f:
        f.write(str(post.content))
    logger.info("Saved %s", target_path)
    set_mtime(post, target_path)

def handle_post(post: Post, target_dir: Path, skip_source: bool) \
        -> Iterator[Tuple[Coroutine, str]]:
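    # Generator: yields (download coroutine, url) pairs for the caller to schedule on the
    # pool; text-only content is written to disk synchronously instead of being yielded.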
    logger.debug("%r", post)
    if post.is_friend:
        logger.info("Skipping ad post %s", post.id)
        return

    tags_str = f'[{" ".join(sorted(post.tags))}]' if post.tags else ""
    basename = f"{post.id}{tags_str}"

    if post.kind in {PostKind.IMAGE, PostKind.VIDEO, PostKind.TUMBLR, PostKind.REVIEW}:
        if (
            not skip_source
            and (source := post.source)
            and re.search(r"\.(png|gif|webm|mp4|jpe?g|mkv)(\?|$)", source)
        ):
            yield (
                download(post, source, target_dir / f"images/{basename}_source"),
                source,
            )
        urls = post.get_content_urls()
        if not urls:
            if not post.source and post.kind is not PostKind.REVIEW:
                logger.error("Could not find content url for %r, falling back to text", post.id)
                write_text(post, target_dir / f"posts/{basename}.html")
        elif len(urls) == 1:
            yield (
                download(post, urls[0], target_dir / f"images/{basename}"),
                urls[0],
            )
        else:
            for i, url in enumerate(urls):
                yield (
                    download(post, url, target_dir / f"images/{basename}_{i}"),
                    url,
                )

    if post.kind in {PostKind.REGULAR, PostKind.LINK, PostKind.QUOTE, PostKind.EVENT,
                     PostKind.REVIEW}:
        write_text(post, target_dir / f"posts/{basename}.html")

    if post.kind in {PostKind.FILE}:
        if file_url := post.get_file_url():
            yield (
                download(post, file_url, target_dir / f"files/{basename}.html"),
                file_url,
            )
        else:
            logger.warning("Didn't find file in post %r", post)

    if post.kind in {PostKind.REACTION}:
        write_text(post, target_dir / f"reactions/{basename}.html")

    if post.kind is PostKind.UNKNOWN:
        logger.warning("Unknown post type: %r\nclasses: %r", post, post.tag['class'])

async def main(params: argparse.Namespace) -> None:
    futures: List[asyncio.Task] = []

    def on_done(fut: asyncio.Future, url: str):
        if not fut.cancelled() and (exc := fut.exception()):
            if isinstance(exc, MaxRetryError):
                logger.error("Error while downloading %s: %r", url, exc)
            else:
                logger.exception("Exception while downloading", exc_info=exc)
                # aiohttp.client_exceptions.ServerTimeoutError
                # aiohttp.client_exceptions.ClientConnectorError
                #   No address associated with hostname
        # else:
        #     fut.result()  # grab result and do nothing with it

    async with AioPool(size=params.parallel) as pool:
        i = 0  # `enumerate` isn't so hot with asynchronous iterators
        try:
            async for page in Soup(params.url, pool):
                i += 1
                logger.info(f"### Current page url: {page.url} ({i}) ###")
                futures = []
                for post in page:
                    if post.id < params.until:
                        logger.info("Reached first post before %d: %d", params.until, post.id)
                        break
                    for coro, url in handle_post(post, params.target_dir, params.skip_source):
                        fut = await pool.spawn(coro)
                        fut.add_done_callback(functools.partial(on_done, url=url))
                else:
                    continue
                break
            else:
                logger.info("Reached the end")
            logger.info("Waiting for %d jobs to complete", len(pool))
            await pool.join()
        except asyncio.CancelledError:
            if 'page' in locals():
                logger.warning(f"[!] Interrupted on {page.url} ({i}); {pool.n_active} jobs active")
            else:
                logger.warning("[!] Interrupted during first page load")
        except Exception:
            if 'page' in locals():
                logger.exception(f"[!] Aborted on {page.url} ({i})")
            else:
                logger.exception("[!] Failed getting the first page")
        finally:
            await pool.cancel()
            await get_image_client().close()
            await get_page_client().close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--parallel", type=int, default=10, metavar="N",
                        help="Number of parallel downloads.")
    parser.add_argument("--skip-source", action='store_true',
                        help="Don't attempt to download the source URL.")
    parser.add_argument("--until", type=int, default=0,  # 0 so the comparison against post.id works when unset
                        help="Stop crawling when going below this ID.")
    parser.add_argument("-v", "--verbose", action='store_true',
                        help="Show debug messages.")
    parser.add_argument("-c", "--cache-dir", type=Path, default=None,
                        help="Folder used to cache the downloaded pages"
                             " (for faster re-runs or archival) (default: target_dir/cache).")
    parser.add_argument("url", help="Link to the soup. Can be to the root or a /since/ URL.")
    parser.add_argument("target_dir", nargs='?', type=Path, default=Path(),
                        help="Target dir to store downloads in (default: working directory).")
    params = parser.parse_args()

    cache_dir = params.cache_dir or params.target_dir / "cache"

    handlers: List[Any]
    try:
        from rich.logging import RichHandler
        handlers = [RichHandler()]
    except ImportError:
        handlers = [logging.StreamHandler()]
    filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.log")
    file_handler = logging.FileHandler(filename)
    file_handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    handlers.append(file_handler)
    logging.basicConfig(
        level=logging.DEBUG if params.verbose else logging.INFO,
        format="%(message)s",
        datefmt="[%X]",
        handlers=handlers,
    )

    loop = asyncio.get_event_loop()
    if params.verbose:
        loop.set_debug(True)
    main_task = loop.create_task(main(params), name="main")
    try:
        loop.run_until_complete(main_task)
    except KeyboardInterrupt:
        pass
    finally:
        if main_task.cancel():
            loop.run_until_complete(main_task)