-
-
Save it-is-wednesday/2d7cf70fa321b98e32c89be0e9f8446b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
# pylint: disable=line-too-long,invalid-name | |
# ruff: noqa: E501 (line too long) | |
""" | |
Imports all feeds in an OPML file into Evolution! | |
Unfortunately, Evolution has no concept of feed folders (as far as I'm aware),
so this script ignores categories and dumps all feeds into the root one (the | |
only one). | |
Installation: | |
``` | |
wget -O ~/.local/bin/evolution-opml-import 'https://gist.githubusercontent.com/it-is-wednesday/2d7cf70fa321b98e32c89be0e9f8446b/raw/evolution_opml_import.py' | |
chmod +x ~/.local/bin/evolution-opml-import | |
``` | |
Usage: | |
``` | |
evolution-opml-import feeds.opml | |
``` | |
""" | |
import hashlib
import io
import os
import re
import sqlite3
import sys
import xml.etree.ElementTree as ET
from argparse import ArgumentParser
from configparser import ConfigParser
from dataclasses import dataclass
from operator import itemgetter
from pathlib import Path
from typing import Iterable, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urljoin, urlparse
from urllib.request import Request, urlopen
# Schema of the per-feed message table Evolution keeps in folders.db.
# %s is substituted with the feed's id (a SHA-1 hex digest, see add_feed),
# which doubles as the table name.
CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS '%s' (
    uid TEXT PRIMARY KEY,
    flags INTEGER,
    msg_type INTEGER,
    read INTEGER,
    deleted INTEGER,
    replied INTEGER,
    important INTEGER,
    junk INTEGER,
    attachment INTEGER,
    dirty INTEGER,
    size INTEGER,
    dsent NUMERIC,
    dreceived NUMERIC,
    subject TEXT,
    mail_from TEXT,
    mail_to TEXT,
    mail_cc TEXT,
    mlist TEXT,
    followup_flag TEXT,
    followup_completed_on TEXT,
    followup_due_by TEXT,
    part TEXT,
    labels TEXT,
    usertags TEXT,
    cinfo TEXT,
    bdata TEXT,
    userheaders TEXT,
    preview TEXT,
    created TEXT,
    modified TEXT
)
"""
@dataclass
class EvolutionFeed:
    """A single RSS feed as Evolution records it in rss.ini."""
    id: str  # SHA-1 hex digest of xml_url; also the folders.db table name and icon filename
    xml_url: str  # URL of the feed's XML document
    html_url: str  # URL of the feed's human-readable site
    title: str  # display name shown in Evolution
    index: int  # ordinal position in rss.ini (see find_largest_index)
    total_count: int  # number of items seen in the feed at import time
    icon_filename: Optional[Path]  # downloaded favicon path, or None if unavailable
@dataclass
class OpmlFeed:
    """A feed as described by an OPML <outline> element.

    It's a bit more barebones than the Evolution one but that's all we have.
    """
    title: str  # the outline's `title` attribute
    xml_url: str  # the outline's `xmlUrl` attribute
    html_url: str  # the outline's `htmlUrl` attribute
# Evolution keeps its RSS state under the mail directory: one folders.db
# (per-feed tables) plus an rss.ini listing the subscribed feeds.
EVOLUTION_HOME = Path("~/.local/share/evolution/mail/rss").expanduser()
RSS_INI_PATH = EVOLUTION_HOME / "rss.ini"
def main(opml_file_path: Path):
    """Import every feed from the OPML file that Evolution doesn't know yet."""
    with opml_file_path.open(encoding="UTF-8") as opmlfile:
        tree = ET.parse(opmlfile)
    known_urls = find_existing_feeds()
    for feed in feeds_in_opml(tree):
        if feed.xml_url not in known_urls:
            add_feed(feed)
def http_get(url: str) -> bytes:
    """Fetch `url` and return the raw response body.

    Sends a browser-like User-Agent so feed hosts don't reject us as a bot.
    """
    print(f"GET: {url}")
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
    }
    request = Request(url, headers=headers)
    with urlopen(request) as response:
        return response.read()
def add_feed(opml_feed: OpmlFeed) -> None:
    """Fetch feed metadata, add to Evolution's DB and `rss.ini` file.

    Best-effort: if the feed can't be fetched or parsed, it is still
    registered, using the title from the OPML file and a zero item count.
    """
    feed_url = opml_feed.xml_url
    # Evolution identifies a feed by the SHA-1 of its URL; the digest also
    # names the feed's table in folders.db and its favicon file.
    feed_id = hashlib.sha1(feed_url.encode()).hexdigest()
    try:
        feed_info = parse_rss_xml(ET.fromstring(http_get(feed_url)))
        # prefer the title the feed announces for itself; fall back to OPML's
        title = feed_info["title"] or opml_feed.title
        total_count = feed_info["total-count"]
    except (URLError, ET.ParseError):
        # URLError covers HTTPError *and* DNS/connection failures, which the
        # original (HTTPError only) let crash the whole import; malformed
        # feed XML is tolerated the same way.
        title = opml_feed.title
        total_count = 0
    feed = EvolutionFeed(
        id=feed_id,
        xml_url=feed_url,
        html_url=opml_feed.html_url,
        title=title,
        total_count=total_count,
        index=find_largest_index() + 1,
        icon_filename=download_favicon(feed_url, feed_id),
    )
    create_table(feed_id)
    with open(RSS_INI_PATH, "a", encoding="UTF-8") as rss_ini:
        # blank separator line; make_conf's output doesn't include one
        rss_ini.write("\n")
        rss_ini.write(make_conf(feed))
def feeds_in_opml(opml: ET.ElementTree) -> Iterable[OpmlFeed]:
    """Yield every feed described in the OPML tree.

    An <outline> is treated as a feed iff it carries an xmlUrl attribute, at
    any nesting depth — this handles both flat OPMLs (feeds directly under
    <body>) and nested ones (feeds grouped inside category outlines), where
    the original only handled exactly one level of categories.

    Raises ValueError when the document has no <body> element.
    """
    body = opml.find("body")
    # Must compare with None: an Element with no children is falsy, so a
    # truthiness check would wrongly reject a valid-but-empty <body>.
    if body is None:
        raise ValueError("Weird ass opml")
    for outline in body.iter("outline"):
        attrs = outline.attrib
        if "xmlUrl" in attrs:
            yield OpmlFeed(
                title=attrs["title"],
                xml_url=attrs["xmlUrl"],
                html_url=attrs["htmlUrl"],
            )
def create_table(feed_id: str) -> None:
    """Create an empty table (only columns) for this feed at folders.db.

    `feed_id` is a SHA-1 hex digest (see add_feed), so interpolating it
    into the SQL as the table name is safe.
    """
    con = sqlite3.connect(str(EVOLUTION_HOME / "folders.db"))
    try:
        con.execute(CREATE_TABLE % feed_id)
        con.commit()
    finally:
        # the original leaked the connection (never committed or closed)
        con.close()
def make_conf(feed: EvolutionFeed) -> str:
    """Render this feed as an ini section suitable for appending to rss.ini."""
    section = f"feed:{feed.id}"
    conf = ConfigParser(allow_no_value=True)
    conf.add_section(section)
    # dicts preserve insertion order, so the section comes out exactly
    # in the order Evolution expects
    options = {
        "href": feed.xml_url,
        "display-name": feed.title,
        "icon-filename": str(feed.icon_filename or ""),
        "content-type": "0",
        "total-count": str(feed.total_count),
        "unread-count": "0",
        "last-updated": "0",
        "index": str(feed.index),
    }
    for key, value in options.items():
        conf.set(section, key, value)
    # ConfigParser.write terminates the output with two newlines; strip one
    # so appending adds exactly one blank line between sections.
    buffer = io.StringIO()
    conf.write(buffer, space_around_delimiters=False)
    return buffer.getvalue()[:-1]
def download_favicon(feed_url: str, feed_id: str) -> Optional[Path]:
    """Download the site's favicon and return its path on disk, or None.

    Tries <site>/favicon.ico first; on failure, scrapes the homepage for
    <link rel="icon"> tags and fetches the largest advertised icon.
    """
    parsed_url = urlparse(feed_url)
    base_site_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
    try:
        icon_bytes = http_get(f"{base_site_url}/favicon.ico")
    except URLError:  # alright, let's scrape the rel="icon" tag
        try:
            html = http_get(base_site_url).decode()
            icon_url = _scrape_icon_url(html, base_site_url)
            if icon_url is None:
                return None
            icon_bytes = http_get(icon_url)
        except URLError:
            # The original's `except HTTPError` clause was dead code
            # (HTTPError is a URLError subclass, so the first clause already
            # caught it), and errors raised while scraping crashed the
            # script. Catch them here and give up gracefully instead.
            return None
    # Write only after a successful fetch: the original opened the file
    # first, leaving a zero-byte .png behind on every failed attempt.
    path = EVOLUTION_HOME / f"{feed_id}.png"
    with open(path, "wb") as icon_file:
        icon_file.write(icon_bytes)
    return path


def _scrape_icon_url(html: str, base_site_url: str) -> Optional[str]:
    """Pick the URL of the largest icon advertised by <link rel="icon"> tags."""

    def parse_tag(tag: str) -> dict:
        href = m.group(1) if (m := re.search(r'href="(.*?)"', tag)) else None
        size = m.group(1) if (m := re.search(r'sizes="(.*?)"', tag)) else None
        return {"size": int(size.split("x")[0]) if size else 0, "href": href}

    tags = re.findall(r'<link\s+rel="(?:icon|Shortcut Icon)".*?>', html)
    # drop tags without an href -- they'd crash the fetch below
    icons = [icon for icon in map(parse_tag, tags) if icon["href"]]
    if not icons:
        return None
    best_href: str = max(icons, key=itemgetter("size"))["href"]
    # urljoin resolves absolute, relative and protocol-relative hrefs alike;
    # the original's startswith("https") test mangled plain http:// URLs by
    # prepending the base site to an already-absolute URL.
    return urljoin(base_site_url + "/", best_href)
def parse_rss_xml(xml: ET.Element) -> dict:
    """Extract title and item count from a feed document.

    Handles both RSS (<rss><channel>…<item>) and Atom-style
    (<feed><title>…<entry>) layouts. "title" may be None when the feed
    doesn't declare one.
    """
    # NB: compare with None explicitly. ElementTree Elements are falsy when
    # they have no children, so the original's `if elem := ...` discarded a
    # perfectly good <title>text</title> (text but no child elements) and
    # always returned title=None.
    channel = xml.find("channel")
    if channel is not None:
        title_elem = channel.find("title")
        items = channel.findall("item")
    else:
        title_elem = xml.find("title")
        items = xml.findall("entry")
    return {
        "title": title_elem.text if title_elem is not None else None,
        "total-count": len(items),
    }
def find_largest_index() -> int:
    """Return the largest feed index recorded in rss.ini (0 if none).

    Indices only ever grow (see add_feed), so the maximum over all
    `index=` lines equals the newest section's index.
    """
    if not RSS_INI_PATH.exists():
        return 0
    largest = 0
    # Scan every `index=` line instead of byte-seeking to the last line:
    # the original assumed the file's literal last line was an `index=`
    # entry and crashed on trailing newlines or any other final line.
    for line in RSS_INI_PATH.read_text(encoding="UTF-8").splitlines():
        if line.startswith("index="):
            try:
                largest = max(largest, int(line.removeprefix("index=").strip()))
            except ValueError:
                continue  # malformed entry; skip rather than abort the import
    return largest
def find_existing_feeds() -> set[str]:
    "Don't wanna repeat ourselves. Returns a set of XML URLs"
    if not RSS_INI_PATH.exists():
        return set()
    urls: set[str] = set()
    with RSS_INI_PATH.open() as ini_file:
        for line in ini_file:
            if line.startswith("href="):
                urls.add(line.removeprefix("href=").rstrip())
    return urls
if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("opml_file", type=Path)
    parser.add_argument("--delete-all-feeds", action="store_true")
    # Handled before parse_args on purpose: opml_file is a required
    # positional, but it shouldn't be demanded when the user only wants
    # --delete-all-feeds.
    if sys.argv[1:] == ["--delete-all-feeds"]:
        if not RSS_INI_PATH.exists():
            print(f"File doesn't exist: {RSS_INI_PATH}")
            sys.exit(1)
        RSS_INI_PATH.unlink()
        print(f"Deleted {RSS_INI_PATH}")
        sys.exit(0)
    args = parser.parse_args()
    main(args.opml_file)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment