Scrapes URLs from your Mastodon account and adds them to shiori.
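To run it (assuming the script is saved as url_scraper.py): python3 url_scraper.py user@example.social. Pass -n/--dry-run to scan without saving anything, and --log FILE to write the log to a file instead of stderr.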
#!/usr/bin/env python3
# Copyright (C) 2021 Andrew Ekstedt (magical)
#
# Borrows a small amount of code from mastodon-backup,
# Copyright (C) 2017-2018 Alex Schroeder <alex@gnu.org>
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

import sys
import os.path
import logging
import argparse
import subprocess

# dependencies:
# https://pypi.org/project/Mastodon.py/
# https://pypi.org/project/beautifulsoup4/
# https://github.com/go-shiori/shiori
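# (the two Python packages can be installed with pip, e.g. pip install Mastodon.py beautifulsoup4)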
from mastodon import Mastodon, MastodonError
from bs4 import BeautifulSoup, SoupStrainer
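
# path to the shiori command-line binary; expanded with os.path.expanduser before it is run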
SHIORI = "~/go/bin/shiori"

log = logging.getLogger('url_scraper')


def login(user, scopes = ['read']):
    """
    Login to your Mastodon account
    """
    if '@' not in user:
        raise ValueError(user)
    (username, domain) = user.split("@", 1)
    url = 'https://' + domain
    client_secret = domain + '.client.secret'
    user_secret = domain + '.user.' + username + '.secret'
    mastodon = None
    if not os.path.isfile(client_secret):
        print("Registering app")
        Mastodon.create_app(
            'magical_url_scraper',
            api_base_url = url,
            to_file = client_secret)
    if not os.path.isfile(user_secret):
        print("Log in")
        mastodon = Mastodon(
            client_id = client_secret,
            api_base_url = url)
        url = mastodon.auth_request_url(
            client_id = client_secret,
            scopes=scopes)
        print("Visit the following URL and authorize the app:")
        print(url)
        print("Then paste the access token here:")
        token = sys.stdin.readline().rstrip()
        # on the very first login, pace has no effect
        mastodon.log_in(
            username = username,
            code = token,
            to_file = user_secret,
            scopes=scopes)
    else:
        # in case the user kept running into a General API problem
        mastodon = Mastodon(
            client_id = client_secret,
            access_token = user_secret,
            api_base_url = url,
            ratelimit_method='pace',
            ratelimit_pacefactor=0.9,
            request_timeout=300)
    return mastodon

def scrape(user, dry_run=False):
    """Fetch new statuses for user and save any urls they contain to shiori."""
    try:
        mastodon = login(user)
    except MastodonError as e:
        log.warning("error logging in as %s: %s", user, e)
        return

    # pick up where we left off
    last_id = None
    last_id_file = get_last_id_filename(user)
    try:
        with open(last_id_file, 'r') as f:
            last_id = int(f.read())
    except (OSError, IOError, ValueError):
        pass

    try:
        me = mastodon.me()
        p = mastodon.account_statuses(me, since_id=last_id, limit=100)
    except MastodonError as e:
        log.error("error getting statuses, user = %s", user, exc_info=e)
        return

    try:
        done = set()
        while p:
            for toot in reversed(p):
                if toot.card:
                    log.info("url: %s", toot.card.url)
                    saved = save_url(toot.card.url, dry_run=dry_run)
                    if saved:
                        done.add(toot.card.url)
                for url in extract_urls_from_string(toot.content):
                    if url in done:
                        continue
                    log.info("url: %s", url)
                    saved = save_url(url, dry_run=dry_run)
                    if saved:
                        done.add(url)
                last_id = toot.id
            try:
                p = mastodon.fetch_previous(p)
            except MastodonError as e:
                log.error("error getting statuses, user = %s", user, exc_info=e)
                break
    finally:
        # remember the newest status we processed, even if something went wrong
        if last_id and not dry_run:
            with open(last_id_file, 'w') as f:
                print(last_id, file=f)

def extract_urls_from_string(content):
    """scans an html string for <a href>

    >>> list(extract_urls_from_string('''<div class="status__content__text status__content__text--visible translate"><p>thanks, I hate it</p><p><a href="https://www.theverge.com/2021/6/1/22463321/firefox-design-redesign-bigger-floatier-tabs" rel="noopener noreferrer" target="_blank" class="status-link unhandled-link" title="https://www.theverge.com/2021/6/1/22463321/firefox-design-redesign-bigger-floatier-tabs"><span class="invisible">https://www.</span><span class="ellipsis">theverge.com/2021/6/1/22463321</span><span class="invisible">/firefox-design-redesign-bigger-floatier-tabs</span></a></p></div>'''))
    ['https://www.theverge.com/2021/6/1/22463321/firefox-design-redesign-bigger-floatier-tabs']
    """
    if not content:
        return
    if not isinstance(content, str):
        raise TypeError(content)
    filter = SoupStrainer('a', href=True)
    soup = BeautifulSoup(content, 'html.parser', parse_only=filter)
    for tag in soup.find_all('a'):
        href = tag.get('href')
        if href and href.startswith(("http://", "https://")):
            cls = tag.get('class')
            if cls and 'mention' in cls:
                continue
            if cls and 'hashtag' in cls:
                # TODO: extract tags
                continue
            #log.info('found link: %s', tag)
            yield href


def get_last_id_filename(user):
    if '@' not in user:
        raise ValueError(user)
    (username, domain) = user.split("@", 1)
    filename = domain + '.user.' + username + '.lastid'
    return filename


def save_url(url, dry_run=False):
    """add a url to shiori.

    returns True if successful (or the url has already been added)
    returns False on error and logs a message.
    """
    if dry_run:
        return True
    try:
        output = subprocess.check_output([os.path.expanduser(SHIORI), 'add', '-t', 'from-mastodon', url], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        log.error("shiori failed, url = %s", url, exc_info=e)
        return False
    # if shiori already has a bookmark,
    # it outputs "UNIQUE constraint failed: bookmark.url"
    # on success, it outputs:
    #   97. tilde.town is a computer meant for sharing. (21-31 minutes)
    #       > https://tilde.town
    #       + Wed 02 Dec 2020 07:52:10 PM UTC
    #       # from-mastodon
    #
    if not output:
        log.error("shiori failed with no output, url = %s", url)
        return False
    if b"UNIQUE constraint failed: bookmark.url" in output:
        log.info("url already added, url = %s", url)
        return True
    if output.count(b'\n') != 5:
        log.error("shiori output doesn't look right, url = %s, output = %s", url, output)
        return False
    return True

def main():
    # parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--log', help='path to log file (default: stderr)')
    parser.add_argument('-n', '--dry-run', action='store_true', help="don't actually save the urls")
    parser.add_argument('user', help="user@domain for the mastodon account to scan")
    args = parser.parse_args()

    # sanity check
    if not os.path.exists(os.path.expanduser(SHIORI)):
        log.error("command not found: %s", SHIORI)

    # get data dir (XDG_DATA_HOME, or the ~/.local/share default)
    data_var = 'XDG_DATA_HOME'
    if data_var in os.environ and os.environ[data_var] and os.path.isabs(os.environ[data_var]):
        data_dir = os.environ[data_var]
    else:
        data_dir = os.path.expanduser("~/.local/share")
    # create data dir if necessary
    os.makedirs(data_dir, exist_ok=True)
    data_dir = os.path.join(data_dir, "url_scraper")
    os.makedirs(data_dir, mode=0o700, exist_ok=True)

    # set up logging
    # TODO: log errors to a different file?
    log_file = None
    if args.log:
        if os.path.isabs(args.log):
            log_file = args.log
        else:
            log_file = os.path.join(data_dir, args.log)
    logging.basicConfig(filename=log_file, level=logging.INFO)
    log.setLevel(logging.INFO)

    # cd to data dir
    os.chdir(data_dir)

    # scrape the thing
    try:
        scrape(args.user, dry_run=args.dry_run)
    except Exception as e:
        log.error("unhandled exception", exc_info=e)


if __name__ == '__main__':
    main()