Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Commons Signbot code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# DUAL LICENSED: You are free to choose either or both of below licenses:
#
# 1.
#
# Published by zhuyifei1999 (https://wikitech.wikimedia.org/wiki/User:Zhuyifei1999)
# under the terms of Creative Commons Attribution-ShareAlike 3.0 Unported (CC BY-SA 3.0)
# https://creativecommons.org/licenses/by-sa/3.0/
#
# 2.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
#
from __future__ import unicode_literals
import hashlib
import random
import re
import sys
import threading
import time
import traceback
import pywikibot
from pywikibot.diff import PatchManager
from redis import Redis
from redisconfig import KEYSIGN
# from media-dubiety
from threads import SSEClient, ThreadPool
# Shared pywikibot site object for the 'SignBot' account; the login happens
# at import time, before any handler runs.
SITE = pywikibot.Site(user='SignBot')
SITE.login()
# Redis connection used by checknotify() for its per-user counters.
REDIS = Redis(host='tools-redis')
# Number of signing edits left before handler() stops all threads.
# A falsy value (None) means no limit is enforced.
limit = None
# Worker objects (thread pool and SSE client); populated by main().
threads = []
# Anonymous singleton holding lazily (re)loaded caches:
#   useroptin / useroptout -- page lists filled in by is_optout()
#   excluderegex           -- compiled patterns filled in by
#                             match_exclude_regex()
# str('State') keeps the class name a native str under unicode_literals.
state = type(str('State'), (), {
'useroptin': None,
'useroptout': None,
'excluderegex': None,
})()
# https://gerrit.wikimedia.org/r/#/c/pywikibot/core/+/525179/
def monkey_patch():
    """Replace ``lock_page`` on the class of ``SITE`` with the version below.

    The replacement comes from the Gerrit change referenced above this
    function.
    """
    from pywikibot.site import PageInUse

    def lock_page(self, page, block=True):
        """
        Lock page for writing. Must be called before writing any page.

        Different threads must not write to the same page at the same
        time, even when they target different sections.

        @param page: the page to be locked
        @type page: pywikibot.Page
        @param block: if true, wait until the page is available to be locked;
            otherwise, raise an exception if page can't be locked
        """
        locked_title = page.title(with_section=False)

        self._pagemutex.acquire()
        try:
            while locked_title in self._locked_pages:
                if block:
                    # Drop the mutex while sleeping so that whoever holds
                    # the page lock is able to unlock it.
                    self._pagemutex.release()
                    time.sleep(.25)
                    self._pagemutex.acquire()
                else:
                    raise PageInUse(locked_title)
            self._locked_pages.append(locked_title)
        finally:
            # time.sleep may raise an exception from a signal handler (eg:
            # KeyboardInterrupt) while the mutex is released, and there is
            # no reason to re-acquire it because the caller will receive
            # the exception. The mutex state is therefore undefined here,
            # so a failing release is tolerated.
            try:
                self._pagemutex.release()
            except RuntimeError:
                pass

    SITE.__class__.lock_page = lock_page


monkey_patch()
def chance(c):
    """Return True when a fresh ``random.random()`` draw is below ``c``."""
    draw = random.random()
    return draw < c
def get_tags(event):
    """Return the change tags of the revision described by ``event``.

    Queries the API for the ``tags`` property of the single revision
    ``event['revision']['new']`` of the page ``event['title']``.

    @param event: recent-change event; must provide ``title`` and
        ``revision.new``
    @return: the list found under ``tags`` of the first revision, or an
        empty list when the request fails or the response does not have
        the expected structure
    """
    revid = event['revision']['new']
    request = SITE._simple_request(
        action='query',
        prop='revisions',
        titles=event['title'],
        rvprop='tags',
        rvstartid=revid,
        rvendid=revid,
        rvlimit=1
    )
    try:
        response = request.submit()
    except Exception as e:
        # Best effort: log the failure and behave as if there were no tags.
        pywikibot.exception(e)
        return []

    try:
        pages = response['query']['pages']
        # Only one title was requested, so take the first (only) entry.
        # next(iter(...)) works for dict views on Python 3 as well,
        # unlike ``pages.keys()[0]``.
        revisions = next(iter(pages.values()))['revisions']
        return revisions[0]['tags']
    except (KeyError, IndexError, StopIteration):
        # Missing keys, no page entry or an empty revision list.
        return []
def checknotify(user):
    """Count a signing for ``user`` and tell whether to notify them.

    Anonymous users are never notified. For other users a Redis counter,
    keyed by KEYSIGN plus the MD5 hex digest of the UTF-8 user name, is
    incremented and its expiry is set to 86400 + 10 seconds from now.

    @return: True when the incremented counter is 3 or more
    """
    if user.isAnonymous():
        return False

    expire_at = int(time.time()) + 86400 + 10
    name_digest = hashlib.md5(user.username.encode('utf-8')).hexdigest()
    counter_key = KEYSIGN + ':' + name_digest

    pipe = REDIS.pipeline()
    pipe.incr(counter_key)
    pipe.expireat(counter_key, expire_at)
    results = pipe.execute()
    # results[0] is the reply to the INCR command queued first.
    return results[0] >= 3
def get_signature(event, tosignstr, user):
    """Build the wikitext signature to append to ``tosignstr``.

    @param event: recent-change event; ``event['timestamp']`` is converted
        with ``pywikibot.Timestamp.utcfromtimestamp``
    @param tosignstr: the line that is going to be signed
    @param user: the author; ``isAnonymous()`` selects the template and
        ``username`` is passed to it
    @return: ``{{unsignedIP2|time|name}}`` for anonymous users, otherwise
        ``{{unsigned2|time|name}}``, preceded by one space unless
        ``tosignstr`` already ends with a space
    """
    # endswith() instead of tosignstr[-1] so that an empty line does not
    # raise IndexError.
    separator = '' if tosignstr.endswith(' ') else ' '
    # NOTE(review): '%-d' (day without zero padding) is a platform-specific
    # strftime extension -- confirm it is supported where this runs.
    timestamp = pywikibot.Timestamp.utcfromtimestamp(
        event['timestamp']).strftime('%H:%M, %-d %B %Y')
    template = 'unsignedIP2' if user.isAnonymous() else 'unsigned2'
    return separator + '{{%s|%s|%s}}' % (template, timestamp, user.username)
def is_signed(user, tosignstr):
    """Tell whether ``tosignstr`` contains a link that identifies ``user``.

    Wikilinks are searched in the text after
    ``pywikibot.textlib.removeDisabledParts``. For an anonymous user a link
    to namespace -1 with title ``Contributions/<username>`` counts; for any
    other user a link in namespace 2 or 3 with title ``<username>`` counts.

    @return: True on the first matching link, False when none matches
    """
    active_text = pywikibot.textlib.removeDisabledParts(tosignstr)
    for found in pywikibot.link_regex.finditer(active_text):
        target = found.group('title')
        if not target.strip():
            # Link without a usable target.
            continue

        try:
            link = pywikibot.Link(target, source=SITE)
            link.parse()
        except pywikibot.Error:
            # Unparsable link; it cannot be a signature.
            continue

        if user.isAnonymous():
            mismatch = (link.namespace != -1 or
                        link.title != 'Contributions/' + user.username)
        else:
            mismatch = (link.namespace not in [2, 3] or
                        link.title != user.username)

        if not mismatch:
            return True

    return False
def is_comment(line):
    """Tell whether ``line`` looks like comment text that can be signed.

    The line is first reduced by ``removeDisabledParts``, category links
    are removed and surrounding whitespace is stripped. The result is not
    a comment when it is empty, a heading, a table/template row, a
    horizontal line or a lone magic word.
    """
    # Drop non-functional parts and category links.
    enabled = pywikibot.textlib.removeDisabledParts(line)
    core = re.sub(r'\[\[[Cc]ategory:[^\]]+\]\]', '', enabled).strip()

    # Nothing left.
    if not core:
        return False
    # Heading.
    if core.startswith('=') and core.endswith('='):
        return False
    # Table / template row.
    if core.startswith(('|', '{|')) or core.endswith('|'):
        return False
    # Horizontal line.
    if core.startswith('----'):
        return False
    # Magic word such as __TOC__.
    if re.match(r'^__[A-Z]+__$', core):
        return False

    return True
def is_optout(user):
    """Tell whether edits by ``user`` should be left unsigned.

    The cached lists of pages transcluding Template:YesAutosign and
    Template:NoAutosign are loaded when missing and otherwise reloaded
    with a 0.25 chance per call.

    @return: False when ``user`` is in the opt-in list, True when in the
        opt-out list, otherwise whether the user's edit count exceeds 800
    """
    must_reload = (
        state.useroptin is None or
        state.useroptout is None or
        chance(0.25)
    )
    if must_reload:
        optin_template = pywikibot.Page(SITE, 'Template:YesAutosign')
        state.useroptin = list(
            optin_template.getReferences(onlyTemplateInclusion=True))
        optout_template = pywikibot.Page(SITE, 'Template:NoAutosign')
        state.useroptout = list(
            optout_template.getReferences(onlyTemplateInclusion=True))

    # Explicit opt-in {{YesAutosign}} wins: never treated as opted out.
    if user in state.useroptin:
        return False
    # Explicit opt-out {{NoAutosign}}.
    if user in state.useroptout:
        return True

    # Otherwise decide on the edit count: above 800 edits counts as opted
    # out. The count is force-refreshed with a 0.25 chance.
    refresh_count = chance(0.25)
    return user.editCount(force=refresh_count) > 800
def is_discussion(page):
    """Tell whether the project page ``page`` is a discussion page.

    A page with the ``newsectionlink`` page property is a discussion.
    A page under ``Commons:Deletion requests/`` is a discussion unless
    the rest of its title consists only of digits and slashes, or its
    text contains ``{{Commons:Deletion requests/``. Everything else is
    not a discussion.
    """
    # TODO: sandbox
    # TODO: opt-in
    # __NEWSECTIONLINK__ -> True
    if 'newsectionlink' in page.properties():
        return True

    if not page.title().startswith('Commons:Deletion requests/'):
        return False

    if re.match(r'Commons:Deletion requests/[0-9/]*$', page.title()):
        return False

    return '{{Commons:Deletion requests/' not in page.text
def match_exclude_regex(line):
    """Return the text matched by the first exclusion regex, else None.

    The patterns come from the wiki page ``User:SignBot/exclude_regex``,
    one per line; blank lines and lines starting with ``#`` are skipped
    and patterns are compiled case-insensitively. The compiled list is
    cached in ``state.excluderegex`` and rebuilt when missing or with a
    0.05 chance per call.

    @param line: the text to test; underscores are replaced by spaces
        before matching
    @return: ``group(0)`` of the first pattern that matches, or None
    """
    # 0.05 chance of updating list
    if state.excluderegex is None or chance(0.05):
        # Build into a local list and assign once at the end, to avoid
        # issues with multi-threading.
        compiled = []
        repage = pywikibot.Page(SITE, 'User:SignBot/exclude_regex')
        # The loop variable must not reuse the name ``line``: doing so
        # overwrote the parameter, so after a refresh the patterns were
        # matched against the last line of the regex page.
        for pattern in repage.get(force=True).split('\n'):
            pattern = pattern.strip()
            if pattern and not pattern.startswith('#'):
                compiled.append(re.compile(pattern, re.I))
        state.excluderegex = compiled

    line = line.replace('_', ' ')
    for regex in state.excluderegex:
        reobj = regex.search(line)
        if reobj is not None:
            return reobj.group(0)
    return None
def userlink(user):
    """Return a wikilink labelled with the user name.

    Anonymous users are linked to ``Special:Contributions/<name>``,
    all other users to ``User:<name>``.
    """
    if user.isAnonymous():
        pattern = '[[Special:Contributions/%s|%s]]'
    else:
        pattern = '[[User:%s|%s]]'
    name = user.username
    return pattern % (name, name)
def handler(event):
    """Handle one recent-change event and sign the new comment if needed.

    Steps, each of which may end the handling early:

    1. Ignore events from other wikis, bot edits, namespaces other than 4
       and the odd-numbered ones, types other than edit/new, and edits
       whose summary contains ``!nosign!``.
    2. Ignore redirects, project pages that are not discussions, undo or
       rollback edits, and users for whom is_optout() is true.
    3. Diff the old and new revision. Among the inserted lines, remember
       the last one accepted by is_comment(); stop if the user adds a
       template to their own talk page, if a line matches an exclusion
       regex, or if a candidate line is already signed.
    4. Wait 60 seconds, reload the page, append the signature to the
       remembered line and save.
    5. Decrement the global ``limit`` (if set) and stop all threads when
       it reaches zero.
    6. When checknotify() says so, append ``{{subst:Please sign}}`` to
       the user's talk page.

    @param event: recent-change event; the keys read are ``wiki``,
        ``bot``, ``namespace``, ``type``, ``comment``, ``title``, ``user``,
        ``timestamp`` and ``revision`` (``old`` / ``new``)
    """
    global limit

    if (
        event['wiki'] != SITE.dbName() or
        event['bot'] or
        not (event['namespace'] == 4 or event['namespace'] % 2 == 1) or
        event['type'] not in {'edit', 'new'} or
        '!nosign!' in event['comment']
    ):
        return

    page = pywikibot.Page(SITE, event['title'], ns=event['namespace'])

    def output(info):
        # Prefix every log line with the page being handled.
        pywikibot.output('%s: %s' % (page, info))

    output('Handling')

    if page.isRedirectPage():
        output('Redirect')
        return

    if page.namespace() == 4:
        # Project pages needs attention (__NEWSECTIONLINK__)
        if not is_discussion(page):
            output('Not a discussion')
            return

    if {'mw-undo', 'mw-rollback'}.intersection(get_tags(event)):
        output('undo / rollback')
        return

    user = pywikibot.User(SITE, event['user'])
    if is_optout(user):
        output('%s opted-out' % user)
        return

    # diff-reading.
    if event['type'] == 'new':
        old_text = ''
    else:
        old_text = page.getOldVersion(event['revision']['old'])

    new_text = page.getOldVersion(event['revision']['new'])

    if '{{speedy' in new_text.lower():
        output('{{speedy -- ignored')
        return

    diff = PatchManager(old_text.split('\n') if old_text else [],
                        new_text.split('\n'),
                        by_letter=True)
    diff.print_hunks()

    # Last inserted comment line and its index in the new text;
    # None until a candidate has been found.
    tosignstr = None
    tosignnum = None

    for block in diff.blocks:
        if block[0] < 0:
            continue
        hunk = diff.hunks[block[0]]
        group = hunk.group

        for tag, i1, i2, j1, j2 in group:
            if tag != 'insert':
                continue
            for j in range(j1, j2):
                line = hunk.b[j]
                if (
                    page == user.getUserTalkPage() or
                    page.title().startswith(
                        user.getUserTalkPage().title() + '/')
                ):
                    if '{{' in line.lower():
                        output('User adding templates to their '
                               'own talk page -- ignored')
                        return

                excluderegextest = match_exclude_regex(line)
                if excluderegextest is not None:
                    output('%s -- ignored' % excluderegextest)
                    return

                if is_comment(line):
                    tosignnum = j
                    tosignstr = line
                    if is_signed(user, tosignstr):
                        output('Signed')
                        return

    if tosignstr is None:
        output('No inserts')
        return
    if is_signed(user, tosignstr):
        output('Signed')
        return

    # Frequent page list not implemented
    # if not isFreqpage(page):
    output('Waiting')
    time.sleep(60)

    currenttext = page.get(force=True)
    savetext = currenttext.split('\n')
    sig = get_signature(event, tosignstr, user)

    # The page may have changed (and become shorter) during the wait, so
    # check the bounds before indexing with the remembered line number.
    if tosignnum < len(savetext) and savetext[tosignnum] == tosignstr:
        savetext[tosignnum] += sig
    elif savetext.count(tosignstr) == 1:
        # The line moved but is still unambiguous.
        savetext[savetext.index(tosignstr)] += sig
    else:
        output('Line no longer found, probably signed')
        return

    summary = "Signing comment by %s - '%s'" % (
        userlink(user), event['comment'])

    page.text = '\n'.join(savetext)
    if page.text != currenttext:
        pywikibot.output('\n\n>>> \03{lightpurple}%s\03{default} <<<'
                         % page.title(asLink=True))
        pywikibot.showDiff(currenttext, page.text)
    page.save(summary)

    if limit:
        limit -= 1
        if not limit:
            # Explicit loop: map() would be lazy (a no-op) on Python 3.
            for thread in threads:
                thread.stop()

    # {{subst:Please sign}} -- ignore {{bots}}
    if checknotify(user):
        output('Notifying %s' % user)
        talk = user.getUserTalkPage()
        if talk.isRedirectPage():
            talk = talk.getRedirectTarget()
        if talk.text:
            talk.text += '\n\n'
        talk.text += '{{subst:Please sign}} --~~~~'
        talk.save('Added {{subst:[[Template:Please sign|Please sign]]}} note.',
                  minor=False, force=True)
def main():
    """Start the worker pool and the SSE client and supervise them.

    Events received by the SSE client are handed to the pool, which runs
    handler() on them. The function then polls once per second until one
    of the two workers is no longer alive. Any exception, including
    KeyboardInterrupt, is printed and turned into exit status 1. In all
    cases the workers are stopped, remaining daemon threads are reported,
    and the workers are joined.
    """
    pywikibot.handleArgs()

    pool = ThreadPool(16)
    sse = SSEClient(lambda event: pool.process(lambda: handler(event)))

    # Update the shared list in place so handler() sees the same objects.
    threads[:] = pool, sse

    # Plain loops instead of map(): map() is lazy on Python 3 and would
    # silently skip these side effects.
    for thread in threads:
        thread.start()

    try:
        while all(thread.isAlive() for thread in threads):
            time.sleep(1)
    except BaseException:
        traceback.print_exc()
        sys.exit(1)
    finally:
        for thread in threads:
            thread.stop()

        for thread in threading.enumerate():
            if thread.daemon:
                pywikibot.output('Abandoning daemon thread %s' % thread.name)

        for thread in threads:
            thread.join()
if __name__ == '__main__':
    # pywikibot.stopme() runs whether main() returns normally or raises.
    try:
        main()
    finally:
        pywikibot.stopme()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.