Wikipedia Pageview Analyzer
# This application computes the top 25 pages on Wikipedia for each Wikipedia sub-domain.
import argparse
import csv
import datetime
import gzip
import os
from collections import defaultdict, namedtuple
from heapq import heappush, heappushpop

import boto3
import requests
from smart_open import open as smart_open
from tqdm import tqdm

S3_BUCKET = os.getenv('S3_BUCKET')
class WikiPageView:
    dest_bucket = S3_BUCKET

    def __init__(self, run_datetime: datetime.datetime):
        self.run_datetime = run_datetime
        self.dest_key = os.path.join(self.run_datetime.strftime('%Y'),
                                     self.run_datetime.strftime('%m'),
                                     self.run_datetime.strftime('%d'),
                                     f"{self.run_datetime.strftime('%H')}.csv.gz")
        self.dest_full_path = os.path.join('s3://',
                                           self.dest_bucket,
                                           self.dest_key)
        self._blacklist = None
        self.data = None
    @property
    def blacklist(self):
        if not self._blacklist:
            self._blacklist = self.get_blacklist()
        return self._blacklist
    @staticmethod
    def get_blacklist():
        filename = 'blacklist_domains_and_pages'
        try:
            with open(filename, 'r') as f:
                text = f.read().split('\n')
        except FileNotFoundError:
            print('blacklist_domains_and_pages does not exist locally, downloading')
            url = os.getenv('BLACKLIST_DOMAINS_AND_PAGES_URL')
            text = requests.get(url).text
            with open(filename, 'w') as f:
                f.write(text)
            text = text.split('\n')
        # each line is "<domain_code> <page_title>"
        blacklist = defaultdict(set)
        for item in text:
            if item:
                k, v = item.split(' ', 1)  # split once, in case a title contains a space
                blacklist[k].add(v)
        return blacklist
    def get_page_view_count(self):
        """Download the page view counts from Wikipedia for the given
        date/hour from https://dumps.wikimedia.org/other/pageviews/
        """
        # example: https://dumps.wikimedia.org/other/pageviews/2019/2019-05/pageviews-20190501-200000.gz
        url = 'https://dumps.wikimedia.org/other/pageviews/{year}/{yearmo}/pageviews-{date}.gz'
        resp = requests.get(url.format(year=self.run_datetime.year,
                                       yearmo=self.run_datetime.isoformat()[:7],
                                       date=self.run_datetime.strftime('%Y%m%d-%H0000')),
                            stream=True)
        pageviews = gzip.GzipFile(fileobj=resp.raw)
        subdomains = defaultdict(list)
        Page = namedtuple('Page', 'domain_code page_title count_views total_response_size')
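        # Each dump line is expected to hold the four space-separated fields
        # above, e.g. (illustrative values, not taken from a real dump):
        #   en Main_Page 242332 0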
        error_buffer = ''
        total_read = 0
        print(f'Creating file {self.dest_key}:')
        with tqdm(total=int(resp.headers['Content-Length']), unit='B',
                  unit_scale=True, unit_divisor=1024) as t:
            for page in pageviews:
                prev_read = total_read
                total_read = resp.raw.tell()
                t.update(total_read - prev_read)
                try:
                    p = Page(*page.decode('utf-8').strip().split(' '))
                except TypeError:
                    # some rows are oddly not scrubbed of newlines, so this accounts for that.
                    # the pages won't be visitable, but they'll be countable
                    error_buffer += '_' + page.decode('utf-8').strip()
                    try:
                        p = Page(*error_buffer.split(' '))
                        error_buffer = ''
                    except TypeError:
                        continue
                # eliminate any pages found in the blacklist
                if p.page_title in self.blacklist[p.domain_code]:
                    continue
                # Only keep the top 25. This gives deference to first in, in the
                # case of a last-place tie, and will only ever hold a max of 25 values.
                if len(subdomains[p.domain_code]) >= 25:
                    heappushpop(subdomains[p.domain_code], (int(p.count_views), p.page_title))
                else:
                    heappush(subdomains[p.domain_code], (int(p.count_views), p.page_title))
        # sort the lists, as they're currently heap-ordered
        self.data = {k: sorted(v, key=lambda val: val[0], reverse=True)
                     for k, v in subdomains.items()}
    def output_exists(self):
        s3 = boto3.resource('s3')
        bucket = s3.Bucket(self.dest_bucket)
        # list any objects whose key matches the destination key prefix
        objs = list(bucket.objects.filter(Prefix=self.dest_key))
        return len(objs) > 0
    def write_to_s3(self):
        if not self.data:
            raise RuntimeError('attr `data` does not exist, please run get_page_view_count()')
        with smart_open(self.dest_full_path, 'w') as f:
            writer = csv.writer(f)  # create the writer once, not per row
            for subdomain, records in self.data.items():
                for count, page_title in records:
                    writer.writerow([subdomain, page_title, count])
    @staticmethod
    def wiki_date(value):
        run_datetime = datetime.datetime.fromisoformat(value)
        # raise ArgumentTypeError so argparse reports these as clean usage errors
        if run_datetime >= datetime.datetime.utcnow().replace(minute=0, second=0, microsecond=0):
            raise argparse.ArgumentTypeError("You can't predict the future, Marty! (date in the future)")
        if run_datetime < datetime.datetime(2015, 5, 1):
            raise argparse.ArgumentTypeError("Leave the past behind us (date too early)")
        return run_datetime
    def run(self):
        if self.output_exists():
            print(f'File {self.dest_key} already exists')
            return
        self.get_page_view_count()
        self.write_to_s3()

    def __str__(self):
        return '<WikiPageView(run_datetime={})>'.format(self.run_datetime)
def hour_floor(value: datetime.datetime):
    return value.replace(minute=0, second=0, microsecond=0)
if __name__ == '__main__':
    # Accept input parameters for the date and hour of data to analyze
    # (default to the current date/hour minus 24 hours, i.e. the same hour yesterday).
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", required=False, type=WikiPageView.wiki_date,
                        default=hour_floor(datetime.datetime.utcnow() - datetime.timedelta(1)),
                        help="Specify the UTC date to run for (defaults to yesterday)")
    parser.add_argument("--hour", required=False, type=int, choices=range(24),
                        default=datetime.datetime.utcnow().hour,
                        help="Specify the UTC hour to run for (defaults to the current hour)")
    # The job can be run for a range of dates and hours; each hour within the
    # range gets its own result file.
    parser.add_argument("--end-date", required=False, type=WikiPageView.wiki_date,
                        help="Specify the UTC date to end the run at (defaults to None)")
    args = parser.parse_args()
    in_datetime = args.date.replace(hour=args.hour)
    end_date = args.end_date or in_datetime
    n_hours = 1 + int((end_date - in_datetime).total_seconds() // 3600)
    for run_dt in (in_datetime + datetime.timedelta(hours=n) for n in range(n_hours)):
        pageview = WikiPageView(run_dt)
        pageview.run()
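The per-domain top-25 logic above relies on a fixed-size min-heap: once a domain's list holds 25 entries, heappushpop pushes the new (count, title) tuple and immediately evicts the smallest, so memory stays bounded no matter how many pages a domain has. A minimal standalone sketch of the same technique (the top_n helper and the sample data are illustrative, not part of the gist):

from heapq import heappush, heappushpop

def top_n(pairs, n=25):
    """Keep the n largest (count, title) pairs from an iterable."""
    heap = []
    for pair in pairs:
        if len(heap) >= n:
            heappushpop(heap, pair)  # push the new pair, evict the current minimum
        else:
            heappush(heap, pair)
    return sorted(heap, reverse=True)

# illustrative usage:
print(top_n([(10, 'Main_Page'), (7, 'Python'), (42, 'Wikipedia')], n=2))
# prints [(42, 'Wikipedia'), (10, 'Main_Page')]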
Dependencies:
boto3
smart_open
requests
tqdm |
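To install the dependencies, assuming the list above is saved as requirements.txt (filename assumed, not shown in the gist):

pip install -r requirements.txt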
Environment setup:
export AWS_ACCESS_KEY_ID=YOUR_KEY_HERE
export AWS_SECRET_ACCESS_KEY=YOUR_KEY_HERE
export AWS_DEFAULT_REGION=us-east-1
export S3_BUCKET=BUCKET_NAME_HERE
export BLACKLIST_DOMAINS_AND_PAGES_URL=url-goes-here |
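With the variables above exported (placeholders replaced with real values), the analyzer can be run for a single hour or a range of hours; assuming the script is saved as pageviews.py (filename assumed):

# single hour, UTC
python pageviews.py --date 2020-06-01 --hour 14

# every hour from 2020-06-01 14:00 UTC through 2020-06-03 00:00 UTC
python pageviews.py --date 2020-06-01 --hour 14 --end-date 2020-06-03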