Skip to content

Instantly share code, notes, and snippets.

@jpwarren
Created December 19, 2022 02:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jpwarren/9f38132b10f08766bb590d83ef41e69b to your computer and use it in GitHub Desktop.
Save jpwarren/9f38132b10f08766bb590d83ef41e69b to your computer and use it in GitHub Desktop.
Python script for managing Mastodon admin blocklists in bulk via the v4.0.2 API.
#!/usr/bin/python3
# Export and import blocklists via API
import argparse
import configparser
import csv
import requests
import os.path
import json
import csv
from collections import OrderedDict
import logging
# Configure root logging once at import time: timestamp, level name, message,
# with INFO as the default threshold.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)

# Logger shared by every function in this script.
log = logging.getLogger('blocklist_tool')

# NOTE(review): not referenced by the visible code; the --config option
# defaults to '~/etc/admin.conf' instead. Confirm whether this is still needed.
CONFIGFILE = "/home/mastodon/etc/admin.conf"
def fetch_blocklist(token, host):
    """Fetch the existing domain block list from the server.

    Pages through the admin domain-blocks endpoint, following the
    ``rel="next"`` link advertised in each response's ``Link`` header until
    no such link remains.

    Args:
        token: API token, sent as a ``Bearer`` Authorization header.
        host: Hostname of the server to query (without scheme).

    Returns:
        A list containing the JSON-decoded entries of every page fetched.

    Raises:
        ValueError: If any page request returns a status other than 200.
    """
    api_path = "/api/v1/admin/domain_blocks"
    url = f"https://{host}{api_path}"
    headers = {'Authorization': f"Bearer {token}"}

    domain_blocks = []
    while url:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            raise ValueError(f"Unable to fetch domain block list: {response}")
        domain_blocks.extend(json.loads(response.content))

        # requests exposes the parsed Link header as a dict keyed by rel.
        # It is empty when the header is absent, so a missing header or a
        # missing "next" entry simply ends the loop instead of raising.
        url = response.links.get('next', {}).get('url')

    log.debug(f"Found {len(domain_blocks)} existing domain blocks.")
    return domain_blocks
def export_blocklist(token, host, outfile):
    """Export the current server blocklist to a CSV file.

    Fetches the blocklist with ``fetch_blocklist``, sorts it by numeric id and
    writes it with a header row. Keys not listed in ``fieldnames`` are dropped.

    Args:
        token: API token passed through to ``fetch_blocklist``.
        host: Server hostname passed through to ``fetch_blocklist``.
        outfile: Path of the CSV file to write (overwritten if it exists).
    """
    fieldnames = ['id', 'domain', 'severity', 'reject_media', 'reject_reports', 'private_comment', 'public_comment', 'obfuscate']
    blocklist = sorted(fetch_blocklist(token, host), key=lambda block: int(block['id']))

    # The csv module does its own line-ending handling; newline='' stops the
    # text layer from translating it again (which yields blank rows on
    # platforms that map "\n" to "\r\n").
    with open(outfile, "w", newline='') as fp:
        writer = csv.DictWriter(fp, fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(blocklist)
def delete_blocklist(token, host, blockfile):
    """Delete every domain block listed in a CSV file.

    Each row's ``id`` column is passed to ``delete_block``; the ``domain``
    column is only used for the log message.

    Args:
        token: API token passed through to ``delete_block``.
        host: Server hostname passed through to ``delete_block``.
        blockfile: Path of a CSV file with a header row containing at least
            the ``domain`` and ``id`` columns.

    Raises:
        KeyError: If a row lacks the ``domain`` or ``id`` column.
    """
    # newline='' lets the csv module handle line endings itself, as its
    # documentation requires for files handed to a reader.
    with open(blockfile, newline='') as fp:
        for row in csv.DictReader(fp):
            domain = row['domain']
            block_id = row['id']
            log.debug(f"Deleting {domain} (id: {block_id}) from blocklist...")
            delete_block(token, host, block_id)
def delete_block(token, host, id):
    """Remove a single domain block by id.

    Args:
        token: API token, sent as a ``Bearer`` Authorization header.
        host: Hostname of the server to call (without scheme).
        id: Identifier of the domain block, appended to the endpoint path.

    Raises:
        ValueError: If the server answers with a status other than 200 or 404.
    """
    log.debug(f"Removing domain block {id} at {host}...")
    api_path = "/api/v1/admin/domain_blocks/"
    url = f"https://{host}{api_path}{id}"

    response = requests.delete(url, headers={'Authorization': f"Bearer {token}"})
    if response.status_code == 200:
        return

    if response.status_code == 404:
        # An already-missing block is not fatal: warn and carry on.
        # Logger.warning replaces the deprecated Logger.warn alias.
        log.warning(f"No such domain block: {id}")
        return

    raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def update_known_block(token, host, blockdict):
    """Push the contents of ``blockdict`` to an existing domain block.

    The entry's ``id`` selects the block in the URL; every other key of
    ``blockdict`` is sent as request data. ``blockdict`` is not modified.

    Raises:
        KeyError: If ``blockdict`` has no ``id`` key.
        ValueError: If the server answers with a status other than 200.
    """
    block_id = blockdict['id']
    # Everything except the id goes into the request body.
    payload = {key: value for key, value in blockdict.items() if key != 'id'}

    api_path = "/api/v1/admin/domain_blocks/"
    endpoint = f"https://{host}{api_path}{block_id}"
    auth_header = {'Authorization': f"Bearer {token}"}

    response = requests.put(endpoint, headers=auth_header, data=payload)
    if response.status_code == 200:
        return
    raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def import_blocklist(token, host, blockfile):
    """Import a blocklist from a CSV file, merging with the server's list.

    Rows whose domain already has a block on the server are merged into that
    entry and sent via ``update_known_block``; all other rows are created via
    ``add_block``.

    Args:
        token: API token passed through to the API helper functions.
        host: Server hostname passed through to the API helper functions.
        blockfile: Path of a CSV file with a header row; a ``domain`` column
            is required, the remaining columns are optional.

    Raises:
        KeyError: If a row has no ``domain`` column.
    """
    # Index the server's current blocks by domain name for quick lookup.
    serverblocks = fetch_blocklist(token, host)
    knownblocks = {block['domain']: block for block in serverblocks}

    # newline='' lets the csv module handle line endings itself.
    with open(blockfile, newline='') as fp:
        for row in csv.DictReader(fp):
            domain = row['domain']

            # Look the domain up explicitly rather than wrapping the whole
            # update path in try/except KeyError: a KeyError raised while
            # merging or updating must not be mistaken for "unknown domain".
            blockdict = knownblocks.get(domain)
            if blockdict is not None:
                log.info(f"Block already exists for {domain}, merging data...")
                # Merge info from the file row, but keep the server's id.
                row.pop('id', None)
                blockdict.update(row)
                update_known_block(token, host, blockdict)
                continue

            # No existing entry: create one. Missing or empty cells fall back
            # to defaults instead of raising or sending empty values.
            blockdata = {
                'domain': domain,
                # Default to Silence if nothing is specified
                'severity': row.get('severity') or 'silence',
                'public_comment': row.get('public_comment') or '',
                'private_comment': row.get('private_comment') or '',
                'reject_media': row.get('reject_media') or False,
                'reject_reports': row.get('reject_reports') or False,
                'obfuscate': row.get('obfuscate') or False,
            }
            log.info(f"Adding new block for {blockdata['domain']}...")
            add_block(token, host, blockdata)
def add_block(token, host, blockdata):
    """Create a domain block on the given host.

    ``blockdata`` is sent as the request data of a POST to the admin
    domain-blocks endpoint; its ``domain`` entry is also used for logging.

    Raises:
        ValueError: If the server answers with a status other than 200.
    """
    log.debug(f"Blocking domain {blockdata['domain']} at {host}...")

    api_path = "/api/v1/admin/domain_blocks"
    endpoint = f"https://{host}{api_path}"
    auth_header = {'Authorization': f"Bearer {token}"}

    response = requests.post(endpoint, headers=auth_header, data=blockdata)
    if response.status_code == 200:
        return
    raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def augment_args(args):
    """Fill in missing command-line values from the config file.

    Reads the file named by ``args.config`` (with ``~`` expanded) and, for
    each of ``args.token`` and ``args.host`` that is empty, substitutes the
    ``api_token`` / ``api_host`` option of the ``[admin]`` section. Values
    already supplied on the command line are left untouched.

    Returns:
        The same ``args`` object, updated in place.
    """
    config = configparser.ConfigParser()
    config.read(os.path.expanduser(args.config))

    # (attribute on args, option name in the [admin] section)
    fallbacks = (
        ('token', 'api_token'),
        ('host', 'api_host'),
    )
    for attribute, option in fallbacks:
        if not getattr(args, attribute):
            setattr(args, attribute, config.get('admin', option))

    return args
if __name__ == '__main__':
    # Command-line entry point: parse options, fill in token/host from the
    # config file, then import, delete or (by default) export the blocklist.
    parser = argparse.ArgumentParser(
        description="Bulk blocklist tool",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # NOTE(review): 'domainlist' is accepted but never read below; every
    # action uses --blockfile. Confirm whether it should be wired up.
    parser.add_argument('domainlist', nargs='?', help="Domain block list in CSV format")
    parser.add_argument('-c', '--config', default='~/etc/admin.conf', help="Config file")
    parser.add_argument('-H', '--host', help="Mastodon API host to connect to")
    parser.add_argument('-f', '--blockfile', default='./mastodon-blocklist.csv', help="File to read/write blocklist from/to")
    parser.add_argument('-i', '--importlist', action="store_true", help="Import a blocklist.")
    parser.add_argument('-d', '--delete', action="store_true", help="Delete blocks in blocklist.")
    parser.add_argument('--token', help="Authorization token.")
    parser.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")

    args = augment_args(parser.parse_args())

    # Only override the logger's level when one was requested explicitly.
    if args.loglevel is not None:
        log.setLevel(getattr(logging, args.loglevel.upper()))

    # Import takes precedence over delete; export is the default action.
    if args.importlist:
        log.info(f"Importing blocklist from {args.blockfile}...")
        import_blocklist(args.token, args.host, args.blockfile)
    elif args.delete:
        log.info(f"Deleting blocks in blocklist: {args.blockfile}...")
        delete_blocklist(args.token, args.host, args.blockfile)
    else:
        log.info(f"Exporting blocklist to {args.blockfile}...")
        export_blocklist(args.token, args.host, args.blockfile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment