Skip to content

Instantly share code, notes, and snippets.

@leduyquang753
Last active October 4, 2019 08:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leduyquang753/2ac47cfa95ec339322a070699867c39a to your computer and use it in GitHub Desktop.
Save leduyquang753/2ac47cfa95ec339322a070699867c39a to your computer and use it in GitHub Desktop.
A Python script that tracks changes of a MediaWiki site and prints them to a console.

MediaWiki watchdog

This Gist contains a Python script that connects to a MediaWiki site of your choice and prints its recent changes every 30 seconds. It can also play a notification sound whenever a change happens.

The script requires these external modules:

  • bs4 (BeautifulSoup)
  • colorama
  • dateutil
  • pytz

Notes:

  • This script is made for Windows only and uses Windows-specific modules. If you are using another OS, you will have to port it over.
  • Before running, if you want to use notification sounds, put a WAV file named "New_change.wav" in the same directory as the script. It will be played whenever a change is printed to the console.

Command-line arguments

--log <file>: Prints the log to the file specified. Alias: -l.

--alternativeTitle: Do not show the inactive time in the console title, but display it on the current line of the console instead. Alias: -a.

print('Importing necessary libraries...')
from bs4 import BeautifulSoup
from threading import Thread
from time import sleep
import datetime
import msvcrt
import webbrowser
import json
import urllib.request
import dateutil.parser
from pytz import timezone
import calendar
import pytz
import time
import winsound
import ctypes
import argparse
import codecs
import colorama
from colorama import Fore, Style
import re
import os
from http.client import HTTPResponse
import math
# Change this to the domain of a MediaWiki site at your choice.
site = 'minecraft.gamepedia.com'
# Put the watched site into the console window title (Windows-only API call).
ctypes.windll.kernel32.SetConsoleTitleW('MediaWiki watchdog – {}'.format(site))
print('Initializing...')
# Initialize colorama before any Fore/Style sequences are printed.
colorama.init()
# Command-line options: an optional log file and the alternative status display.
parser = argparse.ArgumentParser(description='Watches a MediaWiki site.')
parser.add_argument('-l','--log', default='', help='Writes the changes into a log file in additional to printing to the console.')
parser.add_argument('-a','--alternativeTitle', action='store_true', help='Makes the title bar static and displays the status at the lowermost console line instead.')
parsed = parser.parse_args()
# Path of the log file; the empty string (the default) means no log file.
logfile = parsed.log
# Length and text of the status most recently printed by the in-console
# status display (both stay at these initial values in title-bar mode).
statusLength = 0
currentStatus = ''
# Two interchangeable implementations of the status display are defined,
# selected once at startup by the --alternativeTitle switch.
if parsed.alternativeTitle:
    def updateStatus(inactiveTime):
        """Print the activity status at the current console position.

        inactiveTime is the number of whole minutes without changes;
        0 means the site is currently active. The printed text and its
        length are remembered so that clearStatusLine() can erase it and
        other code can re-print it.
        """
        global statusLength, currentStatus
        currentStatus = '> Active' if inactiveTime == 0 else '> Inactive for {}'.format(formatInactiveTime(inactiveTime))
        # No trailing newline: the status stays on the current line, so it
        # is flushed explicitly instead of relying on line buffering.
        print(currentStatus, end='', flush=True)
        statusLength = len(currentStatus)

    def clearStatusLine():
        """Erase the status text last printed by updateStatus()."""
        # '\b' only moves the cursor back; the old text must be overwritten
        # with spaces, otherwise a shorter replacement (e.g. "23h59" followed
        # by "1d0'") leaves stale characters behind.
        print('\b' * statusLength + ' ' * statusLength + '\b' * statusLength, end='', flush=True)
else:
    def updateStatus(inactiveTime):
        """Show the activity status in the console window title.

        inactiveTime is the number of whole minutes without changes;
        0 means the site is currently active and no suffix is shown.
        """
        ctypes.windll.kernel32.SetConsoleTitleW(('MediaWiki watchdog – {}'+('' if inactiveTime == 0 else ' ({} inactive)')).format(site,formatInactiveTime(inactiveTime)))

    def clearStatusLine():
        """Do nothing: in title-bar mode no status is printed to the console."""
        pass
# Handle of the log file; stays an empty string when logging is disabled
# (write() checks `logfile`, not `f`, before using it).
f = ''
if logfile != '':
    # A dedicated name is used for the start time so the argparse namespace
    # held in `parsed` is not overwritten.
    startTime = datetime.datetime.now()
    # Same 'HhMM:SS | D/M/YYYY' layout as the console timestamps.
    now = '{0.hour}h{0.minute:02d}:{0.second:02d} | {0.day}/{0.month}/{0.year}'.format(startTime)
    # The file is truncated on every run and kept open for the lifetime of
    # the process; write() flushes after each entry.
    f = codecs.open(logfile, 'w', 'utf-8')
    f.write("""\
MediaWiki watchdog - Site: {}
Start time: {}
------------------------------------------------------------------------""".format(site, now))
    f.flush()
# Timestamp of the newest change that exists at startup; the feed thread
# only asks the API for changes newer than this. The response is closed
# explicitly instead of being left to the garbage collector.
with urllib.request.urlopen('http://{}/api.php?action=query&list=recentchanges&rclimit=1&rcprop=timestamp&format=json'.format(site)) as _startupResponse:
    latesttime = json.loads(_startupResponse.read())['query']['recentchanges'][0]['timestamp']
# URLs for the most recently printed change, opened by the D / P / U keys.
# An empty string means there is nothing to open yet.
latestdiff = ''
latestpage = ''
latestuser = ''
# False while the notification sound is being played by playNotification().
isNotPlaying = True
# Toggled by the N key; when False no notification sound is started.
notificationsOn = True
# Moment of the last observed activity, used to compute the inactive time.
lastActive = datetime.datetime.now()
def formatDateTime(dateTimeIn, utcOffsetHours=7):
    """Format a timestamp string as 'HhMM:SS | D/M/YYYY'.

    dateTimeIn is any date/time string understood by dateutil.parser.parse.
    utcOffsetHours is added to the parsed time before formatting; it
    defaults to 7, the offset that used to be hard-coded here. Change the
    default (or pass another value) to match your offset from UTC.

    Minutes and seconds are zero-padded to two digits; hour, day and month
    are not padded.

    NOTE(review): the offset is applied blindly, so the input is presumably
    expected to be in UTC -- confirm against the API's timestamps.
    """
    shifted = dateutil.parser.parse(dateTimeIn) + datetime.timedelta(hours=utcOffsetHours)
    return '{0.hour}h{0.minute:02d}:{0.second:02d} | {0.day}/{0.month}/{0.year}'.format(shifted)
def formatExpiry(expiryIn):
    """Describe a protection expiry value in words.

    The literal value 'infinite' yields 'Never expires.'; anything else is
    treated as a timestamp and rendered through formatDateTime().
    """
    if expiryIn != 'infinite':
        return 'expires at {}'.format(formatDateTime(expiryIn))
    return 'Never expires.'
def formatInactiveTime(totalMinutes):
    """Render a number of minutes as a compact duration string.

    Days and hours are only shown when non-zero. When an hour component is
    shown, the minutes follow it zero-padded to two digits (e.g. '1h05');
    otherwise the minutes are followed by a prime mark (e.g. "5'").
    """
    wholeHours, minutes = divmod(totalMinutes, 60)
    days, hours = divmod(wholeHours, 24)

    pieces = []
    if days > 0:
        pieces.append(str(days) + 'd')
    if hours > 0:
        pieces.append(str(hours) + 'h')
        # Minutes directly after an hour figure are padded to two digits.
        pieces.append(('0' if minutes < 10 else '') + str(minutes))
    else:
        pieces.append(str(minutes) + "'")
    return ''.join(pieces)
def datetime2utc_time(datetime):
    """Return the POSIX timestamp (seconds since the Unix epoch) of a datetime.

    The parameter is a datetime.datetime instance; its name shadows the
    `datetime` module inside this function and is kept only for interface
    compatibility. Timezone-aware values are converted according to their
    own offset; naive values are interpreted as local time. The result is
    a float that includes the microseconds as a fraction.
    """
    # datetime.timestamp() performs the UTC conversion that used to be
    # hand-rolled here, without needing a third-party timezone object.
    return datetime.timestamp()
def _describeProtection(logEvent, verb, comment):
    """Build the '<user> <verb> <title>. <Type>: <level>; <expiry> | <comment>' line."""
    res = '{} {} {}'.format(logEvent['user'], verb, logEvent['title'])
    # NOTE(review): this script reads the details from 'parameters'; the
    # MediaWiki API appears to use 'params' for log events -- both are
    # accepted here, confirm against the target wiki.
    parameters = logEvent.get('parameters') or logEvent.get('params') or {}
    for entry in parameters.get('details', []):
        res += '. {}: {}; {}'.format(entry['type'].capitalize(), entry['level'], formatExpiry(entry['expiry']))
    return res + ' | ' + comment


def processLogEvent(timestamp):
    """Describe the log event recorded at the given timestamp.

    Queries the site's log events with both the start and the end of the
    range set to `timestamp` and describes the first event returned. Only
    protection events (protect / unprotect / modify) are described in
    detail; every other type yields a generic placeholder.

    Returns a human-readable string. Network failures and unusable
    responses yield a '[WARN]: ...' string instead of raising.
    """
    url = "https://{site}/api.php?action=query&list=logevents&lestart={ts}&leend={ts}&format=json&lelimit=500".format(site=site, ts=timestamp)
    try:
        with urllib.request.urlopen(url) as response:
            body = response.read()
    except OSError as error:  # urllib's URLError/HTTPError derive from OSError.
        return '[WARN]: Could not fetch the log event: ' + str(error)
    try:
        # The body (not the response object) is what must be parsed, and
        # the events are nested under the 'query' key of the answer.
        logEvent = json.loads(body)['query']['logevents'][0]
    except (ValueError, KeyError, IndexError, TypeError):
        return '[WARN]: Received a HTTPResponse: ' + str(body)
    comment = logEvent.get('comment', '')
    if comment == '':
        comment = '(No description provided.)'
    if logEvent.get('type') != 'protect':
        return '(An action. Type not supported.)'
    action = logEvent.get('action')
    if action == 'protect':
        return _describeProtection(logEvent, 'protected', comment)
    if action == 'unprotect':
        return '{} unprotected {} | {}'.format(logEvent['user'], logEvent['title'], comment)
    if action == 'modify':
        return _describeProtection(logEvent, 'modified protection of', comment)
    return '(A protection action. Type not supported.)'
def _formatSizeChange(delta):
    """Return a byte-count delta as a colored, signed string (green / yellow / red)."""
    if delta > 0:
        color, text = Fore.GREEN, '+' + str(delta)
    elif delta == 0:
        color, text = Fore.YELLOW, '0'
    else:
        color, text = Fore.RED, str(delta)
    return color + Style.BRIGHT + text + Fore.RESET + Style.NORMAL


def processChange(entry):
    """Turn one recent-changes entry into a one-line description.

    entry is a dict taken from the recent-changes API answer. It is
    modified in place: helper keys used for formatting are added,
    'parsedcomment' is replaced by its plain-text form, the 'mw-undo' /
    'mw-rollback' tags are removed once they have been expressed in the
    text, and for block entries the namespace prefix is stripped from
    'title'.

    Returns the description, or an empty string when entry['flags']
    exists and contains 'bot'.
    """
    # NOTE(review): the 'minor' marker is looked up directly on the entry
    # further down, while 'bot' is looked up under a 'flags' key -- confirm
    # which shape the API actually returns.
    if 'flags' in entry:
        if 'bot' in entry['flags']:
            return ''
    # Reduce the HTML comment to text; left-to-right marks become spaces.
    comment = re.sub('\u200e', ' ', BeautifulSoup(re.sub('</span> ', '<span>', entry['parsedcomment']), 'html.parser').text)
    if comment == '':
        comment = '(No description provided.)'
    entry['parsedcomment'] = comment
    changeType = entry['type']
    if changeType == 'new':
        return ('{user} has created {title} (' + Fore.GREEN + Style.BRIGHT + '+{newlen}' + Fore.RESET + Style.NORMAL + ') | {parsedcomment}').format(**entry)
    if changeType == 'categorize':
        return '[' + entry['title'] + "] " + comment
    if changeType == 'edit':
        entry['mi'] = ', minor' if 'minor' in entry else ''
        try:
            entry['sizechange'] = _formatSizeChange(entry['newlen'] - entry['oldlen'])
            tags = entry['tags']
        except (KeyError, TypeError):
            # Sizes or tags are unavailable: fall back to a plain line.
            return '{user} has edited {title}{mi} | {parsedcomment}'.format(**entry)
        if 'mw-undo' in tags:
            tags.remove('mw-undo')
            return '{user} has undone {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry)
        if 'mw-rollback' in tags:
            tags.remove('mw-rollback')
            return '{user} has rolled back {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry)
        return '{user} has edited {title} ({sizechange}{mi}) | {parsedcomment}'.format(**entry)
    if changeType != 'log':
        return '(Type of change not supported.)'

    logtype = entry['logtype']
    if logtype == 'block':
        logparams = entry['logparams']
        # Drop the namespace prefix (e.g. 'User:') from the blocked account.
        entry['title'] = entry['title'].split(':', 1)[-1]
        if entry.get('logaction') == 'unblock':
            # Unblock entries carry no duration to report.
            return '{user} has unblocked {title} | {parsedcomment}'.format(**entry)
        flags = logparams.get('flags')
        entry['fls'] = (' (flags: ' + ', '.join(flags) + ')') if flags else ''
        entry['duration'] = logparams.get('duration', 'an unspecified duration')
        # The placeholder must always exist, otherwise blocks without an
        # expiry raise KeyError while formatting.
        entry['expiry'] = (' (expires ' + formatDateTime(logparams['expiry']) + ')') if 'expiry' in logparams else ''
        return '{user} has blocked {title}{fls} for {duration}{expiry} | {parsedcomment}'.format(**entry)
    if logtype == 'upload':
        entry['txt'] = 'a new version of ' if entry['logaction'] == 'overwrite' else ''
        return '{user} has uploaded {txt}{title} | {parsedcomment}'.format(**entry)
    if logtype == 'move':
        entry['destination'] = entry['logparams']['target_title']
        return '{user} has moved {title} to {destination} | {parsedcomment}'.format(**entry)
    if logtype == 'delete':
        return '{user} has deleted {title} | {parsedcomment}'.format(**entry)
    # Any other log type is looked up through the log-events API.
    return processLogEvent(entry['timestamp'])
def playNotification():
    """Play the notification sound 'New_change.wav' and wait for it to finish.

    The global isNotPlaying flag is False for the duration of the call so
    that callers can avoid starting overlapping sounds.
    """
    global isNotPlaying
    isNotPlaying = False
    try:
        winsound.PlaySound('New_change.wav', winsound.SND_FILENAME)
    finally:
        # Reset the flag even when playback fails; otherwise one error
        # would silence every later notification.
        isNotPlaying = True
def write(text):
    """Append text to the log file and flush it; do nothing when logging is off."""
    global f, logfile
    if logfile == '':
        # No log file was requested on the command line.
        return
    f.write(text)
    f.flush()
def feedChangedPages():
    """Poll the site's recent changes every 30 seconds, forever.

    Each round requests the changes newer than the last seen timestamp,
    prints one line per change (also appended to the log file through
    write()), starts the notification sound at most once per round, and
    finally refreshes the activity status. The URLs of the last printed
    change are stored in latestpage / latestdiff / latestuser.

    Network errors and unusable responses are reported as a '[WARN]' line
    and the round is skipped, so the thread keeps running.
    """
    global latesttime
    global changes
    global isNotPlaying
    global notificationsOn
    global site
    global lastActive
    global latestdiff, latestpage, latestuser
    while True:
        sleep(30)
        canPlaySound = True
        inactive = True
        logLines = []
        # Ask only for changes strictly newer than the last one seen.
        url = 'http://{}/api.php?action=query&list=recentchanges&rcdir=newer&rcstart={}&rcprop=ids|flags|user|parsedcomment|timestamp|title|sizes|tags|loginfo&rclimit=500&format=json'.format(site, datetime2utc_time(dateutil.parser.parse(latesttime))+1)
        try:
            with urllib.request.urlopen(url) as connection:
                response = connection.read()
        except OSError as error:  # urllib's URLError/HTTPError derive from OSError.
            clearStatusLine()
            print('[WARN]: Could not reach {}: {}'.format(site, error))
            print(currentStatus, end='')
            continue
        try:
            changes = json.loads(response)
            recentChanges = changes['query']['recentchanges']
        except (ValueError, KeyError, TypeError):
            clearStatusLine()
            print('[WARN]: Received a HTTPResponse: '+str(response))
            print(currentStatus, end='')
            continue
        for change in recentChanges:
            latesttime = change['timestamp']
            # Build the URLs before processChange(), which may rewrite
            # change['title'].
            underscoredTitle = change['title'].replace(' ', '_')
            pageUrl = "http://{}/{}".format(site, underscoredTitle)
            diffUrl = "http://{}/index.php?title={}&diff={}&oldid={}".format(site, underscoredTitle, change['revid'], change['old_revid']) if change['type'] == 'edit' else ''
            userUrl = "http://{}/User:{}".format(site, change['user'].replace(' ', '_'))
            description = processChange(change)
            if description == '':
                # processChange() returns '' for changes that are to be
                # hidden; do not print an empty line for them.
                continue
            latestpage, latestdiff, latestuser = pageUrl, diffUrl, userUrl
            toPrint = '{}: {}'.format(formatDateTime(change['timestamp']), description)
            # These tags only repeat what the description already says.
            for redundantTag in ('move', 'delete'):
                if redundantTag in change['tags']:
                    change['tags'].remove(redundantTag)
            if change['tags']:
                toPrint += ' | Tags: ' + ', '.join(change['tags'])
            if inactive:
                # First line of this round: remove the status text first.
                clearStatusLine()
            print(toPrint)
            logLines.append(toPrint)
            inactive = False
            if canPlaySound and isNotPlaying and notificationsOn:
                # One sound per round, played on its own thread so that
                # printing is not blocked.
                Thread(target=playNotification).start()
                canPlaySound = False
        if inactive:
            clearStatusLine()
            updateStatus(math.trunc((datetime.datetime.now() - lastActive).total_seconds()//60))
        else:
            lastActive = datetime.datetime.now()
            clearStatusLine()
            updateStatus(0)
        write(''.join('\n' + line for line in logLines))
def keyDetector():
    """Read single key presses forever and run the matching command.

    P / D / U open the page, diff or user page of the latest printed
    change in the web browser (ignored while the URL is still empty),
    N toggles the notification sound and X terminates the process.
    Letters are accepted in either case.
    """
    global latestdiff, latestpage, latestuser
    global notificationsOn, currentStatus
    while True:
        c = msvcrt.getch()
        if c in (b'\x00', b'\xe0'):
            # Function and arrow keys arrive as a prefix byte followed by a
            # key code; consume the code so it is not mistaken for a letter.
            msvcrt.getch()
            continue
        c = c.lower()
        if (c == b'p') and (latestpage != ''):
            webbrowser.open(latestpage)
        elif (c == b'd') and (latestdiff != ''):
            webbrowser.open(latestdiff)
        elif (c == b'u') and (latestuser != ''):
            webbrowser.open(latestuser)
        elif c == b'n':
            notificationsOn = not notificationsOn
            clearStatusLine()
            print('[INFO]: Notification sounds now set to', ('ON' if notificationsOn else 'OFF') + '.')
            # Restore the status text that was erased above.
            print(currentStatus, end='')
        elif c == b'x':
            # Hard exit: the worker threads loop forever and never return.
            os._exit(0)
print('Starting feed threads...')
# One thread polls the wiki, the other listens for key presses.
thr1 = Thread(target=feedChangedPages)
thr1.start()
thr2 = Thread(target=keyDetector)
thr2.start()
# Inactivity is measured from the moment watching starts.
lastActive = datetime.datetime.now()
# Banner followed by the start time in the same layout as the feed lines
# (minutes and seconds zero-padded, everything else unpadded).
print("""\
Done. The feed messages start from here. Enjoy!
PRO TIP: Press N to toggle notifications on or off. Press X to exit.
PRO TIP: Press D to view the latest change's diff; P to visit the changed page; U to visit changer's page.
----------------------------------------------------------------------------------------------------------
{0.hour}h{0.minute:02d}:{0.second:02d} | {0.day}/{0.month}/{0.year}: Started watching.""".format(lastActive))
updateStatus(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment