collect_hashtags.py with media proof of concept
# pip install mwapi sseclient
import sys
import functools
import datetime
import json
import re

import mwapi
from sseclient import SSEClient as EventSource

# Wiki markup keywords (e.g. #redirect, {{#if:}}) that look like hashtags
# in edit summaries but aren't.
# From https://github.com/hatnote/hashtag-search/blob/1e02506a732b3e018521c431c4b5c3f3c0618215/common.py
EXCLUDED = ('redirect',
            'weiterleitung',
            'redirection',
            'ifexist',
            'switch',
            'ifexpr',
            'if',
            'rs',
            'default',
            'mw')

def hashtag_match(comment):
    # Save some time by discarding this edit if it doesn't have
    # a hashtag symbol (ASCII # or fullwidth ＃) at all.
    if "#" not in comment and "＃" not in comment:
        return None
    # Now do a regex to see if it's a valid hashtag.
    # From https://gist.github.com/mahmoud/237eb20108b5805aed5f
    hashtag_re = re.compile(r"(?:^|\s)[#＃](\w+)")
    return hashtag_re.findall(comment)
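
# Illustrative examples (not in the original gist):
#   hashtag_match("testing #nofilter edit") -> ['nofilter']
#   hashtag_match("see bug#123") -> []  (the '#' must follow whitespace
#                                        or the start of the comment)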
def valid_hashtag(hashtag):
    not_excluded = hashtag.lower() not in EXCLUDED
    not_only_numbers = not hashtag.isdigit()
    not_only_one_character = len(hashtag) > 1
    return all([not_excluded, not_only_numbers, not_only_one_character])
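
# Illustrative examples (not in the original gist):
#   valid_hashtag('redirect') -> False  (excluded wiki keyword)
#   valid_hashtag('123')      -> False  (digits only)
#   valid_hashtag('1lib1ref') -> True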

def valid_edit(change):
    # Exclude Wikidata for now, just far too much data.
    project_match = change['meta']['domain'] != "www.wikidata.org"
    # Excluding bots, mostly because of IABot. Lots of data, not very useful.
    not_bot = not change['bot']
    return all([project_match, not_bot])

@functools.cache
def get_wiki_session(domain):
    # One cached API session per wiki domain; the second argument is the
    # user agent string sent to the API.
    return mwapi.Session('https://{}/'.format(domain), 'hashtags')

def query_media_in_revision(session, rev_id):
    images_args = {
        'action': 'parse',
        'prop': 'images',
        'oldid': rev_id,
    }
    try:
        media_filenames = set(session.get(**images_args)['parse']['images'])
    except mwapi.errors.APIError as e:
        if e.code == 'nosuchrevid':
            # The revision no longer exists (e.g. it was deleted);
            # treat it as containing no media.
            return set()
        raise
    return media_filenames
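
# Note: prop=images returns bare file names without the 'File:' prefix,
# which is why query_media_types() below prepends it before querying
# imageinfo.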
def query_media_types(session, media_filenames):
    def query_imageinfo(titles, iistart=None):
        imageinfo_args = {
            'action': 'query',
            'prop': 'imageinfo',
            'titles': '|'.join(titles),
            'iiprop': 'mediatype',
        }
        if iistart is not None:
            imageinfo_args['iistart'] = iistart
        return session.get(**imageinfo_args)

    media_types = {}
    media_filenames = list(media_filenames)
    while media_filenames:
        iistart = None
        titles = ['File:' + f for f in media_filenames[:50]]
        while True:
            result = query_imageinfo(titles, iistart)
            for m in result['query']['pages'].values():
                if 'imageinfo' not in m:
                    # Broken link
                    continue
                if 'mediatype' not in m['imageinfo'][0]:
                    # Probably filehidden?
                    continue
                media_types[m['title']] = m['imageinfo'][0]['mediatype']
            if 'continue' in result:
                iistart = result['continue']['iistart']
            else:
                break
        media_filenames = media_filenames[50:]
    return media_types
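
# Note on the batching above: the MediaWiki query API accepts at most 50
# titles per request for regular clients (500 with the apihighlimits right),
# and imageinfo results can themselves be paginated, which the inner loop
# follows via the 'continue'/'iistart' token.
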
url = 'https://stream.wikimedia.org/v2/stream/recentchange'  # ?since=2021-01-09T23:35:30Z

progress = 0
for event in EventSource(
        url,
        # The retry argument sets the delay between retries in milliseconds.
        # We're setting this to 5 minutes.
        # There's no way to set the max_retries value with this library,
        # but since it depends upon requests, which in turn uses urllib3
        # by default, we get a default max_retries value of 3.
        retry=300000,
        # The timeout argument gets passed to requests.get.
        # An integer value sets connect (socket connect) and
        # read (time to first byte / since last byte) timeout values.
        # A tuple value sets each respective value independently.
        # https://requests.readthedocs.io/en/latest/user/advanced/#timeouts
        timeout=(3.05, 30)):
    progress += 1
    if progress % 1000 == 0:
        print('[{}] {} events processed.'.format(
            datetime.datetime.now(), progress), file=sys.stderr)
    if event.event == 'message':
        try:
            change = json.loads(event.data)
        except ValueError:
            continue
        hashtag_matches = hashtag_match(change['comment'])
        if hashtag_matches and valid_edit(change):
            if 'id' not in change:
                print("Couldn't find recent changes ID in data. Skipping.")
                continue
            if not any(valid_hashtag(h) for h in hashtag_matches):
                continue
            wiki = change['wiki']
            print('wiki: {}'.format(wiki))
            if 'revision' in change:
                new_rev = change['revision']['new']
                print('new: {}'.format(new_rev))
                old_rev = change['revision'].get('old', None)
                print('old: {}'.format(old_rev))
                if old_rev is not None:
                    debug_url = '{0}?diff={1}&oldid={2}'.format(
                        change['meta']['uri'], new_rev, old_rev)
                else:
                    debug_url = '{0}?oldid={1}'.format(
                        change['meta']['uri'], new_rev)
                print('debug url: ' + debug_url)
                session = get_wiki_session(change['meta']['domain'])
                new_media = query_media_in_revision(session, new_rev)
                old_media = set()
                if new_media and old_rev is not None:
                    old_media = query_media_in_revision(session, old_rev)
                added_media = new_media - old_media
                print('{}, {}'.format(new_media, old_media))
                if added_media:
                    added_media_types = query_media_types(session, added_media)
                    print('Revision has added media of type(s) {}: {}'.format(
                        ', '.join(added_media_types.values()), added_media))
            ts = datetime.datetime.fromtimestamp(change['timestamp'])
            print('timestamp: {0:%x} {0:%X}'.format(ts))
            print('latency: {}'.format(datetime.datetime.now() - ts))
            print('....')
            sys.stdout.flush()
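
# To try this out (a sketch of usage; assumes Python 3.9+, which
# functools.cache requires):
#   pip install mwapi sseclient
#   python collect_hashtags.py
# Matching edits are printed to stdout; a progress line goes to stderr
# every 1000 events.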