Wikipedia Pageview Analyzer
# This application computes the top 25 pages on Wikipedia for each of the Wikipedia sub-domains.
import argparse
import csv
import datetime
import gzip
import os
import sys
from collections import defaultdict, namedtuple
from heapq import heappush, heappushpop

import boto3
import requests
from smart_open import open as smart_open
from tqdm import tqdm

S3_BUCKET = os.getenv('S3_BUCKET')


class WikiPageView:
    dest_bucket = S3_BUCKET

    def __init__(self, run_datetime: datetime.datetime):
        self.run_datetime = run_datetime
        self.dest_key = os.path.join(self.run_datetime.strftime('%Y'),
                                     self.run_datetime.strftime('%m'),
                                     self.run_datetime.strftime('%d'),
                                     f"{self.run_datetime.strftime('%H')}.csv.gz")
        self.dest_full_path = os.path.join('s3://',
                                           self.dest_bucket,
                                           self.dest_key)
        self._blacklist = None
        self.data = None

    @property
    def blacklist(self):
        if not self._blacklist:
            self._blacklist = self.get_blacklist()
        return self._blacklist

    @staticmethod
    def get_blacklist():
        filename = 'blacklist_domains_and_pages'
        try:
            with open(filename, 'r') as f:
                text = f.read().split('\n')
        except FileNotFoundError:
            print('blacklist_domains_and_pages does not exist locally, downloading')
            url = os.getenv('BLACKLIST_DOMAINS_AND_PAGES_URL')
            text = requests.get(url).text
            with open(filename, 'w') as f:
                f.write(text)
            text = text.split('\n')
        # each blacklist line is expected to be "<domain_code> <page_title>", separated by a single space
        blacklist = defaultdict(set)
        for item in text:
            if item:
                k, v = item.split(' ')
                blacklist[k].add(v)
        return blacklist

    def get_page_view_count(self):
        """Download the page view counts from wikipedia for the given
        date/hour from https://dumps.wikimedia.org/other/pageviews/
        """
        # example: https://dumps.wikimedia.org/other/pageviews/2019/2019-05/pageviews-20190501-200000.gz
        url = 'https://dumps.wikimedia.org/other/pageviews/{year}/{yearmo}/pageviews-{date}.gz'
        resp = requests.get(url.format(year=self.run_datetime.year,
                                       yearmo=self.run_datetime.isoformat()[:7],
                                       date=self.run_datetime.strftime('%Y%m%d-%H0000')),
                            stream=True)
        pageviews = gzip.GzipFile(fileobj=resp.raw)
        subdomains = defaultdict(list)
        # each dump line is space-separated: domain code, page title, view count, total response size
        Page = namedtuple('Page', 'domain_code page_title count_views total_response_size')
        error_buffer = ''
        total_read = 0
        print(f'Creating file {self.dest_key}:')
        with tqdm(total=int(resp.headers['Content-Length']), unit='B',
                  unit_scale=True, unit_divisor=1024) as t:
            for page in pageviews:
                # advance the progress bar by the number of compressed bytes consumed so far
                prev_read = total_read
                total_read = resp.raw.tell()
                t.update(total_read - prev_read)
                try:
                    p = Page(*page.decode('utf-8').strip().split(' '))
                except TypeError:
                    # some rows are oddly not scrubbed of newlines, so this accounts for that.
                    # the pages won't be visitable, but they'll be countable
                    error_buffer += '_' + page.decode('utf-8').strip()
                    try:
                        p = Page(*error_buffer.split(' '))
                        error_buffer = ''
                    except TypeError:
                        continue
                # Eliminate any pages found in this blacklist:
                if p.page_title in self.blacklist[p.domain_code]:
                    continue
                # only keep the top 25 per sub-domain, using a min-heap of (count, title) pairs.
                # This gives deference to first in, in the case of a last-place tie,
                # and will only ever return a max of 25 values.
                if len(subdomains[p.domain_code]) >= 25:
                    heappushpop(subdomains[p.domain_code], (int(p.count_views), p.page_title))
                else:
                    heappush(subdomains[p.domain_code], (int(p.count_views), p.page_title))
        # sort the lists descending by view count, as they're currently only heap ordered
        self.data = {k: sorted(subdomains[k], key=lambda val: val[0], reverse=True)
                     for k in subdomains.keys()}

    def output_exists(self):
        s3 = boto3.resource('s3')
        bucket = s3.Bucket(self.dest_bucket)
        # list files matching a filter: the path
        objs = list(bucket.objects.filter(Prefix=self.dest_key))
        return len(objs) > 0

    def write_to_s3(self):
        if not self.data:
            raise RuntimeError('attr `data` does not exist, please run get_page_view_count()')
        with smart_open(self.dest_full_path, 'w') as f:
            writer = csv.writer(f)
            for subdomain, records in self.data.items():
                for record in records:
                    writer.writerow([subdomain, record[1], record[0]])

    @staticmethod
    def wiki_date(value):
        run_datetime = datetime.datetime.fromisoformat(value)
        if run_datetime >= datetime.datetime.utcnow().replace(second=0, microsecond=0, minute=0):
            raise RuntimeError("You can't predict the future, Marty! (date in the future)")
        if run_datetime < datetime.datetime(2015, 5, 1):
            raise RuntimeError("Leave the past behind us (date too early)")
        return run_datetime

    def run(self):
        if self.output_exists():
            print(f'File {self.dest_key} already exists')
            return
        self.get_page_view_count()
        self.write_to_s3()

    def __str__(self):
        return '<WikiPageView(run_datetime={})>'.format(self.run_datetime)


def hour_floor(value: datetime.datetime):
    return value.replace(minute=0, second=0, microsecond=0)


if __name__ == '__main__':
    # Accept input parameters for the date and hour of data to analyze
    # (default to the current date/hour - 24 hours if not passed, i.e. previous day/hour).
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", required=False, type=WikiPageView.wiki_date,
                        default=hour_floor(datetime.datetime.utcnow() - datetime.timedelta(1)),
                        help="Specify the UTC date to run for (default to yesterday)")
    parser.add_argument("--hour", required=False, type=int, choices=range(24),
                        default=datetime.datetime.utcnow().hour,
                        help="Specify the UTC hour to run for (default to current hour)")
    # Be capable of being run for a range of dates and hours;
    # each hour within the range should have its own result file
    parser.add_argument("--end-date", required=False, type=WikiPageView.wiki_date,
                        help="Specify the UTC date to end run for (default to None)")
    args = parser.parse_args()

    in_datetime = args.date.replace(hour=args.hour)
    end_date = args.end_date or in_datetime
    hours_in_range = 1 + int((end_date - in_datetime).total_seconds() // 3600)
    for run_dt in (in_datetime + datetime.timedelta(hours=n) for n in range(hours_in_range)):
        pageview = WikiPageView(run_dt)
        pageview.run()
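
# A minimal programmatic usage sketch (illustrative only; the date below is an
# arbitrary example, and run() skips hours whose output already exists in S3):
#
#   pv = WikiPageView(datetime.datetime(2020, 6, 1, 12))
#   pv.run()   # writes s3://$S3_BUCKET/2020/06/01/12.csv.gz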
boto3
smart_open
requests
tqdm
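A minimal install sketch for the third-party dependencies listed above (the requirements.txt filename is an assumption):
pip install -r requirements.txt    # or: pip install boto3 smart_open requests tqdm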
export AWS_ACCESS_KEY_ID=YOUR_KEY_HERE
export AWS_SECRET_ACCESS_KEY=YOUR_KEY_HERE
export AWS_DEFAULT_REGION=us-east-1
export S3_BUCKET=BUCKET_NAME_HERE
export BLACKLIST_DOMAINS_AND_PAGES_URL=url-goes-here
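With the environment variables above exported, the script can be invoked roughly as follows (a sketch; the wiki_pageview.py filename and the example dates are assumptions, not part of the gist):
python wiki_pageview.py                                                      # previous UTC day, current hour
python wiki_pageview.py --date 2020-06-01 --hour 12                          # one specific hour
python wiki_pageview.py --date 2020-06-01 --hour 0 --end-date 2020-06-02     # one output file per hour in the range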