@simonw
Created November 5, 2022 21:03
Stream activity from Mastodon into a SQLite database

Stream Mastodon activity into a SQLite database

This script subscribes to the public streaming API on my Mastodon instance, a long-lived HTTP response that delivers activity as server-sent events, and writes each event into a SQLite table named after its event type.

It needs sqlite-utils and httpx:

pip install sqlite-utils httpx

Then run:

python stream.py stream.db

If you want to point it at a different Mastodon instance you'll need to edit the URL that is hard-coded in the script!
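One way to avoid editing the script each time is to build the streaming URL from a host name. This is only a sketch: `streaming_url` and `DEFAULT_INSTANCE` are hypothetical names that are not in the original script, and it assumes the target instance serves the public stream at the same path without authentication, which may not hold everywhere.

```python
# Host name used by the original script.
DEFAULT_INSTANCE = "fedi.simonwillison.net"


def streaming_url(instance: str = DEFAULT_INSTANCE) -> str:
    """Build the public streaming URL for a Mastodon host (hypothetical helper)."""
    # Accept either "example.social" or "https://example.social/".
    host = instance.removeprefix("https://").rstrip("/")
    return f"https://{host}/api/v1/streaming/public"


# With no argument this reproduces the URL hard-coded in the script.
print(streaming_url())
```

The script could then read the host from an extra command-line argument and pass the result to `client.stream()`.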

Here's the schema after I ran it for a while:

% sqlite-utils schema stream.db
CREATE TABLE [update] (
   [id] TEXT PRIMARY KEY,
   [created_at] TEXT,
   [in_reply_to_id] TEXT,
   [in_reply_to_account_id] TEXT,
   [sensitive] INTEGER,
   [spoiler_text] TEXT,
   [visibility] TEXT,
   [language] TEXT,
   [uri] TEXT,
   [url] TEXT,
   [replies_count] INTEGER,
   [reblogs_count] INTEGER,
   [favourites_count] INTEGER,
   [edited_at] TEXT,
   [content] TEXT,
   [reblog] TEXT,
   [account] TEXT,
   [media_attachments] TEXT,
   [mentions] TEXT,
   [tags] TEXT,
   [emojis] TEXT,
   [card] TEXT,
   [poll] TEXT
);
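Columns such as `account`, `tags` and `mentions` are `TEXT` because sqlite-utils stores nested dictionaries and lists as JSON strings. A minimal sketch of reading them back with the standard library follows; the table here is a cut-down stand-in for the schema above and the row is made-up sample data, not real output.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Cut-down version of the [update] table, for illustration only.
conn.execute("CREATE TABLE [update] ([id] TEXT PRIMARY KEY, [account] TEXT, [tags] TEXT)")
# Made-up sample row: nested values are serialized to JSON text.
conn.execute(
    "INSERT INTO [update] VALUES (?, ?, ?)",
    ("1", json.dumps({"acct": "example"}), json.dumps([{"name": "sqlite"}])),
)


def accounts(conn):
    """Yield (status id, account name) pairs by decoding the JSON column."""
    for status_id, account_json in conn.execute("SELECT id, account FROM [update]"):
        yield status_id, json.loads(account_json)["acct"]


rows = list(accounts(conn))
print(rows)
```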
import httpx
import asyncio
import json
import sqlite_utils
import sys


async def run(db):
    client = httpx.AsyncClient()
    # timeout=None keeps the streaming connection open indefinitely
    async with client.stream(
        "GET", "https://fedi.simonwillison.net/api/v1/streaming/public", timeout=None
    ) as response:
        event_type = None
        async for line in response.aiter_lines():
            line = line.strip()
            if not line:
                continue
            if line.startswith("event:"):
                # Remember the event name; it applies to the data line that follows
                event_type = line.split(":", 1)[1].strip()
                continue
            if line.startswith("data:"):
                data = line.split(":", 1)[1]
                decoded = json.loads(data)
                if not isinstance(decoded, dict):
                    # Payloads that are not JSON objects cannot be stored as rows
                    print("event_type", event_type, "data", data, " (not a dict)")
                    continue
                print(event_type)
                print(json.dumps(decoded, indent=2))
                # One table per event type; alter=True adds columns for new keys
                db[event_type].insert(decoded, pk="id", alter=True)
                continue


if __name__ == "__main__":
    db = sqlite_utils.Database(sys.argv[1])
    db.enable_wal()
    asyncio.run(run(db))
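The parsing logic in the loop can be tried without a network connection by pulling it into a plain generator. This is a sketch, not part of the gist: `parse_events` is a hypothetical name, and the sample lines are invented to resemble the `event:` / `data:` framing the script expects.

```python
import json


def parse_events(lines):
    """Yield (event_type, payload) pairs from an iterable of stream lines."""
    event_type = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        if line.startswith("event:"):
            # The event name applies to the data line that follows it.
            event_type = line.split(":", 1)[1].strip()
        elif line.startswith("data:"):
            payload = json.loads(line.split(":", 1)[1])
            # Like the script, skip payloads that are not JSON objects.
            if isinstance(payload, dict):
                yield event_type, payload


# Invented sample input: one object payload and one bare-ID payload.
sample = [
    ":thump",
    "event: update",
    'data: {"id": "1", "content": "<p>hello</p>"}',
    "",
    "event: delete",
    "data: 109300000000000000",
]
events = list(parse_events(sample))
print(events)
```

Only the first event is yielded, because the second payload decodes to a number rather than a dictionary.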
@hiway

hiway commented Nov 15, 2022

I'm unsure why conflicting IDs are being inserted; it could be edits.

Workaround to keep the script going:

import sqlite3

...

try:
    db[event_type].insert(decoded, pk="id", alter=True)
    continue
except sqlite3.IntegrityError as exc:
    continue

@simonw
Author

simonw commented Nov 16, 2022

You can fix that by adding replace=True to the .insert() call - that will cause it to replace existing records with the same ID.
