Created
July 26, 2019 15:39
-
-
Save umrysh/483f37e4bb8d0f573e10879df5da84b3 to your computer and use it in GitHub Desktop.
Mastodon category search
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import urllib.request | |
import sys | |
import os | |
import re | |
import sqlite3 as lite | |
from bs4 import BeautifulSoup, Comment | |
# Page on https://fediverse.network that lists Mastodon instances;
# getData() downloads it and reads the instance table from it.
url = 'https://fediverse.network/mastodon'
# Instance hostnames that getData() leaves out when importing the instance list
dontCare = ['gab.com']
def searchData(con,cur):
    """Interactively search categories by name and list a category's users.

    Prompts for a search term and prints every category whose name contains
    it. Then asks what to do next: ``m`` goes back to the menu, ``s`` starts
    a new search, and any other answer is treated as a category ID whose
    member URLs are printed before going back to the menu.

    Args:
        con: Open sqlite3 connection; only handed on to ``menu`` and to the
            recursive ``searchData`` call.
        cur: Cursor used for the queries. Rows are read by column name, so
            the cursor must yield mapping-style rows (e.g. ``sqlite3.Row``).
    """
    # ask them to enter a term to search
    term = input('\nPlease enter a term you would like to search for: ').strip()
    print('\nID\t| Category Name')
    print('-------------------------------')
    # Bind the term as a parameter instead of splicing it into the SQL text,
    # so quotes in the input cannot break or alter the statement. `%` and `_`
    # typed by the user still act as LIKE wildcards, as they did before.
    cur.execute(
        'SELECT _id,name FROM category WHERE name LIKE ? ORDER BY name',
        ('%' + term + '%',),
    )
    for row in cur.fetchall():
        print('%s\t| %s' % (row['_id'], row['name']))

    answer = input('\nPlease enter either an ID to display the users in that category, s to search again, or m to go back to the menu: ')
    if answer == 'm':
        menu(con,cur)
    elif answer == 's':
        searchData(con,cur)
    else:
        print('\n-------------------------------')
        # One JOIN replaces the per-member lookup. A membership row whose
        # user no longer exists is skipped instead of raising on a missing
        # row. Ordering by rowid keeps the members' insertion order.
        cur.execute(
            'SELECT users.url AS url FROM members '
            'JOIN users ON users._id = members.user_id '
            'WHERE members.cat_id = ? ORDER BY members.rowid',
            (answer,),
        )
        for row in cur.fetchall():
            print('%s' % row['url'])
        print('-------------------------------\n')
        menu(con,cur)
def getData(con,cur):
    """Rebuild the local database from the public instance list.

    Deletes every row from the ``category``, ``instance``, ``users`` and
    ``members`` tables, downloads the instance list from ``url``, stores each
    hostname that is not in ``dontCare``, and then visits every stored
    instance's ``/explore`` page to record its categories and the users
    listed under each category.

    A failure while processing one instance is reported and the loop moves
    on to the next instance. A failure while fetching or parsing the
    instance list itself is not caught here.

    Args:
        con: Open sqlite3 connection, used to commit the changes.
        cur: Cursor on ``con``. Rows are read by column name, so it must
            yield mapping-style rows (e.g. ``sqlite3.Row``).
    """
    # Clear the old data
    # NOTE(review): the old rows are deleted and committed before anything is
    # downloaded, so a failed fetch leaves the database empty.
    cur.execute('DELETE FROM category')
    cur.execute('DELETE FROM instance')
    cur.execute('DELETE FROM users')
    cur.execute('DELETE FROM members')
    con.commit()

    # Get new data
    print('Getting the list of Mastodon instances...')
    soup = _fetch_soup(url)
    table = soup.find('table', {"class": "table-hover"})
    for row in table.findAll('tr'):
        cols = row.findAll('td')
        if len(cols) > 2:
            # Reduce the cell text to letters, digits and dots (computed once).
            # NOTE(review): this also removes '-', which is legal in host
            # names - confirm against the page's actual cell contents.
            host = re.sub(r'[^a-zA-Z0-9.]', '', cols[1].get_text()).strip()
            if host not in dontCare:
                print('Adding `%s`' % host)
                cur.execute('INSERT INTO instance(url) VALUES(?)', (host,))
    con.commit()

    # For each instance let's check if there is any information on their /explore endpoint
    cur.execute('SELECT url from instance')
    for instance_result in cur.fetchall():
        host = instance_result['url']
        try:
            soup = _fetch_soup('https://' + host + '/explore')
            for directory in soup.findAll('div', {"class": "directory__tag"}):
                for link in directory.findAll('a', href=True):
                    parts = link['href'].split('/')
                    if len(parts) < 3:
                        # href is too short to contain a category segment;
                        # skip this link rather than abandoning the instance
                        continue
                    category = parts[2]
                    print('Found category `%s` on `%s`' % (category, host))
                    # Do I already have this category? If not, add it.
                    cat_id = _get_or_create_id(
                        con, cur,
                        'SELECT _id FROM category WHERE name = ?',
                        'INSERT INTO category(name) VALUES (?)',
                        category,
                    )
                    # Let's find the users who belong to this category
                    soup2 = _fetch_soup('https://' + host + '/explore/' + category)
                    for account in soup2.findAll('a', {"class": "account__display-name"}, href=True):
                        # Do I already have this user? If not, add it.
                        user_id = _get_or_create_id(
                            con, cur,
                            'SELECT _id FROM users WHERE url = ?',
                            'INSERT INTO users(url) VALUES (?)',
                            account['href'],
                        )
                        # Add user as member of category
                        cur.execute(
                            'INSERT INTO members(cat_id,user_id) VALUES (?,?)',
                            (cat_id, user_id),
                        )
                        con.commit()
                        print('Adding user `%s` to the `%s` category' % (account['href'], category))
        except Exception as exc:
            # Best effort per instance: report the cause and carry on.
            # Catching Exception (not a bare except) lets Ctrl-C and
            # SystemExit stop the refresh.
            print('Error with `%s`: %s' % (host, exc))


def _fetch_soup(address):
    """Download ``address`` and return its body parsed with BeautifulSoup."""
    # The context manager closes the HTTP response once the body is read
    with urllib.request.urlopen(address) as page:
        content = page.read().decode('utf-8')
    return BeautifulSoup(content, 'html.parser')


def _get_or_create_id(con, cur, select_sql, insert_sql, value):
    """Return the ``_id`` of the row matching ``value``, inserting it if absent.

    ``select_sql`` and ``insert_sql`` must each contain exactly one ``?``
    placeholder; ``value`` is bound to it so scraped text never becomes part
    of the SQL statement.
    """
    cur.execute(select_sql, (value,))
    row = cur.fetchone()
    if row is not None:
        return row['_id']
    cur.execute(insert_sql, (value,))
    con.commit()
    return cur.lastrowid
def menu(con,cur):
    """Show the top-level menu once and dispatch on the user's choice.

    ``1`` asks for confirmation and, on ``y``, refreshes all data (any other
    reply shows the menu again). ``2`` or ``s`` opens the search prompt.
    Every other answer prints a quit message and returns.

    Args:
        con: Open sqlite3 connection, passed on to the chosen action.
        cur: Cursor on ``con``, passed on to the chosen action.
    """
    choice = input('\nWhat would you like to do?\n(1) Refresh all data\n(2) Search current data\n(3) Quit: ')

    # Search
    if choice in ('2', 's'):
        searchData(con,cur)
        return

    # Anything that is neither search nor refresh means quit
    if choice != '1':
        print('Quitting...')
        return

    # Refresh needs an explicit lowercase 'y' to go ahead
    confirmation = input('Are you sure? [y/N] ')
    if confirmation != 'y':
        menu(con,cur)
        return
    getData(con,cur)
# main program:
def main():
    """Open (or create) ``MastoData.sqlite``, ensure the schema, run the menu.

    The four tables are created with ``CREATE TABLE IF NOT EXISTS`` on every
    start, so a database file that exists but lacks some or all tables is
    repaired instead of failing later with "no such table". The connection
    is closed when the menu returns or an exception propagates.
    """
    # Only used to decide whether to announce table creation
    firstTime = not os.path.isfile('MastoData.sqlite')

    con = lite.connect('MastoData.sqlite')
    try:
        # `with con` commits on success / rolls back on error, but it does
        # not close the connection - the `finally` below does that.
        with con:
            # Rows must be addressable by column name (row['_id'], row['url'])
            con.row_factory = lite.Row
            cur = con.cursor()

            if firstTime:
                print ('Creating the tables...')
            # Idempotent: existing tables and their data are left untouched
            cur.execute('CREATE TABLE IF NOT EXISTS category(_id INTEGER PRIMARY KEY AUTOINCREMENT, name text)')
            cur.execute('CREATE TABLE IF NOT EXISTS instance(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS users(_id INTEGER PRIMARY KEY AUTOINCREMENT, url text)')
            cur.execute('CREATE TABLE IF NOT EXISTS members(cat_id INTEGER, user_id INTEGER)')
            con.commit()

            menu(con,cur)
    finally:
        con.close()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment