#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Commons Impact Metrics Downloader
# https://wikitech.wikimedia.org/wiki/Commons_Impact_Metrics
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Python requirements (bz2 is part of the standard library):
#     docopt
#     pendulum
#
# Examples of use:
#
# # Download a given dataset for a specified time range.
# cim_downloader --dataset pageviews_by_category --start 2024-01 --end 2024-04
#
# # Download several datasets for a given month.
# cim_downloader --dataset pageviews_by_category --dataset edits --start 2024-01
#
# # Download all datasets filtering by your primary categories.
# cim_downloader --category Some_Category --category Other_Category --start 2024-01
#
# # Download all datasets for a given time range and merge the monthly files into one.
# cim_downloader --merge --start 2024-01 --end 2024-04
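#
# # Download one dataset to a specific folder with verbose logging
# # (using only the flags documented in the usage text below).
# cim_downloader --dataset edits --start 2024-01 --output-folder ./dumps --verbose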
"""
Download, uncompress, filter and merge Commons Impact Metrics dumps.
https://wikitech.wikimedia.org/wiki/Commons_Impact_Metrics
Usage:
cim_downloader -h
cim_downloader [-d=<dataset>]... [-c=<category>]... [-m] [-t=<path>] [-o=<path>] [-v] -s=<date> [-e=<date>]
Options:
-h --help Show this help message and exit.
-d --dataset=<dataset> Download only these datasets.
Use one or more of: category_metrics_snapshot,
media_file_metrics_snapshot, pageviews_by_category,
pageviews_by_media_file, or edits.
If not specified, download all datasets.
-c --category=<category> Filter data to include these primary categories.
Use category names (URL format with underscores).
If not specified, includes all categories.
-m --merge Merge all files of a same dataset into one.
-t --temp-folder=<path> Use this temporary folder for downloads.
Removes temporary files once finished.
Default: /tmp/cim_downloader
-o --output-folder=<path> Write the output to this folder.
If not specified, write to the current folder.
-v --verbose Print more detailed logs.
-s --start=<date> Download data starting at this date (YYYY-MM).
Mandatory.
-e --end=<date> Download data up to this date, inclusive (YYYY-MM).
If not specified, download only the start month.
"""
from docopt import docopt
from os import getcwd, makedirs, path, remove
from shutil import rmtree
from urllib.request import urlretrieve
import bz2
import logging
import pendulum
import re
import sys
import uuid

LOG_FORMAT = "%(asctime)s %(levelname)-6s %(message)s"
LOG_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S"
DEFAULT_TEMP_FOLDER = "/tmp/cim_downloader"
DUMPS_BASE_URL = "https://dumps.wikimedia.org/other/commons_impact_metrics"
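
# Monthly dump files are fetched from URLs of the form
# (see the download loop in main below):
#     {DUMPS_BASE_URL}/{dataset}/commons_{dataset}_{YYYY-MM}.tsv.bz2
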
# Lists the available datasets and specifies which
# column contains the primary categories for filtering.
DATASETS = {
    "category_metrics_snapshot": {"primary_column": 2},
    "edits": {"primary_column": 4},
    "media_file_metrics_snapshot": {"primary_column": 3},
    "pageviews_by_media_file": {"primary_column": 2},
    "pageviews_by_category": {"primary_column": 2},
}
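# Note: primary_column values are zero-based indexes into each tab-separated
# row, and category fields in the dumps are "|"-delimited lists (see the
# filtering logic in main).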


def check_and_default_args(args):
    """
    Checks the correctness of the argument values
    and gives them defaults in case they are not specified.
    """
    for dataset in args["--dataset"]:
        if dataset not in DATASETS:
            raise Exception(f"Invalid --dataset argument {dataset}.")
    if not args["--dataset"]:
        args["--dataset"] = list(DATASETS.keys())
    if not args["--temp-folder"]:
        args["--temp-folder"] = DEFAULT_TEMP_FOLDER
    if not args["--output-folder"]:
        args["--output-folder"] = getcwd()
    year_month_re = r"[0-9]{4}-[0-9]{2}"
    if not re.fullmatch(year_month_re, args["--start"]):
        raise Exception(f"Invalid --start argument {args['--start']}.")
    if args["--end"]:
        if not re.fullmatch(year_month_re, args["--end"]):
            raise Exception(f"Invalid --end argument {args['--end']}.")
    else:
        args["--end"] = args["--start"]
    if args["--start"] > args["--end"]:
        raise Exception("Argument --start is greater than argument --end.")


def get_logger(verbose):
    """
    Sets up and returns a logger object.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    formatter = logging.Formatter(fmt=LOG_FORMAT, datefmt=LOG_DATE_FORMAT)
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    return logger


def get_target_months(start, end):
    """
    Returns the timestamps (YYYY-MM) of all the months contained
    within the specified time range; the end month is included.
    """
    current_dt = pendulum.parse(start)
    end_dt = pendulum.parse(end)
    target_dts = [current_dt]
    while current_dt < end_dt:
        current_dt = current_dt.add(months=1)
        target_dts.append(current_dt)
    target_months = [dt.format("YYYY-MM") for dt in target_dts]
    return target_months
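
# For example (pendulum parses "YYYY-MM" as the first day of that month):
#     get_target_months("2024-01", "2024-03") == ["2024-01", "2024-02", "2024-03"]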


def main(args):
    # Prepare execution.
    check_and_default_args(args)
    logger = get_logger(args["--verbose"])
    temp_folder = f"{args['--temp-folder']}/{uuid.uuid4().hex}"
    makedirs(temp_folder)
    target_months = get_target_months(args["--start"], args["--end"])

    # Iterate over datasets and months to download the data.
    for dataset in args["--dataset"]:
        # If merge is set, use the same output file for all dump files of the same dataset.
        if args["--merge"]:
            output_file_path = f"{args['--output-folder']}/commons_{dataset}.tsv"
            # Use "w" file mode for the first dump file.
            output_file_mode = "w"
        for month in target_months:
            logger.info(f"Downloading {dataset} for {month}...")

            # Download the dump file.
            dumps_url = f"{DUMPS_BASE_URL}/{dataset}/commons_{dataset}_{month}.tsv.bz2"
            dumps_file_name = path.basename(dumps_url)
            dumps_file_path = f"{temp_folder}/{dumps_file_name}"
            logger.debug(f"Downloading {dumps_url} into {dumps_file_path}")
            urlretrieve(dumps_url, dumps_file_path)

            # If merge is not set, use a specific output file for this dump file.
            if not args["--merge"]:
                # Strip the trailing ".bz2" to name the uncompressed output.
                output_file_path = f"{args['--output-folder']}/{dumps_file_name[0:-4]}"
                output_file_mode = "w"

            # Extract, filter and merge the downloaded file into the output file.
            logger.debug(f"Extracting and filtering {dumps_file_path} into {output_file_path}")
            with bz2.open(dumps_file_path, "rt") as extracted_file:
                with open(output_file_path, output_file_mode) as output_file:
                    line = extracted_file.readline()
                    while line:
                        # If categories are specified, filter.
                        if args["--category"]:
                            # Get the primary category column and values.
                            primary_column = DATASETS[dataset]["primary_column"]
                            primary_categories = line.strip().split("\t")[primary_column].split("|")
                            # Keep only lines matching any of the given categories.
                            if any(c in primary_categories for c in args["--category"]):
                                output_file.write(line)
                        else:
                            output_file.write(line)
                        line = extracted_file.readline()

            # Remove the downloaded file asap to reduce disk usage.
            remove(dumps_file_path)

            # If merge is set, change the output file mode to "a"
            # so that subsequent files of the same dataset are merged to it.
            if args["--merge"]:
                output_file_mode = "a"

    # Cleanup.
    rmtree(temp_folder)
if __name__ == "__main__":
args = docopt(__doc__)
main(args)