Last active
December 3, 2020 22:45
-
-
Save samuelguebo/54652ab5a8e00771b191974cf730c3e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mysql.connector | |
import json | |
import requests | |
import csv | |
import os | |
from datetime import datetime, date, time | |
""" | |
Wiki replicas credentials, update with your own | |
See how to obtain these here: https://wikitech.wikimedia.org/wiki/Help:Toolforge/#Database#Connecting_to_the_database_replicas_from_your_own_computer | |
""" | |
mydb = mysql.connector.connect( | |
host="127.0.0.1", | |
user="<USERNAME>", | |
password="<PASSWORD>", | |
database="frwiki_p", | |
port="4711" | |
) | |
# Global variables | |
summary = {} | |
user_group = 'sysop' | |
csv_file = f'stats-{user_group}.csv' | |
wikis_with_groups = [] | |
def log(text): | |
""" helper to print colored text in console """ | |
print('\033[93m' + text + '\033[0m') | |
def get_dbnames(): | |
""" Leverage Special:SiteMatrix to get a list of all language wikis """ | |
dbnames = [] | |
api_url = "https://meta.wikimedia.org/w/api.php?action=sitematrix&smlangprop=site&smsiteprop=dbname&smtype=language&format=json&origin=*" | |
total = 0 | |
data = requests.get(api_url).text | |
data = json.loads(data)['sitematrix'] | |
del data['count'] | |
# flatten array | |
data = [x for x in list(data.values())] | |
for x in data: | |
for y in x['site']: | |
dbnames.append(y['dbname']) | |
# add suffix to database names | |
dbnames = [x+"_p" for x in dbnames] | |
return dbnames | |
def get_group_stats(group, database): | |
""" Get a list of users with their respective group """ | |
sqlQuery = f""" | |
SELECT user_name, user_editcount | |
FROM {database}.user | |
LEFT JOIN {database}.user_groups ON ug_user = user_id | |
WHERE ug_group = "{group}" and user_editcount > 50 | |
""" | |
mycursor = mydb.cursor() | |
mycursor.execute(sqlQuery) | |
myresult = mycursor.fetchall() | |
# keep track of wikis having functionaries | |
if len(myresult) > 0: | |
wikis_with_groups.append(database) | |
log('{:<20}| {:<20}| {:<20} |'.format(database, len(myresult), group)) | |
if 'total' not in summary: | |
summary['total'] = len(myresult) | |
else: | |
summary['total'] += len(myresult) | |
summary[database] = len(myresult) | |
# export as CSV | |
with open(csv_file, 'a+', newline='') as file: | |
writer = csv.writer(file) | |
writer.writerow([database, group, summary['total']]) | |
def get_date_diff_in_seconds(date2, date1): | |
""" Generate difference in seconds """ | |
timedelta = date2 - date1 | |
return timedelta.days * 24 * 3600 + timedelta.seconds | |
def seconds_to_datetime(seconds): | |
""" Format date in day-hour-minutes from seconds """ | |
minutes, seconds = divmod(seconds, 60) | |
hours, minutes = divmod(minutes, 60) | |
days, hours = divmod(hours, 24) | |
return (days, hours, minutes, seconds) | |
def run(): | |
""" Entry point of the application. Runs all the necassry operations """ | |
db_names = get_dbnames() | |
# delete previously saved CSV file | |
try: | |
os.remove(csv_file) | |
except: | |
pass | |
# start logging operation duration | |
ops_start_time = datetime.now() | |
# inform users that the operation started | |
log(f'Preparing to collect statistics about {user_group} from {len(db_names)} wikis \n{"-"*65}') | |
log('{:<20}| {:<20}| {:<20} |\n'.format('Project', 'Total', 'Group') + f'{"-"*65}') | |
for db in db_names: | |
get_group_stats('sysop', db) | |
print(f'\nOverall, there are {summary["total"]} {user_group}') | |
print(f'{len(wikis_with_groups)} wikis have {user_group} while {len(db_names)-len(wikis_with_groups)} do not.') | |
print(f'The data was successfully exported as CSV.') | |
# operations end time | |
ops_end_time = datetime.now() | |
ops_delay_raw = seconds_to_datetime(get_date_diff_in_seconds(ops_end_time, ops_start_time)) | |
ops_delay = "%d days, %d hours, %d minutes, %d seconds" % ops_delay_raw | |
log("Operations completed in %s ." % ops_delay) | |
# start the process | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment