wiki-pagecounts-sort
# noqa
from __future__ import print_function

import sys
import os
import datetime
import argparse
from urllib.parse import unquote, quote
from operator import itemgetter
import itertools
import bisect
import functools

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.storagelevel import StorageLevel

BASENAME_TIMESTAMP_PATTERN = 'pagecounts-%Y%m%d-%H%M%S.gz'

lower = str.lower
split = bytes.split
space = b' '
decode = bytes.decode
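
# line_parser expects four space-separated fields per raw line (project,
# percent-encoded page title, view count, bytes transferred); it decodes the
# text fields and unquotes the page title.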
def line_parser(line_raw):
    project, page, counts, bytes_trans = split(line_raw, space)
    project = decode(project, 'utf-8', 'replace')
    page = decode(page, 'utf-8', 'replace')
    return (
        lower(project),
        unquote(page),
        int(counts),
        int(bytes_trans)
    )
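
# input_mapper parses the timestamp out of the pagecounts file name and
# returns a closure that tags every parsed line of that file with it
# (truncated to the hour).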
def input_mapper(filename):
    basename = os.path.basename(filename)
    timestamp = datetime.datetime.strptime(
        basename,
        BASENAME_TIMESTAMP_PATTERN,
    )
    # Ignore everything after the "hour"
    timestamp = datetime.datetime(
        timestamp.year,
        timestamp.month,
        timestamp.day,
        timestamp.hour
    )

    def line_mapper(line):
        project, page, count, bytes_trans = line_parser(line)
        return project, page, timestamp, count, bytes_trans

    return line_mapper
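
# input_line_filter_provider builds the record filter: it keeps only records
# whose project is in project_list, or everything when no list is given.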
def input_line_filter_provider(project_list):
    if project_list is None:
        return lambda x: True

    def line_filter(line):
        project, page, timestamp, count, bytes_trans = line
        return project in project_list

    return line_filter


def parse_arguments():
    parser = argparse.ArgumentParser(
        description='Sort wikimedia pagecounts by (project, article)',
    )
    parser.add_argument(
        'input_files',
        metavar='INPUT_FILE',
        nargs='+',
        help='''\
Input file in gzip format. Basename must match the pattern:
{pattern}'''.format(pattern=BASENAME_TIMESTAMP_PATTERN),
    )
    parser.add_argument(
        'output_file',
        metavar='OUTPUT_FILE',
        help="Output file (it will be gzip'd)",
    )
    parser.add_argument(
        '--aggregate',
        required=False,
        choices={'hourly', '2hours', '6hours', 'daily', 'monthly'},
        help='Aggregate the stats',
    )
    parser.add_argument(
        '--input-partitions',
        type=int,
        required=False,
        help='''\
Number of input partitions [default: SparkContext.defaultMinPartitions]''',
    )
    parser.add_argument(
        '--output-partitions',
        type=int,
        required=False,
        help='Number of output partitions [default: input-partitions]',
    )
    parser.add_argument(
        '--projects', '-p',
        type=str,
        default=None,
        required=False,
        help='Comma-separated list of projects to keep. Leave empty for all.',
    )
    args = parser.parse_args()
    return args
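
# get_key_for_aggregator returns a key function that truncates each record's
# timestamp to the chosen aggregation window (hourly, 2hours, 6hours, daily,
# monthly), so that records falling in the same window share a key.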
def get_key_for_aggregator(aggregator):  # -> Callable[[Any], Tuple[str, str, datetime.datetime]]
    if aggregator is None:
        raise NotImplementedError
    if aggregator == 'hourly':
        def ts_get(timestamp):
            return datetime.datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
                timestamp.hour,
            )
    elif aggregator == '2hours':
        def ts_get(timestamp):
            base_hour = (timestamp.hour // 2) * 2
            return datetime.datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
                base_hour,
            )
    elif aggregator == '6hours':
        def ts_get(timestamp):
            base_hour = (timestamp.hour // 6) * 6
            return datetime.datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
                base_hour,
            )
    elif aggregator == 'daily':
        def ts_get(timestamp):
            return datetime.datetime(
                timestamp.year,
                timestamp.month,
                timestamp.day,
            )
    elif aggregator == 'monthly':
        def ts_get(timestamp):
            return datetime.datetime(
                timestamp.year,
                timestamp.month,
                1,
            )

    def key_fn(record):
        project, name, timestamp, count, bytes_trans = record
        return project, name, ts_get(timestamp)

    return key_fn


def key_with_ts(record):
    project, page, timestamp, count, bytes_trans = record
    return (project, page, timestamp)


def key_without_ts(record):
    project, page, timestamp, count, bytes_trans = record
    return (project, page)


def sum_counts_and_bytes_for_record(a_record, b_record):
    return (
        a_record[0],
        a_record[1],
        a_record[2],
        (a_record[3] + b_record[3]),
        (a_record[4] + b_record[4]),
    )


def sum_counts_and_bytes(a_values, b_values):
    return (
        (a_values[0] + b_values[0]),
        (a_values[1] + b_values[1]),
    )
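
# The block below is a commented-out experiment: a local groupByKey adapted
# from pyspark's internals that merges values within each partition without
# shuffling. It is not used by main().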
# def groupByKeyLocally(self, numPartitions=None):
#     """
#     Group the values for each key in the RDD into a single sequence.
#     Hash-partitions the resulting RDD with numPartitions partitions.
#     Note: If you are grouping in order to perform an aggregation (such as a
#     sum or average) over each key, using reduceByKey or aggregateByKey will
#     provide much better performance.
#     >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
#     >>> sorted(rdd.groupByKey().mapValues(len).collect())
#     [('a', 2), ('b', 1)]
#     >>> sorted(rdd.groupByKey().mapValues(list).collect())
#     [('a', [1, 1]), ('b', [1])]
#     """
#     from pyspark.shuffle import *
#
#     def createCombiner(x):
#         return [x]
#
#     def mergeValue(xs, x):
#         xs.append(x)
#         return xs
#
#     def mergeCombiners(a, b):
#         a.extend(b)
#         return a
#
#     spill = self._can_spill()
#     memory = self._memory_limit()
#     serializer = self._jrdd_deserializer
#     agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
#
#     def combine(iterator):
#         merger = ExternalMerger(agg, memory * 0.9, serializer) \
#             if spill else InMemoryMerger(agg)
#         merger.mergeValues(iterator)
#         return merger.items()
#
#     locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
#     # return locally_combined
#
#     def groupByKey(it):
#         merger = ExternalGroupBy(agg, memory, serializer)\
#             if spill else InMemoryMerger(agg)
#         merger.mergeCombiners(it)
#         return merger.items()
#
#     return locally_combined.mapPartitions(groupByKey, True).mapValues(ResultIterable)
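
# get_partitioner mirrors the key sampling done by RDD.sortByKey / Spark's
# RangePartitioner: it samples keys from sample_rdd_kv, sorts them, derives
# numPartitions - 1 boundary keys, and returns a function that maps a key to
# its partition index via binary search. Used together with
# repartitionAndSortWithinPartitions below, this yields globally sorted output.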
def get_partitioner(sample_rdd_kv, numPartitions, keyfunc=lambda x: x):
    # print('get_partitioner: sample_rdd_kv:', sample_rdd_kv.take(1))
    rddSize = sample_rdd_kv.count()
    print('sortByKey: rddSize: ', rddSize)
    maxSampleSize = numPartitions * 200.0  # constant from Spark's RangePartitioner
    print('sortByKey: maxSampleSize', maxSampleSize)
    fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
    print('sortByKey: fraction', fraction)
    samples = sample_rdd_kv.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
    samples = sorted(samples, key=keyfunc)
    print('sortByKey: samples[0:10]', samples[0:10])
    bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
              for i in range(0, numPartitions - 1)]
    print('sortByKey: bounds', bounds)

    def rangePartitioner(k):
        p = bisect.bisect_left(bounds, keyfunc(k))
        return p

    return rangePartitioner
def main():
    args = parse_arguments()
    print(args)
    if os.path.exists(args.output_file):
        print('Output file already exists:', args.output_file)
        sys.exit(1)

    conf = SparkConf()
    conf.set("io.compression.codecs", "io.sensesecure.hadoop.xz.XZCodec")
    sc = SparkContext(conf=conf)
    # WARNING! pyspark may be buggy here: during partitionBy, if a partition
    # exceeds the available memory it will crash!
    # Better to play it safe and use a huge number of partitions.
    output_partitions = args.output_partitions or (len(args.input_files) * 100)
    print('output_partitions:', output_partitions)
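
    # Build one RDD per input file; input_mapper tags every record of a file
    # with the timestamp parsed from that file's name.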
    rdds_list = [sc.textFile(
        name=input_file,
        minPartitions=1,  # 1 partition per file
        use_unicode=False,
    ).map(input_mapper(input_file)) for input_file in args.input_files]

    # Remove lines that we don't want
    if args.projects:
        projects_list = args.projects.split(u',')
        line_filter = input_line_filter_provider(projects_list)
        rdds_list = [rdd.filter(line_filter) for rdd in rdds_list]

    # Aggregate locally the lines that represent the same
    # (project, page, timestamp)
    def partitionMapper(kv_records):
        for k, group in itertools.groupby(kv_records, key=itemgetter(0)):
            project, page, timestamp = k
            values = [group_value for group_key, group_value in group]
            values_aggr = list(functools.reduce(sum_counts_and_bytes, values))
            yield (project, page, timestamp, values_aggr[0], values_aggr[1])
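
    # Per file: key each record by (project, page, timestamp), sort the single
    # partition by that key, then collapse duplicate keys with partitionMapper.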
    rdds_list = [
        rdd
        .map(lambda record: (record[:3], record[3:]))  # key by p,p,ts
        .sortByKey(numPartitions=1)
        .mapPartitions(partitionMapper)
        for rdd in rdds_list
    ]
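
    # Use the first file's RDD as a sample to estimate the range-partition
    # bounds over (project, page); persist it since it is both sampled here
    # and reused in the union below.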
    sample_rdd = rdds_list[0]
    sample_rdd.persist(StorageLevel.MEMORY_AND_DISK)
    range_partitioner = get_partitioner(
        sample_rdd.map(lambda record: (record[:2], )),  # key by proj, page
        output_partitions,
    )

    # Create a single rdd
    union_rdd = sc.union(rdds_list)

    # Aggregate if requested
    # if args.aggregate:
    #     key_fn = get_key_for_aggregator(args.aggregate)
    #     union_rdd_kv = (
    #         union_rdd
    #         .map(lambda record: (key_fn(record), record[3:]))
    #         .reduceByKey(sum_counts_and_bytes)
    #     )
    # else:
    #     union_rdd_kv = (
    #         union_rdd
    #         .map(lambda record: (key_with_ts(record), record[3:]))
    #     )
    #
    # def record_ppt_to_pp(record):
    #     (project, page, timestamp_aggr), (count, bytes_trans) = record
    #     return (project, page), (timestamp_aggr, count, bytes_trans)
    #
    # # Sort the rdd by key, which is (project, page)
    # union_rdd_kv = (
    #     union_rdd_kv
    #     .map(record_ppt_to_pp)
    # )

    def record_to_kv_without_timestamp(record):
        project, page, timestamp, count, bytes_trans = record
        return (project, page), (timestamp, count, bytes_trans)

    # union_rdd_kv = (
    #     union_rdd
    #     .map(record_to_kv_without_timestamp)
    #     .sortByKey(numPartitions=output_partitions)
    # )
    #
    # print('------ count ------')
    # print('count:', union_rdd_kv.count())
    # return
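
    # Range-partition by (project, page) with the sampled bounds and sort
    # within each partition: the partitions cover consecutive key ranges, so
    # the concatenated output is globally sorted by (project, page).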
    union_rdd_kv = (
        union_rdd
        .map(record_to_kv_without_timestamp)
        .repartitionAndSortWithinPartitions(
            numPartitions=output_partitions,
            partitionFunc=range_partitioner,
            ascending=True,
        )
        # .partitionBy(
        #     numPartitions=output_partitions,
        #     partitionFunc=range_partitioner,
        # )
    )

    # print('------ count ------')
    # def partCounter(index, partition):
    #     c = 0
    #     for element in partition:
    #         c += 1
    #     yield (index, c)
    #
    # print('partitioCount:', union_rdd_kv.mapPartitionsWithIndex(partCounter).collect())
    # print('count:', union_rdd_kv.count())
    # return

    # TIMESTAMP_PATTERN = '%Y%m%d-%H%M%S'
    # def kv_tuple_to_kv(record):
    #     (project, page), (timestamp, counts, bytes_trans) = record
    #     return project, page, timestamp.strftime(TIMESTAMP_PATTERN), counts, bytes_trans
    # # TODO: Check if not using strftime speeds up the saving
    # union_rdd_kv.map(kv_tuple_to_kv).saveAsSequenceFile(
    #     args.output_file,
    #     compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
    # )

    # print('---------------- saving as csv text file ------------')
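
    # Each output line is "project quoted-page YYYYmmdd-HHMMSS count bytes",
    # space separated, written as gzip-compressed text under OUTPUT_FILE.csv.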
    def kv_tuple_to_text(kv_tuple):
        (project, page), (timestamp, counts, bytes_trans) = kv_tuple
        return u'{} {} {} {} {}'.format(
            project,
            quote(page),
            timestamp.strftime('%Y%m%d-%H%M%S'),
            counts,
            bytes_trans,
        )

    union_rdd_kv.map(kv_tuple_to_text).saveAsTextFile(
        args.output_file + '.csv',
        compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
    )

    # # Group by key
    # def grouper(kv_records):
    #     for k, group in itertools.groupby(kv_records, key=itemgetter(0)):
    #         yield k, sorted(v for k, v in group)
    # union_rdd_kv_grouped = union_rdd_kv.mapPartitions(grouper)
    #
    # print('---------------- saving as pickle file ------------')
    # union_rdd_kv_grouped.saveAsPickleFile(args.output_file + '.pickle')
    #
    # print('---------------- saving as text file ------------')
    # union_rdd_kv_grouped.map(str).saveAsTextFile(
    #     args.output_file + '.txt',
    #     compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
    # )


if __name__ == '__main__':
    main()
This script sorts raw Wikimedia pagecounts (up to 2015) by (project, page, timestamp).
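
For reference, a line of the generated .csv output can be read back with a small sketch like the one below; parse_output_line is a hypothetical helper that simply mirrors kv_tuple_to_text above.

from datetime import datetime
from urllib.parse import unquote

def parse_output_line(line):
    # One output line looks like "project quoted-page YYYYmmdd-HHMMSS count bytes"
    project, page, ts, count, bytes_trans = line.split(' ')
    return (
        project,
        unquote(page),
        datetime.strptime(ts, '%Y%m%d-%H%M%S'),
        int(count),
        int(bytes_trans),
    )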