Skip to content

Instantly share code, notes, and snippets.

@luelista
Last active November 6, 2023 01:15
Show Gist options
  • Save luelista/e6ca68c4cabada29f2b3f56e3cddeedd to your computer and use it in GitHub Desktop.
Save luelista/e6ca68c4cabada29f2b3f56e3cddeedd to your computer and use it in GitHub Desktop.
Scripts to fetch mastodon notification data and put in local SQLite, for further analysis
# run repeatedly until you have collected enough notifications
curl -H "authorization: Bearer $MASTODON_TOKEN" "https://$MASTODON_HOST/api/v1/notifications?limit=30&max_id=$(sqlite3 mastodon.db 'select min(nid) from noti;' || echo 0)" \
| jq '.[] | {nid:.id,type,created_at,user:.account.username,sid:.status.id,content:.status.content}' \
| dumptodb --db mastodon.db --table noti --key nid
#!/usr/bin/env python3
import sys
import sqlite3
import argparse
from json import JSONDecoder, JSONDecodeError
def json_decoder(data):
decoder = JSONDecoder()
pos = 0
while True:
try:
o, pos = decoder.raw_decode(data, pos)
yield o
pos +=1
except JSONDecodeError:
break
class Tablehelper:
def __init__(self, db, tablename, force_cols=[]):
self.con = sqlite3.connect(db)
self.tablename = tablename
self.con.execute("create table if not exists `"+tablename+"` (rowid integer primary key autoincrement, inserted_at text default CURRENT_TIMESTAMP)")
self.ensure_fields_exist(force_cols)
def ensure_fields_exist(self, keys):
existing = set(item[1] for item in self.con.execute('PRAGMA table_info(`'+self.tablename+'`);'))
to_add = set(keys) - existing
for name in to_add:
self.con.execute('alter table `'+self.tablename+"` add column `"+name+"` text")
def insert(self, items):
keys = set(key for row in items for key in row.keys())
self.ensure_fields_exist(keys)
for item in items:
Tablehelper.post_row(self.con, self.tablename, item)
def post_row(conn, tablename, rec):
keys = '`,`'.join(rec.keys())
question_marks = ','.join(list('?'*len(rec)))
values = tuple(rec.values())
print("inserting ",tablename,rec)
conn.execute('INSERT INTO '+tablename+' (`'+keys+'`) VALUES ('+question_marks+')', values)
conn.commit()
def find(self, **args):
where = " and ".join("`"+key+"` = ?" for key in args.keys())
return list(self.con.execute('select * from `'+self.tablename+"` where "+where,list(args.values())))
parser = argparse.ArgumentParser(description='Dumps JSON data into a sqlite database.')
parser.add_argument('--db', help='database file', required=True)
parser.add_argument('--table', help='table name', required=True)
parser.add_argument('--forcecols', help='force columns', default="")
parser.add_argument('--key', help='key to dedup on', default=None)
args = parser.parse_args()
forcecols = args.forcecols.split(",")
if args.key and not args.key in forcecols: forcecols.append(args.key)
db = Tablehelper(args.db, args.table, forcecols)
def do_insert(item):
if not type(item) is dict:
print("Invalid type",type(item),item)
sys.exit(1)
if args.key:
find_by = {}
find_by[args.key] = item[args.key]
if len(db.find(**find_by)) > 0: return
db.insert([ item ])
indata = sys.stdin.read()
for item in json_decoder(indata):
if type(item) is list:
for listitem in item:
do_insert(listitem)
else:
do_insert(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment