Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
mastodon category search
import urllib.request
import sys
import os
import re
import sqlite3 as lite
from bs4 import BeautifulSoup, Comment
# Page on https://fediverse.network that lists known Mastodon instances;
# getData() downloads it and reads the instance hostnames out of its table.
url = 'https://fediverse.network/mastodon'
# Instance hostnames to leave out when the instance list is stored. Entries
# are compared against the scraped hostname after every character other than
# ASCII letters, digits and dots has been removed from it.
dontCare = ['gab.com']
def searchData(con,cur):
    """Interactively search categories by name and list a category's users.

    Prompts for a search term, prints every category whose name contains it,
    then asks what to do next: ``s`` searches again, ``m`` returns to the
    menu, and anything else is treated as a category ID whose member URLs
    are printed before returning to the menu.

    Args:
        con: Open sqlite3 connection (passed through to ``menu``).
        cur: Cursor on ``con``; rows must be addressable by column name
            (``sqlite3.Row`` row factory).
    """
    # Loop instead of recursing so repeated searches do not grow the stack.
    while True:
        term = input('\nPlease enter a term you would like to search for: ').strip()
        print('\nID\t| Category Name')
        print('-------------------------------')
        # Bind the term as a parameter: user input must never be spliced
        # into the SQL text (injection, and a stray quote breaks the query).
        cur.execute(
            'SELECT _id, name FROM category WHERE name LIKE ? ORDER BY name',
            ('%' + term + '%',),
        )
        for row in cur.fetchall():
            print('%s\t| %s' % (row['_id'], row['name']))
        choice = input('\nPlease enter either an ID to display the users in that category, s to search again, or m to go back to the menu: ').strip()
        if choice != 's':
            break

    if choice != 'm':
        print('\n-------------------------------')
        # A single JOIN replaces one lookup per member and silently skips
        # memberships whose user row is missing instead of crashing.
        # Ordering by members.rowid keeps the original insertion order.
        cur.execute(
            'SELECT users.url AS url FROM members '
            'JOIN users ON users._id = members.user_id '
            'WHERE members.cat_id = ? ORDER BY members.rowid',
            (choice,),
        )
        for row in cur.fetchall():
            print('%s' % row['url'])
        print('-------------------------------\n')

    menu(con,cur)
def _fetch_soup(address, timeout=30):
    """Download *address* and return its body parsed with BeautifulSoup."""
    # The context manager closes the HTTP response; the timeout keeps one
    # unresponsive instance from hanging the whole refresh.
    with urllib.request.urlopen(address, timeout=timeout) as response:
        markup = response.read().decode('utf-8')
    return BeautifulSoup(markup, 'html.parser')


def _get_or_create_id(cur, table, column, value):
    """Return the _id of the row in *table* where *column* == *value*, inserting it if absent."""
    # table/column are only ever the literals passed by _scrape_instance
    # (identifiers cannot be bound as parameters); the value itself is bound.
    cur.execute('SELECT _id FROM %s WHERE %s = ?' % (table, column), (value,))
    row = cur.fetchone()
    if row is not None:
        return row[0]
    cur.execute('INSERT INTO %s(%s) VALUES (?)' % (table, column), (value,))
    return cur.lastrowid


def _scrape_instance(con, cur, host):
    """Store every category on *host*'s /explore page together with its users."""
    explore_url = 'https://' + host + '/explore'
    soup = _fetch_soup(explore_url)
    for directory in soup.find_all('div', {"class": "directory__tag"}):
        for link in directory.find_all('a', href=True):
            # The category name is the third '/'-separated piece of the href.
            parts = link['href'].split('/')
            if len(parts) < 3 or not parts[2]:
                # Unexpected link shape: skip it rather than abort the instance.
                continue
            category = parts[2]
            print('Found category `%s` on `%s`' % (category, host))
            cat_id = _get_or_create_id(cur, 'category', 'name', category)

            # Find the users who belong to this category
            listing = _fetch_soup(explore_url + '/' + category)
            for account in listing.find_all('a', {"class": "account__display-name"}, href=True):
                profile = account['href']
                user_id = _get_or_create_id(cur, 'users', 'url', profile)
                # Do not record the same membership twice.
                cur.execute(
                    'SELECT 1 FROM members WHERE cat_id = ? AND user_id = ?',
                    (cat_id, user_id),
                )
                if cur.fetchone() is None:
                    cur.execute(
                        'INSERT INTO members(cat_id,user_id) VALUES (?,?)',
                        (cat_id, user_id),
                    )
                    print('Adding user `%s` to the `%s` category' % (profile, category))
            # One commit per category keeps partial results if a later page fails.
            con.commit()


def getData(con,cur):
    """Rebuild the database from the public instance list.

    Downloads the instance table from ``url``, replaces the contents of the
    ``category``, ``instance``, ``users`` and ``members`` tables, then visits
    ``https://<instance>/explore`` on every stored instance to collect its
    categories and the users listed under each one. A failure on one
    instance is reported and the refresh continues with the next.

    Args:
        con: Open sqlite3 connection used for commits.
        cur: Cursor on ``con``; rows must be addressable by column name
            (``sqlite3.Row`` row factory).

    Raises:
        Exception: whatever downloading or decoding the instance list raises;
            in that case the existing data has not been touched.
    """
    # Fetch the new instance list BEFORE deleting anything, so a network
    # failure here does not leave the user with an empty database.
    print('Getting the list of Mastodon instances...')
    soup = _fetch_soup(url)
    table = soup.find('table', {"class": "table-hover"})
    if table is None:
        print('Could not find the instance table at `%s`' % url)
        return

    hosts = []
    for table_row in table.find_all('tr'):
        cols = table_row.find_all('td')
        if len(cols) > 2:
            # NOTE(review): this pattern also removes '-', which is legal in
            # hostnames -- confirm against the page markup before widening it.
            host = re.sub(r'[^a-zA-Z0-9.]', '', cols[1].get_text()).strip()
            if host not in dontCare:
                hosts.append(host)

    # Clear the old data and store the new instance list in one transaction
    cur.execute('DELETE FROM category')
    cur.execute('DELETE FROM instance')
    cur.execute('DELETE FROM users')
    cur.execute('DELETE FROM members')
    for host in hosts:
        print('Adding `%s`' % host)
        cur.execute('INSERT INTO instance(url) VALUES (?)', (host,))
    con.commit()

    # For each instance let's check if there is any information on their /explore endpoint
    cur.execute('SELECT url from instance')
    for instance_result in cur.fetchall():
        host = instance_result['url']
        try:
            _scrape_instance(con, cur, host)
        except Exception as exc:
            # Best-effort per instance: report why it failed and move on.
            # Unlike a bare except, Ctrl-C still interrupts the refresh.
            print('Error with `%s`: %s' % (host, exc))
    con.commit()
def menu(con,cur):
    """Show the main menu and act on the user's choice.

    ``1`` refreshes all data after a ``y`` confirmation (any other reply
    shows the menu again), ``2`` or ``s`` starts a search, and every other
    input quits.

    Args:
        con: Open sqlite3 connection, handed to the chosen action.
        cur: Cursor on ``con``, handed to the chosen action.
    """
    prompt = '\nWhat would you like to do?\n(1) Refresh all data\n(2) Search current data\n(3) Quit: '
    while True:
        choice = input(prompt)
        if choice in ('2', 's'):
            searchData(con,cur)
            return
        if choice != '1':
            print('Quitting...')
            return
        confirmed = input('Are you sure? [y/N] ') == 'y'
        if confirmed:
            getData(con,cur)
            return
        # Refresh declined: fall through and show the menu again.
# main program:
def main():
    """Open (creating if needed) ``MastoData.sqlite`` and run the menu.

    The four tables are created with ``CREATE TABLE IF NOT EXISTS`` on every
    start, so an existing but empty or partially initialised database file
    is repaired instead of failing later with "no such table". The
    connection is always closed on exit.
    """
    firstTime = not os.path.isfile('MastoData.sqlite')
    con = lite.connect('MastoData.sqlite')
    try:
        # `with con` only commits/rolls back; closing is done in `finally`.
        with con:
            con.row_factory = lite.Row
            cur = con.cursor()
            if firstTime:
                print ('Creating the tables...')
            # Idempotent schema setup: a no-op when the tables already exist.
            cur.execute('CREATE TABLE IF NOT EXISTS category(_id INTEGER PRIMARY KEY AUTOINCREMENT, name text)')
            cur.execute('CREATE TABLE IF NOT EXISTS instance(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS users(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS members(cat_id INTEGER, user_id INTEGER)')
            con.commit()
            menu(con,cur)
    finally:
        con.close()


# Run the interactive program only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.