Skip to content

Instantly share code, notes, and snippets.

@zhuyifei1999
Last active November 14, 2018 15:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zhuyifei1999/25e941d2fc971456120b7c851fb552ba to your computer and use it in GitHub Desktop.
Save zhuyifei1999/25e941d2fc971456120b7c851fb552ba to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
# -*- coding: UTF-8 -*-
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
#
from __future__ import unicode_literals
import re
import sys
import threading
import time
import traceback
import pywikibot
from pywikibot.diff import PatchManager
# from media-dubiety
from threads import SSEClient, ThreadPool
# Site object for the 'de' language code of the 'wikipedia' family; used for
# API requests, page access and as the source when parsing links.
SITE = pywikibot.Site('de', 'wikipedia')
# Language codes known to the site's family; handler() uses this to decide
# whether a link prefix is a language code.
FAMLANGS = SITE.family.langs
# Countdown decremented by handler(); when it reaches zero every thread in
# ``threads`` is stopped. A falsy value (0) disables the countdown.
limit = 50
# Filled in place by main() with the worker pool and the SSE client so that
# handler() can stop them.
threads = []
def get_tags(event):
    """Return the change tags of the new revision described by ``event``.

    Issues an API query (``prop=revisions``, ``rvprop=tags``) restricted to
    the single revision ``event['revision']['new']`` of ``event['title']``.

    :param event: event dict providing ``'title'`` and
        ``event['revision']['new']``.
    :return: the ``tags`` value of that revision from the API response, or
        an empty list when the request fails or the response contains no
        page, revision or tags entry.
    """
    req = SITE._simple_request(
        action='query',
        prop='revisions',
        titles=event['title'],
        rvprop='tags',
        rvstartid=event['revision']['new'],
        rvendid=event['revision']['new'],
        rvlimit=1
    )
    try:
        res = req.submit()
    except Exception as e:
        # Best effort: a failed lookup is logged and treated as "no tags".
        pywikibot.exception(e)
        return []

    try:
        pages = res['query']['pages']
        # Only one title was requested, so take the first (only) page entry.
        # next(iter(...)) works on Python 2 lists and Python 3 dict views
        # alike, unlike indexing the result of keys().
        page_data = next(iter(pages.values()))
        return page_data['revisions'][0]['tags']
    except (KeyError, IndexError, StopIteration):
        # Missing keys, an empty ``pages`` mapping or an empty ``revisions``
        # list all mean that no tags are available.
        return []
def main_ns(ns):
    """Round a namespace number down to the nearest even number.

    Even numbers are returned unchanged; an odd number ``n`` yields
    ``n - 1``. Presumably this maps a talk namespace to its subject
    namespace (MediaWiki numbering convention) -- confirm if relied upon.

    :param ns: namespace number.
    :return: ``ns`` with its remainder modulo 2 subtracted.
    """
    return ns - ns % 2
def handler(event):
    """Turn newly added interlanguage links into textual (colon) links.

    Processes one event dict. Events are ignored unless they are non-bot
    ``edit``/``new`` events on ``dewiki`` in namespace 4 or 5, are not on
    ``Wikipedia:Spielwiese`` and are not tagged ``mw-undo``/``mw-rollback``.
    For the remaining events the lines inserted or replaced by the revision
    are scanned for wiki links whose prefix is a language code of another
    Wikipedia; qualifying links get a leading colon and the page is saved.

    :param event: event dict; the keys read are ``wiki``, ``bot``, ``type``,
        ``namespace``, ``title`` and ``revision`` (``old``/``new``).
    :return: None.
    """
    global limit

    if (
        event['wiki'] != 'dewiki' or
        event['bot'] or
        event['type'] not in {'edit', 'new'} or
        event['namespace'] not in {4, 5} or
        event['title'] == 'Wikipedia:Spielwiese' or
        {'mw-undo', 'mw-rollback'}.intersection(get_tags(event))
    ):
        return

    page = pywikibot.Page(SITE, event['title'], ns=event['namespace'])
    if page.isRedirectPage():
        return

    # A newly created page has no previous revision to diff against.
    if event['type'] == 'new':
        old_text = ''
    else:
        old_text = page.getOldVersion(event['revision']['old'])

    new_text = page.getOldVersion(event['revision']['new'])
    # Copy of the new text with disabled parts removed; a link is only
    # rewritten if it also occurs in this copy.
    new_nodisabled = pywikibot.textlib.removeDisabledParts(new_text)

    diff = PatchManager(old_text.split('\n') if old_text else [],
                        new_text.split('\n'),
                        by_letter=True)
    diff.print_hunks()

    text_save = new_text.split('\n')
    for hunk_index, _, _ in diff.blocks:
        if hunk_index < 0:
            continue
        hunk = diff.hunks[hunk_index]
        for tag, _, _, j1, j2 in hunk.group:
            # Only lines that are new in this revision are of interest.
            if tag not in {'insert', 'replace'}:
                continue
            for j in range(j1, j2):
                line = hunk.b[j]
                # Sanity check that hunk indices address the same lines as
                # text_save. Done once per line, before any replacement, so
                # that several links on one line do not trip it.
                assert text_save[j] == line
                fixed_line = line
                for wikilink in pywikibot.link_regex.finditer(line):
                    title = wikilink.group('title')
                    # Skip empty titles and links that already start with
                    # a colon.
                    if not title.strip():
                        continue
                    if title.strip()[0] == ':':
                        continue

                    # interlanguage link
                    lang = title.split(':')[0].strip()
                    if lang not in FAMLANGS:
                        continue

                    try:
                        link = pywikibot.Link(title, source=SITE)
                        link.parse()
                    except pywikibot.Error:
                        continue

                    # The link must point to a different Wikipedia whose
                    # code is exactly the prefix that was written.
                    if (
                        link.site == SITE or
                        link.site.family.name != 'wikipedia' or
                        link.site.code != lang
                    ):
                        continue

                    # either cross-namespace in {0, 4, 10, 14} or has anchor
                    if not (
                        main_ns(link.namespace) in ({0, 4, 10, 14} - {
                            main_ns(page.namespace())}) or
                        (link.anchor and link.anchor.strip())
                    ):
                        continue

                    oldlink = wikilink.group(0)
                    if oldlink not in new_nodisabled:
                        continue

                    # Insert a colon right after the opening brackets.
                    newlink = re.sub(r'^\[\[\s*', '[[:', oldlink)
                    fixed_line = fixed_line.replace(oldlink, newlink)
                text_save[j] = fixed_line

    page.text = '\n'.join(text_save)
    if page.text != new_text:
        pywikibot.output('\n\n>>> \03{lightpurple}%s\03{default} <<<'
                         % page.title(asLink=True))
        pywikibot.showDiff(new_text, page.text)
        page.save('Bot: Interwiki-Link in textlichen Link umgewandelt')

        # NOTE(review): the countdown is taken to count saved edits; confirm
        # against the original layout. It is not lock-protected although
        # main() dispatches handler() through a thread pool.
        if limit:
            limit -= 1
            if not limit:
                # Budget used up: ask every registered thread to stop.
                for thread in threads:
                    thread.stop()
def main():
    """Run the bot until one of its threads ends or an error occurs.

    Creates a ``ThreadPool`` of 16 and an ``SSEClient`` whose callback hands
    each event to the pool for processing by ``handler``, registers both in
    the module-level ``threads`` list and starts them. The function then
    polls once per second while all of them are alive. On any exception
    (including ``KeyboardInterrupt``) the traceback is printed and the
    process exits with status 1. In every case the threads are stopped and
    joined, and remaining daemon threads are reported.
    """
    pool = ThreadPool(16)
    sse = SSEClient(lambda event: pool.process(lambda: handler(event)))
    # Update in place so that handler() sees the same list object.
    threads[:] = pool, sse

    # Plain loops instead of map(): map() is lazy on Python 3 and would
    # never call start()/stop()/join().
    for thread in threads:
        thread.start()

    try:
        # NOTE(review): threading.Thread.isAlive() was removed in Python 3.9;
        # if ThreadPool/SSEClient derive from it (confirm), use is_alive().
        while all(thread.isAlive() for thread in threads):
            time.sleep(1)
    except BaseException:
        traceback.print_exc()
        sys.exit(1)
    finally:
        for thread in threads:
            thread.stop()

        for thread in threading.enumerate():
            if thread.daemon:
                pywikibot.output('Abandoning daemon thread %s' % thread.name)

        for thread in threads:
            thread.join()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment