|
print('Importing necessary libraries...') |
|
from bs4 import BeautifulSoup |
|
from threading import Thread |
|
from time import sleep |
|
import datetime |
|
import msvcrt |
|
import webbrowser |
|
import json |
|
import urllib.request |
|
import dateutil.parser |
|
from pytz import timezone |
|
import calendar |
|
import pytz |
|
import time |
|
import winsound |
|
import ctypes |
|
import argparse |
|
import codecs |
|
import colorama |
|
from colorama import Fore, Style |
|
import re |
|
import os |
|
from http.client import HTTPResponse |
|
import math |
|
|
|
# Change this to the domain of a MediaWiki site at your choice. |
|
site = 'minecraft.gamepedia.com' |
|
ctypes.windll.kernel32.SetConsoleTitleW('MediaWiki watchdog – {}'.format(site)) |
|
|
|
print('Initializing...') |
|
|
|
colorama.init() |
|
parser = argparse.ArgumentParser(description='Watches a MediaWiki site.') |
|
parser.add_argument('-l','--log', default='', help='Writes the changes into a log file in additional to printing to the console.') |
|
parser.add_argument('-a','--alternativeTitle', action='store_true', help='Makes the title bar static and displays the status at the lowermost console line instead.') |
|
parsed = parser.parse_args() |
|
logfile = parsed.log |
|
statusLength = 0 |
|
currentStatus = '' |
|
|
|
if parsed.alternativeTitle: |
|
def updateStatus(inactiveTime): |
|
global statusLength, currentStatus |
|
currentStatus = '> Active' if inactiveTime == 0 else '> Inactive for {}'.format(formatInactiveTime(inactiveTime)) |
|
print(currentStatus, end='') |
|
statusLength = len(currentStatus) |
|
def clearStatusLine(): |
|
print('\b'*statusLength, end='') |
|
else: |
|
def updateStatus(inactiveTime): |
|
ctypes.windll.kernel32.SetConsoleTitleW(('MediaWiki watchdog – {}'+('' if inactiveTime == 0 else ' ({} inactive)')).format(site,formatInactiveTime(inactiveTime))) |
|
def clearStatusLine(): |
|
pass |
|
|
|
f = '' |
|
if logfile != '': |
|
parsed = datetime.datetime.now() |
|
dic = { |
|
'date': str(parsed.day), |
|
'month': str(parsed.month), |
|
'year': str(parsed.year), |
|
'hour': str(parsed.hour) |
|
} |
|
if parsed.minute < 10: |
|
dic['minute'] = '0' + str(parsed.minute) |
|
else: |
|
dic['minute'] = str(parsed.minute) |
|
|
|
if parsed.second < 10: |
|
dic['second'] = '0' + str(parsed.second) |
|
else: |
|
dic['second'] = str(parsed.second) |
|
|
|
now = '{hour}h{minute}:{second} | {date}/{month}/{year}'.format(**dic) |
|
f = codecs.open(logfile, 'w', 'utf-8') |
|
f.write("""\ |
|
Wikipedia watchdog - Site: {} |
|
Start time: {} |
|
------------------------------------------------------------------------""".format(site, now)) |
|
f.flush() |
|
|
|
latesttime = json.loads(urllib.request.urlopen('http://{}/api.php?action=query&list=recentchanges&rclimit=1&rcprop=timestamp&format=json'.format(site)).read())['query']['recentchanges'][0]['timestamp'] |
|
latestdiff = '' |
|
latestpage = '' |
|
latestuser = '' |
|
isNotPlaying = True |
|
notificationsOn = True |
|
lastActive = datetime.datetime.now() |
|
|
|
def formatDateTime(dateTimeIn): |
|
parsed = dateutil.parser.parse(dateTimeIn) |
|
parsed += datetime.timedelta(hours=7) # Change the hours to your offset from UTC. |
|
dic = { |
|
'date': str(parsed.day), |
|
'month': str(parsed.month), |
|
'year': str(parsed.year), |
|
'hour': str(parsed.hour) |
|
} |
|
if parsed.minute < 10: |
|
dic['minute'] = '0' + str(parsed.minute) |
|
else: |
|
dic['minute'] = str(parsed.minute) |
|
|
|
if parsed.second < 10: |
|
dic['second'] = '0' + str(parsed.second) |
|
else: |
|
dic['second'] = str(parsed.second) |
|
|
|
return '{hour}h{minute}:{second} | {date}/{month}/{year}'.format(**dic) |
|
|
|
def formatExpiry(expiryIn): |
|
if expiryIn == 'infinite': |
|
return 'Never expires.' |
|
else: |
|
return 'expires at {}'.format(formatDateTime(expiryIn)) |
|
|
|
def formatInactiveTime(totalMinutes): |
|
days = totalMinutes // 1440 |
|
hours = totalMinutes // 60 % 24 |
|
minutes = totalMinutes % 60 |
|
|
|
daysString = (str(days) + 'd') if days > 0 else '' |
|
hoursString = (str(hours) + 'h') if hours > 0 else '' |
|
minutesString = (('0' if minutes < 10 else '') + str(minutes)) if hours > 0 else (str(minutes) + "'") |
|
|
|
return '{}{}{}'.format(daysString, hoursString, minutesString) |
|
|
|
def datetime2utc_time(datetime): |
|
utc_datetime = datetime.astimezone(timezone('utc')).replace(tzinfo=None) |
|
utc_timetuple = utc_datetime.timetuple() |
|
utc_time = calendar.timegm(utc_timetuple) + datetime.microsecond / 1E6 |
|
return utc_time |
|
|
|
def processLogEvent(timestamp): |
|
global site |
|
response = urllib.request.urlopen("https://{site}/api.php?action=query&list=logevents&lestart={ts}&leend={ts}&format=json&lelimit=500".format(site=site, ts=timestamp)) |
|
try: |
|
logEvent = json.loads(response)['logevents'][0] |
|
except: |
|
return '[WARN]: Received a HTTPResponse: ' + str(response) |
|
type = logEvent['type'] |
|
comment = logEvent['comment'] |
|
if comment == '': |
|
comment = '(No description provided.)' |
|
if type == 'protect': |
|
action = logEvent['action'] |
|
if action == 'protect': |
|
res = '{} protected {}'.format(logEvent['user'], logEvent['title']) |
|
for entry in logEvent['parameters']['details']: |
|
res += '. {}: {}; {}'.format(entry['type'].capitalize(), entry['level'], formatExpiry(entry['expiry'])) |
|
return res + ' | ' + comment |
|
elif action == 'unprotect': |
|
return '{} unprotected {} | {}'.format(logEvent['user'], logEvent['title'], comment) |
|
elif action == 'modify': |
|
res = '{} modified protection of {}'.format(logEvent['user'], logEvent['title']) |
|
for entry in logEvent['parameters']['details']: |
|
res += '. {}: {}; {}'.format(entry['type'].capitalize(), entry['level'], formatExpiry(entry['expiry'])) |
|
return res + ' | ' + comment |
|
else: |
|
return '(A protection action. Type not supported.)' |
|
else: |
|
return '(An action. Type not supported.)' |
|
|
|
def processChange(entry): |
|
if 'flags' in entry: |
|
if 'bot' in entry['flags']: |
|
return '' |
|
comment = re.sub('\u200e', ' ', BeautifulSoup(re.sub('</span> ', '<span>', entry['parsedcomment']), 'html.parser').text) |
|
if comment == '': |
|
comment = '(No description provided.)' |
|
entry['parsedcomment'] = comment |
|
type = entry['type'] |
|
if type == 'new': |
|
return ('{user} has created {title} (' + Fore.GREEN + Style.BRIGHT + '+{newlen}' + Fore.RESET + Style.NORMAL + ') | {parsedcomment}').format(**entry) |
|
elif type == 'categorize': |
|
return '[' + entry['title'] + "] " + comment |
|
elif type == 'edit': |
|
try: |
|
entry['mi'] = ', minor' if 'minor' in entry else '' |
|
sizechange = entry['newlen'] - entry['oldlen'] |
|
if sizechange > 0: |
|
sizechange = Fore.GREEN + Style.BRIGHT + '+' + str(sizechange) + Fore.RESET + Style.NORMAL |
|
elif sizechange == 0: |
|
sizechange = Fore.YELLOW + Style.BRIGHT + '0' + Fore.RESET + Style.NORMAL |
|
else: |
|
sizechange = Fore.RED + Style.BRIGHT + str(sizechange) + Fore.RESET + Style.NORMAL |
|
entry['sizechange'] = sizechange |
|
if 'mw-undo' in entry['tags']: |
|
entry['tags'].remove('mw-undo') |
|
return '{user} has undone {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry) |
|
elif 'mw-rollback' in entry['tags']: |
|
entry['tags'].remove('mw-rollback') |
|
return '{user} has rolled back {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry) |
|
else: |
|
return '{user} has edited {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry) |
|
except: |
|
entry['mi'] = ', minor' if 'minor' in entry else '' |
|
return '{user} has edited {title}{mi} | {parsedcomment}'.format(**entry) |
|
elif type == 'log': |
|
logtype = entry['logtype'] |
|
if logtype == 'block': |
|
if 'flags' in entry['logparams']: |
|
fl = '' |
|
second = False |
|
for stri in entry['logparams']['flags']: |
|
if second: |
|
fl += ', ' |
|
else: |
|
second = True |
|
fl += stri |
|
entry['fls'] = ' (flags: ' + fl + ')' |
|
else: |
|
entry['fls'] = '' |
|
entry['title'] = entry['title'][5:] |
|
entry['duration'] = entry['logparams']['duration'] |
|
if 'expiry' in entry['logparams']: |
|
entry['expiry'] = ' (expires ' + formatDateTime(entry['logparams']['expiry']) + ')' |
|
return '{user} has blocked {title}{fls} for {duration}{expiry} | {parsedcomment}'.format(**entry) |
|
elif logtype == 'upload': |
|
entry['txt'] = 'a new version of ' if entry['logaction'] == 'overwrite' else '' |
|
return '{user} has uploaded {txt}{title} | {parsedcomment}'.format(**entry) |
|
elif logtype == 'move': |
|
entry['destination'] = entry['logparams']['target_title'] |
|
return '{user} has moved {title} to {destination} | {parsedcomment}'.format(**entry) |
|
elif logtype == 'delete': |
|
return '{user} has deleted {title} | {parsedcomment}'.format(**entry) |
|
else: |
|
return processLogEvent(entry['timestamp']) |
|
#return '(Type of change not supported.)' |
|
else: |
|
return '(Type of change not supported.)' |
|
|
|
def playNotification(): |
|
global isNotPlaying |
|
isNotPlaying = False |
|
winsound.PlaySound('New_change.wav', winsound.SND_FILENAME) |
|
isNotPlaying = True |
|
|
|
def write(text): |
|
global f, logfile |
|
if logfile != '': |
|
f.write(text) |
|
f.flush() |
|
|
|
def feedChangedPages(): |
|
global latesttime |
|
global changes |
|
global isNotPlaying |
|
global notificationsOn |
|
global site |
|
global lastActive |
|
global latestdiff, latestpage, latestuser |
|
while True: |
|
sleep(30) |
|
canPlaySound = True |
|
inactive = True |
|
toWrite = '' |
|
response = urllib.request.urlopen('http://{}/api.php?action=query&list=recentchanges&rcdir=newer&rcstart={}&rcprop=ids|flags|user|parsedcomment|timestamp|title|sizes|tags|loginfo&rclimit=500&format=json'.format(site, datetime2utc_time(dateutil.parser.parse(latesttime))+1)).read() |
|
try: |
|
changes = json.loads(response) |
|
except: |
|
clearStatusLine() |
|
print('[WARN]: Received a HTTPResponse: '+str(response)) |
|
print(currentStatus, end='') |
|
continue |
|
for change in changes['query']['recentchanges']: |
|
latesttime = change['timestamp'] |
|
latestpage = "http://{}/{}".format(site, change['title'].replace(' ', '_')) |
|
latestdiff = "http://{}/index.php?title={}&diff={}&oldid={}".format(site, change['title'].replace(' ', '_'), change['revid'], change['old_revid']) if change['type'] == 'edit' else '' |
|
latestuser = "http://{}/User:{}".format(site, change['user'].replace(' ', '_')) |
|
toPrint = '{}: {}'.format(formatDateTime(change['timestamp']), processChange(change)) |
|
if 'move' in change['tags']: change['tags'].remove('move') |
|
if 'delete' in change['tags']: change['tags'].remove('delete') |
|
if change['tags']: |
|
notFirst = False |
|
toPrint += ' | Tags: ' |
|
for tag in change['tags']: |
|
toPrint += (', ' if notFirst else '') + tag |
|
notFirst = True |
|
if inactive: |
|
clearStatusLine() |
|
print(toPrint) |
|
toWrite += '\n' + toPrint |
|
inactive = False |
|
if canPlaySound and isNotPlaying and notificationsOn: |
|
thrSound = Thread(target=playNotification) |
|
thrSound.start() |
|
canPlaySound = False |
|
if inactive: |
|
clearStatusLine() |
|
updateStatus(math.trunc((datetime.datetime.now() - lastActive).total_seconds()//60)) |
|
else: |
|
lastActive = datetime.datetime.now() |
|
clearStatusLine() |
|
updateStatus(0) |
|
write(toWrite) |
|
|
|
def keyDetector(): |
|
global latestdiff, latestpage, latestuser |
|
global notificationsOn, currentStatus |
|
while True: |
|
c = msvcrt.getch() |
|
if (c == b'p') and (latestpage != ''): |
|
webbrowser.open(latestpage) |
|
elif (c == b'd') and (latestdiff != ''): |
|
webbrowser.open(latestdiff) |
|
elif (c == b'u') and (latestuser != ''): |
|
webbrowser.open(latestuser) |
|
elif c == b'n': |
|
notificationsOn = not notificationsOn |
|
clearStatusLine() |
|
print('[INFO]: Notification sounds now set to', ('ON' if notificationsOn else 'OFF') + '.') |
|
print(currentStatus, end='') |
|
elif c == b'x': |
|
os._exit(0) |
|
|
|
print('Starting feed threads...') |
|
thr1 = Thread(target=feedChangedPages) |
|
thr1.start() |
|
thr2 = Thread(target=keyDetector) |
|
thr2.start() |
|
lastActive = datetime.datetime.now() |
|
print("""\ |
|
Done. The feed messages start from here. Enjoy! |
|
PRO TIP: Press N to toggle notifications on or off. Press X to exit. |
|
PRO TIP: Press D to view the latest change's diff; P to visit the changed page; U to visit changer's page. |
|
---------------------------------------------------------------------------------------------------------- |
|
{}h{}:{} | {}/{}/{}: Started watching.""".format(str(lastActive.hour), ('0' if lastActive.minute < 10 else '') + str(lastActive.minute), ('0' if lastActive.second < 10 else '') + str(lastActive.second), str(lastActive.day), str(lastActive.month), str(lastActive.year))) |
|
updateStatus(0) |