Skip to content

Instantly share code, notes, and snippets.

@umrysh
Created July 26, 2019 15:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save umrysh/483f37e4bb8d0f573e10879df5da84b3 to your computer and use it in GitHub Desktop.
Save umrysh/483f37e4bb8d0f573e10879df5da84b3 to your computer and use it in GitHub Desktop.
mastodon category search
import urllib.request
import sys
import os
import re
import sqlite3 as lite
from bs4 import BeautifulSoup, Comment
# Page on https://fediverse.network that lists the known Mastodon instances
url = "https://fediverse.network/mastodon"
# Instance hostnames that are left out when the instance list is built
dontCare = ["gab.com"]
def searchData(con,cur):
    """Interactively search the stored categories and list one category's users.

    Prompts for a search term and prints every category whose name contains
    it. The next prompt decides what happens: ``s`` starts a new search,
    ``m`` returns to the menu, and any other input is treated as a category
    ID whose member URLs are printed before returning to the menu.

    Args:
        con: Open sqlite3 connection; only passed through to ``menu``.
        cur: Cursor on ``con`` whose rows can be indexed by column name.
    """
    # Loop instead of recursing so repeated searches do not deepen the stack.
    while True:
        # ask them to enter a term to search
        term = input('\nPlease enter a term you would like to search for: ').strip()
        print('\nID\t| Category Name')
        print('-------------------------------')
        # Bind the term as a parameter: typed input must never be spliced
        # into the SQL text (a stray quote would break or alter the query).
        cur.execute('SELECT _id,name FROM category WHERE name LIKE ? ORDER BY name', ('%' + term + '%',))
        for category in cur.fetchall():
            print('%s\t| %s' % (category['_id'],category['name']))
        answer = input('\nPlease enter either an ID to display the users in that category, s to search again, or m to go back to the menu: ')
        if answer != 's':
            break

    if answer == 'm':
        menu(con,cur)
        return

    # Anything else is taken to be a category ID.
    print('\n-------------------------------')
    cur.execute('SELECT user_id FROM members WHERE cat_id = ?', (answer,))
    for member in cur.fetchall():
        cur.execute('SELECT url FROM users WHERE _id = ?', (member['user_id'],))
        user = cur.fetchone()
        # Skip memberships whose user row is missing instead of failing on None.
        if user is not None:
            print('%s' % user['url'])
    print('-------------------------------\n')
    menu(con,cur)
def _fetch_soup(address, timeout=30):
    """Download ``address`` and return its HTML parsed with BeautifulSoup."""
    # Close the response promptly and bound the wait so that a single
    # unresponsive host cannot stall the whole crawl.
    with urllib.request.urlopen(address, timeout=timeout) as page:
        content = page.read().decode('utf-8')
    return BeautifulSoup(content, 'html.parser')


def _get_or_create_id(con, cur, select_sql, insert_sql, value):
    """Return the ``_id`` of the row matching ``value``, inserting it if absent.

    ``select_sql`` and ``insert_sql`` must each contain exactly one ``?``
    placeholder, which is bound to ``value``.
    """
    cur.execute(select_sql, (value,))
    row = cur.fetchone()
    if row is not None:
        return row['_id']
    cur.execute(insert_sql, (value,))
    con.commit()
    return cur.lastrowid


def getData(con,cur):
    """Wipe the database and repopulate it by scraping Mastodon instances.

    Steps:
      1. Delete every row from ``category``, ``instance``, ``users`` and
         ``members``.
      2. Read the instance list from the module-level ``url`` page and store
         each hostname that is not listed in the module-level ``dontCare``.
      3. For every stored instance, read its ``/explore`` page, record each
         category linked there, then read ``/explore/<category>`` and record
         every account link found as a member of that category.

    Any error while processing one instance is reported and the crawl moves
    on to the next instance.

    Args:
        con: Open sqlite3 connection used for commits.
        cur: Cursor on ``con`` whose rows can be indexed by column name.
    """
    # Clear the old data
    cur.execute('DELETE FROM category')
    cur.execute('DELETE FROM instance')
    cur.execute('DELETE FROM users')
    cur.execute('DELETE FROM members')
    con.commit()

    # Get new data
    print('Getting the list of Mastodon instances...')
    soup = _fetch_soup(url)
    table = soup.find('table', {"class": "table-hover"})
    if table is None:
        # The listing page did not contain the expected table; nothing to crawl.
        print('Could not find the instance table at `%s`' % url)
        return
    for row in table.findAll('tr'):
        cols = row.findAll('td')
        if len(cols) <= 2:
            continue
        # Keep only letters, digits and dots from the second cell.
        # NOTE(review): this also drops '-' which is valid in hostnames - confirm intent.
        host = re.sub(r'[^a-zA-Z0-9.]', '', cols[1].get_text()).strip()
        if not host or host in dontCare:
            continue
        print('Adding `%s`' % host)
        # Scraped text is untrusted: always bind it as a parameter.
        cur.execute('INSERT INTO instance(url) VALUES (?)', (host,))
    con.commit()

    # For each instance let's check if there is any information on their /explore endpoint
    cur.execute('SELECT url from instance')
    for instance_result in cur.fetchall():
        host = instance_result['url']
        try:
            explore = _fetch_soup('https://' + host + '/explore')
            for directory in explore.findAll('div', {"class": "directory__tag"}):
                for link in directory.findAll('a', href=True):
                    parts = link['href'].split('/')
                    if len(parts) < 3:
                        # href has no third path segment to use as a category name.
                        continue
                    category = parts[2]
                    print('Found category `%s` on `%s`' % (category,host))
                    # Reuse the category if it is already known, otherwise add it.
                    cat_id = _get_or_create_id(
                        con, cur,
                        'SELECT _id FROM category WHERE name = ?',
                        'INSERT INTO category(name) VALUES (?)',
                        category,
                    )
                    # Let's find the users who belong to this category
                    listing = _fetch_soup('https://' + host + '/explore/' + category)
                    for account in listing.findAll('a', {"class": "account__display-name"}, href=True):
                        user_url = account['href']
                        # Reuse the user if it is already known, otherwise add it.
                        user_id = _get_or_create_id(
                            con, cur,
                            'SELECT _id FROM users WHERE url = ?',
                            'INSERT INTO users(url) VALUES (?)',
                            user_url,
                        )
                        # Add user as member of category
                        cur.execute('INSERT INTO members(cat_id,user_id) VALUES (?,?)', (cat_id,user_id))
                        con.commit()
                        print('Adding user `%s` to the `%s` category' % (user_url,category))
        except Exception as err:
            # Best effort per instance: report and continue. KeyboardInterrupt
            # and SystemExit are deliberately not caught so Ctrl-C still stops
            # the crawl.
            print('Error with `%s`: %s' % (host, err))
def menu(con,cur):
    """Show the top-level prompt and act on the user's choice.

    ``1`` asks for confirmation and, on ``y``, refreshes all data via
    ``getData``; any other confirmation shows the menu again. ``2`` or ``s``
    starts a search via ``searchData``. Every other input prints a quit
    message and returns.

    Args:
        con: Open sqlite3 connection, forwarded to the chosen action.
        cur: Cursor on ``con``, forwarded to the chosen action.
    """
    # Ask user what they want to do
    choice = input('\nWhat would you like to do?\n(1) Refresh all data\n(2) Search current data\n(3) Quit: ')

    if choice in ('2', 's'):
        searchData(con,cur)
        return

    if choice != '1':
        print('Quitting...')
        return

    # Refreshing wipes the stored data, so require an explicit confirmation.
    confirmation = input('Are you sure? [y/N] ')
    if confirmation == 'y':
        getData(con,cur)
    else:
        menu(con,cur)
# main program:
def main():
    """Open (or create) ``MastoData.sqlite``, ensure its tables exist, and show the menu.

    The four tables used by the program are created with
    ``CREATE TABLE IF NOT EXISTS`` on every start, so an existing database
    file that lacks them is repaired instead of failing on the first query.
    The connection is always closed on exit.
    """
    # Set up the database
    firstTime = not os.path.isfile('MastoData.sqlite')
    con = lite.connect('MastoData.sqlite')
    try:
        # ``with con`` only commits or rolls back the transaction; it does not
        # close the connection, hence the surrounding try/finally.
        with con:
            con.row_factory = lite.Row
            cur = con.cursor()
            if firstTime:
                print ('Creating the tables...')
            # Create all the tables (a no-op for tables that already exist)
            cur.execute('CREATE TABLE IF NOT EXISTS category(_id INTEGER PRIMARY KEY AUTOINCREMENT, name text)')
            cur.execute('CREATE TABLE IF NOT EXISTS instance(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS users(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS members(cat_id INTEGER, user_id INTEGER)')
            con.commit()
            menu(con,cur)
    finally:
        con.close()
# Start the interactive tool only when run as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment