Created
December 19, 2022 02:43
-
-
Save jpwarren/9f38132b10f08766bb590d83ef41e69b to your computer and use it in GitHub Desktop.
Python script for managing Mastodon admin blocklists in bulk via the v4.0.2 API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# Export and import Mastodon admin domain blocklists in bulk via the API.

import argparse
import configparser
import csv
import json
import logging
import os.path
from collections import OrderedDict  # NOTE(review): appears unused below — confirm before removing

import requests

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('blocklist_tool')

# Default location of the admin config file (overridable with -c/--config).
CONFIGFILE = "/home/mastodon/etc/admin.conf"
def fetch_blocklist(token, host):
    """Fetch the existing domain block list from the server.

    Follows the RFC 5988 ``Link`` response header to page through all
    results until no 'next' page remains.

    Args:
        token: Bearer token with admin:read scope.
        host: Hostname of the Mastodon instance.

    Returns:
        A list of domain-block dicts as returned by the API.

    Raises:
        ValueError: if the API returns a non-200 response.
    """
    api_path = "/api/v1/admin/domain_blocks"
    url = f"https://{host}{api_path}"

    domain_blocks = []
    while url:
        response = requests.get(url, headers={'Authorization': f"Bearer {token}"})
        if response.status_code != 200:
            # BUGFIX: the original passed %-style args to ValueError, which
            # never formats them; build the message explicitly instead.
            raise ValueError(
                f"Unable to fetch domain block list: {response.status_code}: {response.content}")

        domain_blocks.extend(response.json())

        # Parse the link header to find the next url to fetch
        # This is a weird and janky way of doing pagination but
        # hey nothing we can do about it we just have to deal
        link = response.headers.get('Link')
        if not link:
            # BUGFIX: a single-page result has no Link header at all;
            # the original raised KeyError here.
            break
        pagination = link.split(', ')
        if len(pagination) != 2:
            # Only a 'prev' link remains, so this was the last page.
            break
        # First entry is rel="next"; extract the bare URL from <...>.
        urlstring, _rel = pagination[0].split('; ')
        url = urlstring.strip('<').rstrip('>')

    log.debug(f"Found {len(domain_blocks)} existing domain blocks.")
    return domain_blocks
def export_blocklist(token, host, outfile):
    """Export the current server blocklist to a CSV file.

    Args:
        token: Bearer token with admin:read scope.
        host: Hostname of the Mastodon instance.
        outfile: Path of the CSV file to write.
    """
    blocklist = fetch_blocklist(token, host)
    fieldnames = ['id', 'domain', 'severity', 'reject_media', 'reject_reports',
                  'private_comment', 'public_comment', 'obfuscate']

    # Sort numerically by id so the export order is stable across runs.
    blocklist = sorted(blocklist, key=lambda x: int(x['id']))

    # BUGFIX: csv docs require newline='' when writing, otherwise extra
    # blank rows appear on platforms with \r\n line endings.
    with open(outfile, "w", newline='') as fp:
        # extrasaction='ignore' drops any API fields not in fieldnames.
        writer = csv.DictWriter(fp, fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(blocklist)
def delete_blocklist(token, host, blockfile):
    """Delete every domain block listed in the given CSV file.

    Args:
        token: Bearer token with admin:write scope.
        host: Hostname of the Mastodon instance.
        blockfile: Path to a CSV file containing 'domain' and 'id' columns.
    """
    with open(blockfile) as csvfile:
        for entry in csv.DictReader(csvfile):
            domain = entry['domain']
            id = entry['id']
            log.debug(f"Deleting {domain} (id: {id}) from blocklist...")
            delete_block(token, host, id)
def delete_block(token, host, id):
    """Remove a single domain block by its server-side id.

    A 404 from the server (block already gone) is logged and ignored;
    any other non-200 response raises.

    Args:
        token: Bearer token with admin:write scope.
        host: Hostname of the Mastodon instance.
        id: Server-assigned id of the domain block to remove.

    Raises:
        ValueError: on any non-200, non-404 response.
    """
    log.debug(f"Removing domain block {id} at {host}...")
    api_path = "/api/v1/admin/domain_blocks/"
    url = f"https://{host}{api_path}{id}"

    response = requests.delete(url,
        headers={'Authorization': f"Bearer {token}"}
    )
    if response.status_code != 200:
        if response.status_code == 404:
            # BUGFIX: Logger.warn is a deprecated alias for Logger.warning.
            log.warning(f"No such domain block: {id}")
            return

        raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def update_known_block(token, host, blockdict):
    """Update an existing domain block from the fields in blockdict.

    The 'id' key selects which block to update and is stripped from the
    submitted payload.

    Args:
        token: Bearer token with admin:write scope.
        host: Hostname of the Mastodon instance.
        blockdict: Dict of block fields, including the server-side 'id'.

    Raises:
        ValueError: on any non-200 response.
    """
    api_path = "/api/v1/admin/domain_blocks/"
    url = f"https://{host}{api_path}{blockdict['id']}"

    # Submit everything except the id, which lives in the URL.
    payload = {key: value for key, value in blockdict.items() if key != 'id'}

    response = requests.put(
        url,
        headers={'Authorization': f"Bearer {token}"},
        data=payload,
    )
    if response.status_code != 200:
        raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def import_blocklist(token, host, blockfile):
    """Import a blocklist, merging with the existing one,
    and updating existing entries if they exist.

    Args:
        token: Bearer token with admin scope.
        host: Hostname of the Mastodon instance.
        blockfile: Path to a CSV file of domain blocks to import.
    """
    # Fetch the existing blocklist from the server
    serverblocks = fetch_blocklist(token, host)

    # Convert knownblocks to a dictionary keyed by domain name
    knownblocks = {row['domain']: row for row in serverblocks}

    with open(blockfile) as fp:
        reader = csv.DictReader(fp)
        for row in reader:
            domain = row['domain']
            # BUGFIX: use an explicit lookup instead of try/except KeyError
            # around the whole body — the original would misroute a KeyError
            # raised *inside* the update path into the "create" path.
            blockdict = knownblocks.get(domain)
            if blockdict is not None:
                log.info(f"Block already exists for {domain}, merging data...")
                # Merge info from file row, but not the id field (if it exists):
                # the server-assigned id must win.
                row.pop('id', None)
                blockdict.update(row)
                update_known_block(token, host, blockdict)
            else:
                # Domain doesn't have an entry, so we need to create one.
                # BUGFIX: use .get() for the comment fields too, so a sparse
                # CSV without those columns doesn't crash the import.
                blockdata = {
                    'domain': domain,
                    # Default to Silence if nothing is specified
                    'severity': row.get('severity', 'silence'),
                    'public_comment': row.get('public_comment', ''),
                    'private_comment': row.get('private_comment', ''),
                    'reject_media': row.get('reject_media', False),
                    'reject_reports': row.get('reject_reports', False),
                    'obfuscate': row.get('obfuscate', False),
                }
                log.info(f"Adding new block for {domain}...")
                add_block(token, host, blockdata)
def add_block(token, host, blockdata):
    """Create a new domain block on the Mastodon host.

    Args:
        token: Bearer token with admin:write scope.
        host: Hostname of the Mastodon instance.
        blockdata: Dict of block fields to submit (must include 'domain').

    Raises:
        ValueError: on any non-200 response.
    """
    log.debug(f"Blocking domain {blockdata['domain']} at {host}...")
    url = f"https://{host}/api/v1/admin/domain_blocks"

    auth_header = {'Authorization': f"Bearer {token}"}
    response = requests.post(url, headers=auth_header, data=blockdata)

    if response.status_code == 200:
        return
    raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def augment_args(args):
    """Fill in missing commandline arguments from the config file.

    Command-line values take precedence; only unset token/host fields
    are read from the '[admin]' section of args.config.

    Args:
        args: Parsed argparse namespace with token, host, and config attrs.

    Returns:
        The same namespace, with token and host populated.
    """
    config = configparser.ConfigParser()
    config.read(os.path.expanduser(args.config))

    # Only consult the config file where the CLI left a value unset.
    args.token = args.token or config.get('admin', 'api_token')
    args.host = args.host or config.get('admin', 'api_host')

    return args
if __name__ == '__main__':
    # Build the CLI: default action is export; -i imports, -d deletes.
    parser = argparse.ArgumentParser(
        description="Bulk blocklist tool",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('domainlist', nargs='?', help="Domain block list in CSV format")
    parser.add_argument('-c', '--config', default='~/etc/admin.conf', help="Config file")
    parser.add_argument('-H', '--host', help="Mastodon API host to connect to")
    parser.add_argument('-f', '--blockfile', default='./mastodon-blocklist.csv', help="File to read/write blocklist from/to")
    parser.add_argument('-i', '--importlist', action="store_true", help="Import a blocklist.")
    parser.add_argument('-d', '--delete', action="store_true", help="Delete blocks in blocklist.")
    parser.add_argument('--token', help="Authorization token.")
    parser.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")

    # Merge CLI options with config-file defaults.
    args = augment_args(parser.parse_args())

    if args.loglevel is not None:
        log.setLevel(getattr(logging, args.loglevel.upper()))

    # Dispatch on the requested mode; export is the fallthrough default.
    if args.importlist:
        log.info(f"Importing blocklist from {args.blockfile}...")
        import_blocklist(args.token, args.host, args.blockfile)
    elif args.delete:
        log.info(f"Deleting blocks in blocklist: {args.blockfile}...")
        delete_blocklist(args.token, args.host, args.blockfile)
    else:
        log.info(f"Exporting blocklist to {args.blockfile}...")
        export_blocklist(args.token, args.host, args.blockfile)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment