Skip to content

Instantly share code, notes, and snippets.

@Ameisenigel
Forked from zhuyifei1999/signbot.py
Last active April 26, 2022 09:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ameisenigel/5974064e46af281e4053644e1c99f42e to your computer and use it in GitHub Desktop.
Save Ameisenigel/5974064e46af281e4053644e1c99f42e to your computer and use it in GitHub Desktop.
Wikidata Signbot code
#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# DUAL LICENSED: You are free to choose either or both of below licenses:
#
# 1.
#
# Published by zhuyifei1999 (https://wikitech.wikimedia.org/wiki/User:Zhuyifei1999)
# under the terms of Creative Commons Attribution-ShareAlike 3.0 Unported (CC BY-SA 3.0)
# https://creativecommons.org/licenses/by-sa/3.0/
#
# 2.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
#
from __future__ import unicode_literals
import hashlib
import random
import re
import sys
import threading
import time
import traceback
import pywikibot
from pywikibot.diff import PatchManager
from redis import Redis
from redisconfig import KEYSIGN
# from media-dubiety
from threads import SSEClient, ThreadPool
# Module-level setup. These statements run at import time, in this order.
# Site handle used by every function below; logged in as 'AmeisenBot'.
SITE = pywikibot.Site(user='AmeisenBot')
SITE.login()
# Redis connection used by locknotify() and checknotify().
REDIS = Redis(host='tools-redis')
# Filled in by main() with the ThreadPool and SSEClient objects.
threads = []
# Shared mutable state: a single instance of an ad-hoc 'State' class whose
# attributes all start as None. Only 'excluderegex' is read or written in
# this file (by match_exclude_regex()).
# NOTE(review): str('State') presumably forces a native str class name
# under unicode_literals on Python 2 -- confirm.
state = type(str('State'), (), {
'useroptin': None,
'useroptout': None,
'excluderegex': None,
})()
def chance(c):
    """Return True with probability ``c``.

    Draws one value from ``random.random()`` and reports whether it is
    strictly smaller than ``c``.
    """
    draw = random.random()
    return draw < c
def get_tags(event):
    """Return the change tags attached to the new revision of ``event``.

    Queries ``prop=revisions`` with ``rvprop=tags`` for the single revision
    ``event['revision']['new']`` of the page ``event['title']``.

    :param event: change event; must provide ``event['title']`` and
        ``event['revision']['new']``.
    :return: the ``tags`` value of that revision, or an empty list when the
        request fails or the response does not contain the revision.
    """
    new_revid = event['revision']['new']
    request = SITE._simple_request(
        action='query',
        prop='revisions',
        titles=event['title'],
        rvprop='tags',
        rvstartid=new_revid,
        rvendid=new_revid,
        rvlimit=1
    )
    try:
        response = request.submit()
    except Exception as e:
        # Best effort: log the failure and treat the revision as untagged.
        pywikibot.exception(e)
        return []

    try:
        pages = response['query']['pages']
        # Only one title was requested, so take the first (only) page entry.
        revisions = next(iter(pages.values()))['revisions']
        return revisions[0]['tags']
    except (KeyError, IndexError, StopIteration):
        # Missing keys, an empty revision list or an empty page mapping all
        # mean "no tags available" rather than an error.
        return []
def locknotify(user, lock=True):
    """Set or clear the per-user notification lock in Redis.

    The key is ``KEYSIGN:lock:<md5 of the UTF-8 user name>``.

    :param user: user object providing ``isAnonymous()`` and ``username``.
    :param lock: when true, set the key and let it expire 86400 + 10
        seconds from now; otherwise delete the key.
    :return: ``False`` for anonymous users (Redis is not touched);
        otherwise the result of the EXISTS command queued before the key
        was set or deleted.
    """
    if user.isAnonymous():
        return False

    expires_at = int(time.time()) + 86400 + 10
    name_hash = hashlib.md5(user.username.encode('utf-8')).hexdigest()
    key = KEYSIGN + ':lock:' + name_hash

    pipe = REDIS.pipeline()
    # Queued first so that its result lands in slot 0 of execute().
    pipe.exists(key)
    if not lock:
        pipe.delete(key)
    else:
        pipe.set(key, '1')
        pipe.expireat(key, expires_at)
    results = pipe.execute()
    return results[0]
def checknotify(user):
    """Count one more unsigned comment for ``user`` and report the threshold.

    Increments the Redis counter ``KEYSIGN:counter:<md5 of the UTF-8 user
    name>`` and moves its expiry to 86400 + 10 seconds from now.

    :param user: user object providing ``isAnonymous()`` and ``username``.
    :return: ``False`` for anonymous users (Redis is not touched);
        otherwise whether the incremented counter is at least 3.
    """
    if user.isAnonymous():
        return False

    expires_at = int(time.time()) + 86400 + 10
    name_hash = hashlib.md5(user.username.encode('utf-8')).hexdigest()
    key = KEYSIGN + ':counter:' + name_hash

    pipe = REDIS.pipeline()
    pipe.incr(key)
    pipe.expireat(key, expires_at)
    # Slot 0 holds the result of INCR, i.e. the new counter value.
    count = pipe.execute()[0]
    return count >= 3
def get_signature(event, tosignstr, user):
    """Build the ``{{subst:unsigned2|...}}`` text to append to a line.

    :param event: change event; ``event['timestamp']`` is passed to
        ``pywikibot.Timestamp.utcfromtimestamp``.
    :param tosignstr: the line that will receive the signature; a single
        space is prepended to the result unless this already ends in one.
    :param user: user object providing ``username``.
    :return: the signature text, including the optional leading space.
    """
    separator = '' if tosignstr[-1] == ' ' else ' '
    edit_time = pywikibot.Timestamp.utcfromtimestamp(event['timestamp'])
    # NOTE(review): '%-d' (day without zero padding) is a platform-specific
    # strftime extension -- confirm it is supported where this runs.
    formatted = edit_time.strftime('%H:%M, %-d %B %Y')
    template = '{{%s|%s|%s}}' % ('subst:unsigned2', formatted, user.username)
    return separator + template
def is_signed(user, tosignstr):
    """Tell whether ``tosignstr`` contains a link identifying ``user``.

    Every wikilink found in the text (after ``removeDisabledParts``) is
    parsed. For an anonymous user the link must be in namespace -1 with
    the title ``Contributions/<username>``. Otherwise it must be in
    namespace 2 or 3 with the title equal to the user name.

    :param user: user object providing ``isAnonymous()`` and ``username``.
    :param tosignstr: wikitext of the line to inspect.
    :return: ``True`` as soon as one matching link is found, else ``False``.
    """
    visible_text = pywikibot.textlib.removeDisabledParts(tosignstr)
    for match in pywikibot.link_regex.finditer(visible_text):
        target = match.group('title')
        if not target.strip():
            continue
        try:
            link = pywikibot.Link(target, source=SITE)
            link.parse()
        except pywikibot.Error:
            # Unparsable link targets cannot be a signature.
            continue

        if user.isAnonymous():
            identifies_user = (
                link.namespace == -1 and
                link.title == 'Contributions/' + user.username
            )
        else:
            identifies_user = (
                link.namespace in [2, 3] and
                link.title == user.username
            )
        if identifies_user:
            return True
    return False
def is_comment(line):
    """Tell whether ``line`` looks like a comment that should be signed.

    The line is first passed through ``removeDisabledParts``, stripped of
    ``[[Category:...]]`` links and trimmed. It is then rejected when it is
    empty, a heading, part of a table or an unclosed template, a horizontal
    line, or a lone magic word.

    :param line: one line of wikitext.
    :return: ``True`` if none of the rejection rules apply.
    """
    # remove non-functional parts and categories
    visible = pywikibot.textlib.removeDisabledParts(line)
    text = re.sub(r'\[\[[Cc]ategory:[^\]]+\]\]', '', visible).strip()

    # not empty
    if not text:
        return False
    # not heading
    if text.startswith('=') and text.endswith('='):
        return False
    # not table
    if text.startswith(('|', '{|')) or text.endswith('|'):
        return False
    # not an unclosed template
    if text.count('{{') > text.count('}}'):
        return False
    # not horizontal line
    if text.startswith('----'):
        return False
    # not magic words
    if re.match(r'^__[A-Z]+__$', text):
        return False
    return True
def is_optout(user):
    """Treat users with more than 800 edits as opted out.

    :param user: user object providing ``editCount(force=...)``.
    :return: ``True`` when the edit count exceeds 800, else ``False``.
    """
    # One call in four (on average) passes force=True to editCount().
    force_refresh = chance(0.25)
    edit_count = user.editCount(force=force_refresh)
    return edit_count > 800
def is_discussion(page):
    """Tell whether ``page`` is a discussion page.

    A page counts as a discussion when ``page.properties()`` contains
    ``newsectionlink`` (i.e. the page uses __NEWSECTIONLINK__).

    :param page: page object providing ``properties()``.
    :return: ``True`` or ``False``.
    """
    # TODO: sandbox
    # TODO: opt-in
    return 'newsectionlink' in page.properties()
def match_exclude_regex(line):
    """Return the first exclusion-pattern match found in ``line``.

    The pattern list lives in ``state.excluderegex``. It is rebuilt when it
    is still ``None`` and, otherwise, with a 0.05 chance on each call. As
    written here the rebuilt list is always empty, so no pattern can match
    right after a rebuild.

    :param line: one line of wikitext; underscores are treated as spaces.
    :return: the matched text (``group(0)``) of the first pattern that
        matches, or ``None`` when nothing matches.
    """
    needs_refresh = state.excluderegex is None or chance(0.05)
    if needs_refresh:
        # Build the list locally and publish it in one assignment, to avoid
        # issues with multi-threading.
        fresh_patterns = []
        state.excluderegex = fresh_patterns

    normalized = line.replace('_', ' ')
    for pattern in state.excluderegex:
        found = pattern.search(normalized)
        if found is not None:
            return found.group(0)
    return None
def userlink(user):
    """Return a wikilink labelled with the user's name.

    Anonymous users are linked to ``Special:Contributions/<name>``, all
    other users to ``User:<name>``.

    :param user: user object providing ``isAnonymous()`` and ``username``.
    :return: the wikilink text.
    """
    if user.isAnonymous():
        link_format = '[[Special:Contributions/%s|%s]]'
    else:
        link_format = '[[User:%s|%s]]'
    name = user.username
    return link_format % (name, name)
def handler(event):
    """Inspect one change event and sign the added comment if it is unsigned.

    The event is ignored unless it is a non-bot 'edit' or 'new' change on
    this wiki, in namespace 4 or an odd-numbered namespace, whose edit
    summary does not contain ``!nosign!``. Further checks skip redirects,
    project pages that are not discussions, undo/rollback edits, opted-out
    users, pages carrying ``{{speedy`` / ``{{delete``, users adding
    templates to their own talk page, excluded lines and already signed
    comments.

    When an unsigned comment remains, a background thread re-reads the
    page, appends the signature to the last inserted comment line, saves
    the page and, once ``checknotify`` reports the threshold, leaves a
    ``{{subst:Uw-tilde}}`` note on the user's talk page.

    :param event: change event dict; reads the keys ``wiki``, ``bot``,
        ``namespace``, ``type``, ``comment``, ``title``, ``user``,
        ``timestamp`` and ``revision`` (``old`` / ``new``).
    :return: ``None``; all results are side effects on the wiki and Redis.
    """
    if (
        event['wiki'] != SITE.dbName() or
        event['bot'] or
        not (event['namespace'] == 4 or event['namespace'] % 2 == 1) or
        event['type'] not in {'edit', 'new'} or
        '!nosign!' in event['comment']
    ):
        return

    page = pywikibot.Page(SITE, event['title'], ns=event['namespace'])

    def output(info):
        """Log ``info`` prefixed with the page being handled."""
        pywikibot.output('%s: %s' % (page, info))

    output('Handling')
    if page.isRedirectPage():
        output('Redirect')
        return
    if page.namespace() == 4:
        # Project pages need attention (__NEWSECTIONLINK__)
        if not is_discussion(page):
            output('Not a discussion')
            return
    if {'mw-undo', 'mw-rollback'}.intersection(get_tags(event)):
        output('undo / rollback')
        return

    user = pywikibot.User(SITE, event['user'])
    if is_optout(user):
        output('%s opted-out' % user)
        return

    # diff-reading.
    if event['type'] == 'new':
        old_text = ''
    else:
        old_text = page.getOldVersion(event['revision']['old'])
    new_text = page.getOldVersion(event['revision']['new'])

    lowered_text = new_text.lower()
    for marker in ('{{speedy', '{{delete'):
        if marker in lowered_text:
            output('%s -- ignored' % marker)
            return

    diff = PatchManager(old_text.split('\n') if old_text else [],
                        new_text.split('\n'),
                        by_letter=True)
    diff.print_hunks()

    # Loop-invariant: is the page the user's own talk page or a subpage?
    own_talk = user.getUserTalkPage()
    on_own_talk = (
        page == own_talk or
        page.title().startswith(own_talk.title() + '/')
    )

    # Text and index (in the new revision) of the last inserted comment.
    tosignstr = False
    tosignnum = False
    for block in diff.blocks:
        if block[0] < 0:
            continue
        hunk = diff.hunks[block[0]]
        for tag, _i1, _i2, j1, j2 in hunk.group:
            if tag != 'insert':
                continue
            for j in range(j1, j2):
                line = hunk.b[j]
                if on_own_talk and '{{' in line:
                    output('User adding templates to their '
                           'own talk page -- ignored')
                    return

                excluderegextest = match_exclude_regex(line)
                if excluderegextest is not None:
                    output('%s -- ignored' % excluderegextest)
                    return

                if is_comment(line):
                    tosignnum = j
                    tosignstr = line
                    # Every candidate is checked here, so nothing that
                    # survives the loop is signed.
                    if is_signed(user, tosignstr):
                        output('Signed')
                        return

    if tosignstr is False:
        output('No inserts')
        return

    # Frequent page list not implemented
    # if not isFreqpage(page):
    pending_notify = locknotify(user, lock=True)

    def do_process():
        """Append the signature, save the page and notify when due."""
        try:
            output('Waiting')
            currenttext = page.get(force=True)
            savetext = currenttext.split('\n')
            sig = get_signature(event, tosignstr, user)

            # The page may have been shortened since the event, so check
            # the index before using it.
            if (
                tosignnum < len(savetext) and
                savetext[tosignnum] == tosignstr
            ):
                savetext[tosignnum] += sig
            elif savetext.count(tosignstr) == 1:
                # The line moved; sign it only if it is unambiguous.
                savetext[savetext.index(tosignstr)] += sig
            else:
                output('Line no longer found, probably signed')
                return

            summary = "Signing comment by %s - '%s'" % (
                userlink(user), event['comment'])

            page.text = '\n'.join(savetext)
            if page.text != currenttext:
                pywikibot.output('\n\n>>> \03{lightpurple}%s\03{default} <<<'
                                 % page.title(asLink=True))
                pywikibot.showDiff(currenttext, page.text)
            page.save(summary)

            # {{subst:Please sign}} -- ignore {{bots}}
            if not pending_notify and checknotify(user):
                output('Notifying %s' % user)
                talk = user.getUserTalkPage()
                if talk.isRedirectPage():
                    talk = talk.getRedirectTarget()
                if talk.text:
                    talk.text += '\n\n'
                talk.text += '{{subst:Uw-tilde}} --~~~~'
                talk.save(
                    'Added {{subst:[[Template:Uw-tilde|Uw-tilde]]}} note.',
                    minor=False, force=True)
        finally:
            # Always release the lock taken above. Otherwise an early
            # return or a failed save would leave it set until it expires.
            locknotify(user, lock=False)

    threading.Thread(target=do_process).start()
def main():
    """Start the worker pool and the event client, then supervise them.

    Every event delivered by the ``SSEClient`` is handed to the
    ``ThreadPool``, which runs ``handler`` on it. The function polls once
    per second until one of the two stops being alive. Any exception
    (including ``KeyboardInterrupt``) is printed and turns into exit status
    1. In all cases both objects are stopped and joined before returning.
    """
    pywikibot.handle_args()

    pool = ThreadPool(16)
    sse = SSEClient(lambda event: pool.process(lambda: handler(event)))
    threads[:] = pool, sse

    def alive(thread):
        # Thread.isAlive() was removed in Python 3.9, so prefer is_alive().
        # Fall back to isAlive() for objects that only offer the old name.
        check = getattr(thread, 'is_alive', None) or thread.isAlive
        return check()

    for thread in threads:
        thread.start()

    try:
        while all(alive(thread) for thread in threads):
            time.sleep(1)
    except BaseException:
        traceback.print_exc()
        sys.exit(1)
    finally:
        for thread in threads:
            thread.stop()
        for thread in threading.enumerate():
            if thread.daemon:
                pywikibot.output('Abandoning daemon thread %s' % thread.name)
        for thread in threads:
            thread.join()
# Script entry point: run the bot only when executed directly.
if __name__ == '__main__':
    try:
        main()
    finally:
        # Runs however main() ends, including via sys.exit().
        pywikibot.stopme()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment