-
-
Save ninjastrikers/9a6bcda32d3de3756b35b706dd965634 to your computer and use it in GitHub Desktop.
Commons Signbot code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: utf-8 -*- | |
# | |
# DUAL LICENSED: You are free to choose either or both of below licenses: | |
# | |
# 1. | |
# | |
# Published by zhuyifei1999 (https://wikitech.wikimedia.org/wiki/User:Zhuyifei1999) | |
# under the terms of Creative Commons Attribution-ShareAlike 3.0 Unported (CC BY-SA 3.0) | |
# https://creativecommons.org/licenses/by-sa/3.0/ | |
# | |
# 2. | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General License for more details. | |
# | |
# You should have received a copy of the GNU General License | |
# along with self program. If not, see <http://www.gnu.org/licenses/> | |
# | |
from __future__ import unicode_literals | |
import hashlib | |
import random | |
import re | |
import sys | |
import threading | |
import time | |
import traceback | |
import pywikibot | |
from pywikibot.diff import PatchManager | |
from redis import Redis | |
from redisconfig import KEYSIGN | |
# from media-dubiety | |
from threads import SSEClient, ThreadPool | |
SITE = pywikibot.Site(user='NjsBot') | |
SITE.login() | |
REDIS = Redis(host='tools-redis') | |
limit = None | |
threads = [] | |
state = type(str('State'), (), { | |
'useroptin': None, | |
'useroptout': None, | |
'excluderegex': None, | |
})() | |
# https://gerrit.wikimedia.org/r/#/c/pywikibot/core/+/525179/ | |
def monkey_patch(): | |
from pywikibot.site import PageInUse | |
def lock_page(self, page, block=True): | |
""" | |
Lock page for writing. Must be called before writing any page. | |
We don't want different threads trying to write to the same page | |
at the same time, even to different sections. | |
@param page: the page to be locked | |
@type page: pywikibot.Page | |
@param block: if true, wait until the page is available to be locked; | |
otherwise, raise an exception if page can't be locked | |
""" | |
title = page.title(with_section=False) | |
self._pagemutex.acquire() | |
try: | |
while title in self._locked_pages: | |
if not block: | |
raise PageInUse(title) | |
# The mutex must be released so that page can be unlocked | |
self._pagemutex.release() | |
time.sleep(.25) | |
self._pagemutex.acquire() | |
self._locked_pages.append(title) | |
finally: | |
# time.sleep may raise an exception from signal handler (eg: | |
# KeyboardInterrupt) while the lock is released, and there is no | |
# reason to acquire the lock again given that our caller will | |
# receive the exception. The state of the lock is therefore | |
# undefined at the point of this finally block. | |
try: | |
self._pagemutex.release() | |
except RuntimeError: | |
pass | |
SITE.__class__.lock_page = lock_page | |
monkey_patch() | |
def chance(c): | |
return random.random() < c | |
def get_tags(event): | |
req = SITE._simple_request( | |
action='query', | |
prop='revisions', | |
titles=event['title'], | |
rvprop='tags', | |
rvstartid=event['revision']['new'], | |
rvendid=event['revision']['new'], | |
rvlimit=1 | |
) | |
try: | |
res = req.submit() | |
except Exception as e: | |
pywikibot.exception(e) | |
return [] | |
else: | |
try: | |
p = res['query']['pages'] | |
r = p[next(iter(p.keys()))]['revisions'] | |
return r[0]['tags'] | |
except KeyError: | |
return [] | |
def checknotify(user): | |
if user.isAnonymous(): | |
return False | |
reset = int(time.time()) + 86400 | |
key = KEYSIGN + ':' | |
key += hashlib.md5(user.username.encode('utf-8')).hexdigest() | |
p = REDIS.pipeline() | |
p.incr(key) | |
p.expireat(key, reset + 10) | |
return p.execute()[0] >= 3 | |
def get_signature(event, tosignstr, user): | |
p = '' | |
if tosignstr[-1] != ' ': | |
p = ' ' | |
timestamp = pywikibot.Timestamp.utcfromtimestamp( | |
event['timestamp']).strftime('%H:%M, %-d %B %Y') | |
return p + '{{%s|%s|%s}}' % ( | |
'unsignedIP2' if user.isAnonymous() else 'unsigned2', | |
timestamp, | |
user.username | |
) | |
def is_signed(user, tosignstr): | |
for wikilink in pywikibot.link_regex.finditer( | |
pywikibot.textlib.removeDisabledParts(tosignstr)): | |
if not wikilink.group('title').strip(): | |
continue | |
try: | |
link = pywikibot.Link(wikilink.group('title'), | |
source=SITE) | |
link.parse() | |
except pywikibot.Error: | |
continue | |
if user.isAnonymous(): | |
if link.namespace != -1: | |
continue | |
if link.title != 'Contributions/' + user.username: | |
continue | |
else: | |
if link.namespace not in [2, 3]: | |
continue | |
if link.title != user.username: | |
continue | |
return True | |
return False | |
def is_comment(line): | |
# remove non-functional parts and categories | |
tempstr = re.sub(r'\[\[[Cc]ategory:[^\]]+\]\]', '', | |
pywikibot.textlib.removeDisabledParts(line)).strip() | |
# not empty | |
if not tempstr: | |
return False | |
# not heading | |
if tempstr.startswith('=') and tempstr.endswith('='): | |
return False | |
# not table/template | |
if ( | |
tempstr.startswith('|') or | |
tempstr.startswith('{|') or | |
tempstr.endswith('|') or | |
tempstr.count('{{') > tempstr.count('}}') | |
): | |
return False | |
# not horzontal line | |
if tempstr.startswith('----'): | |
return False | |
# not magic words | |
if re.match(r'^__[A-Z]+__$', tempstr): | |
return False | |
return True | |
def is_optout(user): | |
# 0.25 chance of updating list | |
if ( | |
state.useroptin is None or | |
state.useroptout is None or | |
chance(0.25) | |
): | |
state.useroptin = list( | |
pywikibot.Page(SITE, 'Template:YesAutosign') | |
.getReferences(onlyTemplateInclusion=True)) | |
state.useroptout = list( | |
pywikibot.Page(SITE, 'Template:NoAutosign') | |
.getReferences(onlyTemplateInclusion=True)) | |
# Check for opt-in {{YesAutosign}} -> False | |
if user in state.useroptin: | |
return False | |
# Check for opt-out {{NoAutosign}} -> True | |
if user in state.useroptout: | |
return True | |
# Check for 800 user edits -> False | |
# -> True | |
return user.editCount(force=chance(0.25)) > 800 | |
def is_discussion(page): | |
# TODO: sandbox | |
# TODO: opt-in | |
# __NEWSECTIONLINK__ -> True | |
if 'newsectionlink' in page.properties(): | |
return True | |
if page.title().startswith('Wikipedia:ဖျက်ပစ်ရမည့်ဆောင်းပါးများ/'): | |
if re.match(r'Wikipedia:ဖျက်ပစ်ရမည့်ဆောင်းပါးများ/[0-9/]*$', page.title()): | |
return False | |
if '{{Wikipedia:ဖျက်ပစ်ရမည့်ဆောင်းပါးများ/' in page.text: | |
return False | |
return True | |
return False | |
def match_exclude_regex(line): | |
# 0.05 chance of updating list | |
if state.excluderegex is None or chance(0.05): | |
# We do not directly assign to state.excluderegex right | |
# now to avoid issues with multi-threading | |
lst = [] | |
repage = pywikibot.Page(SITE, 'User:NjsBot/exclude_regex') | |
for line in repage.get(force=True).split('\n'): | |
line = line.strip() | |
if line and not line.startswith('#'): | |
lst.append(re.compile(line, re.I)) | |
state.excluderegex = lst | |
line = line.replace('_', ' ') | |
for regex in state.excluderegex: | |
reobj = regex.search(line) | |
if reobj is not None: | |
return reobj.group(0) | |
return None | |
def userlink(user): | |
if user.isAnonymous(): | |
return '[[Special:Contributions/%s|%s]]' % ( | |
user.username, user.username) | |
else: | |
return '[[User:%s|%s]]' % (user.username, user.username) | |
def handler(event): | |
if ( | |
event['wiki'] != SITE.dbName() or | |
event['bot'] or | |
not (event['namespace'] == 4 or event['namespace'] % 2 == 1) or | |
event['type'] not in {'edit', 'new'} or | |
'!nosign!' in event['comment'] | |
): | |
return | |
page = pywikibot.Page(SITE, event['title'], ns=event['namespace']) | |
def output(info): | |
pywikibot.output('%s: %s' % (page, info)) | |
output('Handling') | |
if page.isRedirectPage(): | |
output('Redirect') | |
return | |
if page.namespace() == 4: | |
# Project pages needs attention (__NEWSECTIONLINK__) | |
if not is_discussion(page): | |
output('Not a discussion') | |
return | |
if {'mw-undo', 'mw-rollback'}.intersection(get_tags(event)): | |
output('undo / rollback') | |
return | |
user = pywikibot.User(SITE, event['user']) | |
if is_optout(user): | |
output('%s opted-out' % user) | |
return | |
# diff-reading. | |
if event['type'] == 'new': | |
old_text = '' | |
else: | |
old_text = page.getOldVersion(event['revision']['old']) | |
new_text = page.getOldVersion(event['revision']['new']) | |
if '{{speedy' in new_text.lower(): | |
output('{{speedy -- ignored') | |
return | |
diff = PatchManager(old_text.split('\n') if old_text else [], | |
new_text.split('\n'), | |
by_letter=True) | |
diff.print_hunks() | |
tosignstr = False | |
tosignnum = False | |
for block in diff.blocks: | |
if block[0] < 0: | |
continue | |
hunk = diff.hunks[block[0]] | |
group = hunk.group | |
for tag, i1, i2, j1, j2 in group: | |
if tag == 'insert': | |
for j in range(j1, j2): | |
line = hunk.b[j] | |
if ( | |
page == user.getUserTalkPage() or | |
page.title().startswith( | |
user.getUserTalkPage().title() + '/') | |
): | |
if '{{' in line.lower(): | |
output('User adding templates to their ' | |
'own talk page -- ignored') | |
return | |
excluderegextest = match_exclude_regex(line) | |
if excluderegextest is not None: | |
output('%s -- ignored' % excluderegextest) | |
return | |
if is_comment(line): | |
tosignnum = j | |
tosignstr = line | |
if is_signed(user, tosignstr): | |
output('Signed') | |
return | |
if tosignstr is False: | |
output('No inserts') | |
return | |
if is_signed(user, tosignstr): | |
output('Signed') | |
return | |
# Frequent page list not implemented | |
# if not isFreqpage(page): | |
output('Waiting') | |
time.sleep(60) | |
currenttext = page.get(force=True) | |
savetext = currenttext.split('\n') | |
sig = get_signature(event, tosignstr, user) | |
if savetext[tosignnum] == tosignstr: | |
savetext[tosignnum] += sig | |
elif savetext.count(tosignstr) == 1: | |
savetext[savetext.index(tosignstr)] += \ | |
get_signature(event, tosignstr, user) | |
else: | |
output('Line no longer found, probably signed') | |
return | |
summary = "%s - '%s' ၏ ဆွေးနွေးချက်ကို လက်မှတ်ထိုးပေးခြင်း" % ( | |
userlink(user), event['comment']) | |
page.text = '\n'.join(savetext) | |
if page.text != currenttext: | |
pywikibot.output('\n\n>>> \03{lightpurple}%s\03{default} <<<' | |
% page.title(asLink=True)) | |
pywikibot.showDiff(currenttext, page.text) | |
page.save(summary) | |
global limit | |
if limit: | |
limit -= 1 | |
if not limit: | |
map(lambda thread: thread.stop(), threads) | |
# {{subst:Please sign}} -- ignore {{bots}} | |
if checknotify(user): | |
output('Notifying %s' % user) | |
talk = user.getUserTalkPage() | |
if talk.isRedirectPage(): | |
talk = talk.getRedirectTarget() | |
if talk.text: | |
talk.text += '\n\n' | |
talk.text += '{{subst:Please sign}} --~~~~' | |
talk.save('{{subst:[[Template:Please sign|Please sign]]}} မှတ်ချက်ကို ပေါင်းထည့်ခြင်း။', | |
minor=False, force=True) | |
def main(): | |
pywikibot.handleArgs() | |
pool = ThreadPool(16) | |
sse = SSEClient(lambda event: pool.process(lambda: handler(event))) | |
threads[:] = pool, sse | |
[thread.start() for thread in threads] | |
try: | |
while all(thread.isAlive() for thread in threads): | |
time.sleep(1) | |
except BaseException: | |
traceback.print_exc() | |
sys.exit(1) | |
finally: | |
[thread.stop() for thread in threads] | |
for thread in threading.enumerate(): | |
if thread.daemon: | |
pywikibot.output('Abandoning daemon thread %s' % thread.name) | |
[thread.join() for thread in threads] | |
if __name__ == '__main__': | |
try: | |
main() | |
finally: | |
pywikibot.stopme() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment