@youtux
Created January 21, 2016 11:44
wiki-pagecounts-sort
# noqa
from __future__ import print_function
import sys
import os
import datetime
import argparse
from urllib.parse import unquote, quote
from operator import itemgetter
import itertools
import bisect
import functools
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.storagelevel import StorageLevel
BASENAME_TIMESTAMP_PATTERN = 'pagecounts-%Y%m%d-%H%M%S.gz'
# Bind the methods used in the hot parsing loop to short module-level names
# to avoid repeated attribute lookups.
lower = str.lower
split = bytes.split
space = b' '
decode = bytes.decode
def line_parser(line_raw):
project, page, counts, bytes_trans = split(line_raw, space)
project = decode(project, 'utf-8', 'replace')
page = decode(page, 'utf-8', 'replace')
return (
lower(project),
unquote(page),
int(counts),
int(bytes_trans)
)
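# Illustrative example (added comment; the input line is hypothetical): a raw
# pagecount record has four space-separated fields -- project code,
# URL-encoded page title, view count, bytes transferred -- so
#   line_parser(b'en Albert_Einstein 42 1234567')
# would yield ('en', 'Albert_Einstein', 42, 1234567).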
def input_mapper(filename):
basename = os.path.basename(filename)
timestamp = datetime.datetime.strptime(
basename,
BASENAME_TIMESTAMP_PATTERN,
)
# Ignore everything after the "hour"
timestamp = datetime.datetime(
timestamp.year,
timestamp.month,
timestamp.day,
timestamp.hour
)
def line_mapper(line):
project, page, count, bytes_trans = line_parser(line)
return project, page, timestamp, count, bytes_trans
return line_mapper
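# For example (added comment, path is hypothetical): for
# 'dumps/pagecounts-20150101-030000.gz' the parsed timestamp is truncated to
# datetime.datetime(2015, 1, 1, 3), and every record read from that file is
# tagged with it.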
def input_line_filter_provider(project_list):
if project_list is None:
return lambda x: True
def line_filter(line):
project, page, timestamp, count, bytes_trans = line
return project in project_list
return line_filter
def parse_arguments():
parser = argparse.ArgumentParser(
description='Sort wikimedia pagecounts by (project, article)',
)
parser.add_argument(
'input_files',
metavar='INPUT_FILE',
nargs='+',
help='''\
Input file in gzip format. Basename must match the pattern:
{pattern}'''.format(pattern=BASENAME_TIMESTAMP_PATTERN),
)
parser.add_argument(
'output_file',
metavar='OUTPUT_FILE',
help="Output file (it will be gzip'd)",
)
parser.add_argument(
'--aggregate',
required=False,
choices={'hourly', '2hours', '6hours', 'daily', 'monthly'},
help='Aggregate the stats',
)
parser.add_argument(
'--input-partitions',
type=int,
required=False,
help='''\
Number of input partitions [default: SparkContext.defaultMinPartitions]''',
)
parser.add_argument(
'--output-partitions',
type=int,
required=False,
help='Number of output partitions [default: input-partitions]',
)
parser.add_argument(
'--projects', '-p',
type=str,
default=None,
required=False,
help='Comma-separated list of projects to keep. Leave empty for all.',
)
args = parser.parse_args()
return args
def get_key_for_aggregator(aggregator):  # returns a key function: record -> (project, page, truncated timestamp)
if aggregator is None:
raise NotImplementedError
if aggregator == 'hourly':
def ts_get(timestamp):
return datetime.datetime(
timestamp.year,
timestamp.month,
timestamp.day,
timestamp.hour,
)
elif aggregator == '2hours':
def ts_get(timestamp):
base_hour = (timestamp.hour // 2) * 2
return datetime.datetime(
timestamp.year,
timestamp.month,
timestamp.day,
base_hour,
)
elif aggregator == '6hours':
def ts_get(timestamp):
base_hour = (timestamp.hour // 6) * 6
return datetime.datetime(
timestamp.year,
timestamp.month,
timestamp.day,
base_hour,
)
elif aggregator == 'daily':
def ts_get(timestamp):
return datetime.datetime(
timestamp.year,
timestamp.month,
timestamp.day,
)
elif aggregator == 'monthly':
def ts_get(timestamp):
return datetime.datetime(
timestamp.year,
timestamp.month,
1,
)
def key_fn(record):
project, name, timestamp, count, bytes_trans = record
return project, name, ts_get(timestamp)
return key_fn
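# Bucketing example (added comment, timestamps are hypothetical): with
# aggregator='6hours', both datetime(2015, 1, 1, 7) and
# datetime(2015, 1, 1, 11) map to datetime(2015, 1, 1, 6), so records for the
# same (project, page) within that window share a single key.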
def key_with_ts(record):
project, page, timestamp, count, bytes_trans = record
return (project, page, timestamp)
def key_without_ts(record):
project, page, timestamp, count, bytes_trans = record
return (project, page)
def sum_counts_and_bytes_for_record(a_record, b_record):
return (
a_record[0],
a_record[1],
a_record[2],
(a_record[3] + b_record[3]),
(a_record[4] + b_record[4]),
)
def sum_counts_and_bytes(a_values, b_values):
return (
(a_values[0] + b_values[0]),
(a_values[1] + b_values[1]),
)
# def groupByKeyLocally(self, numPartitions=None):
# """
# Group the values for each key in the RDD into a single sequence.
# Hash-partitions the resulting RDD with numPartitions partitions.
# Note: If you are grouping in order to perform an aggregation (such as a
# sum or average) over each key, using reduceByKey or aggregateByKey will
# provide much better performance.
# >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# >>> sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
# >>> sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1, 1]), ('b', [1])]
# """
# from pyspark.shuffle import *
#
# def createCombiner(x):
# return [x]
#
# def mergeValue(xs, x):
# xs.append(x)
# return xs
#
# def mergeCombiners(a, b):
# a.extend(b)
# return a
#
# spill = self._can_spill()
# memory = self._memory_limit()
# serializer = self._jrdd_deserializer
# agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
#
# def combine(iterator):
# merger = ExternalMerger(agg, memory * 0.9, serializer) \
# if spill else InMemoryMerger(agg)
# merger.mergeValues(iterator)
# return merger.items()
#
# locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
# # return locally_combined
#
# def groupByKey(it):
# merger = ExternalGroupBy(agg, memory, serializer)\
# if spill else InMemoryMerger(agg)
# merger.mergeCombiners(it)
# return merger.items()
#
# return locally_combined.mapPartitions(groupByKey, True).mapValues(ResultIterable)
def get_partitioner(sample_rdd_kv, numPartitions, keyfunc=lambda x: x):
# print('get_partitioner: sample_rdd_kv:', sample_rdd_kv.take(1))
rddSize = sample_rdd_kv.count()
print('sortByKey: rddSize: ', rddSize)
maxSampleSize = numPartitions * 200.0 # constant from Spark's RangePartitioner
print('sortByKey: maxSampleSize', maxSampleSize)
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
print('sortByKey: fraction', fraction)
samples = sample_rdd_kv.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
samples = sorted(samples, key=keyfunc)
print('sortByKey: samples[0:10]', samples[0:10])
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
for i in range(0, numPartitions - 1)]
print('sortByKey: bounds', bounds)
def rangePartitioner(k):
p = bisect.bisect_left(bounds, keyfunc(k))
return p
return rangePartitioner
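# Sketch of how the bounds are applied (added comment, keys are made up): with
# bounds = [('en', 'M'), ('fr', 'A')], bisect_left sends ('de', 'Berlin') to
# partition 0, ('en', 'Paris') to partition 1 and ('it', 'Roma') to
# partition 2, so (project, page) keys come out range-partitioned and
# globally ordered across partitions.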
def main():
args = parse_arguments()
print(args)
if os.path.exists(args.output_file):
print('Output file already exists:', args.output_file)
sys.exit(1)
conf = SparkConf()
conf.set("io.compression.codecs", "io.sensesecure.hadoop.xz.XZCodec")
sc = SparkContext(conf=conf)
    # WARNING! pyspark may be bugged: during the partitionBy, if a partition
    # exceeds the available memory it will crash!
    # Better to play it safe and use a large number of partitions.
output_partitions = args.output_partitions or (len(args.input_files) * 100)
print('output_partitions:', output_partitions)
rdds_list = [sc.textFile(
name=input_file,
minPartitions=1, # 1 partition per file
use_unicode=False,
).map(input_mapper(input_file)) for input_file in args.input_files]
# Remove lines that we don't want
if args.projects:
projects_list = args.projects.split(u',')
line_filter = input_line_filter_provider(projects_list)
rdds_list = [rdd.filter(line_filter) for rdd in rdds_list]
# Aggregate locally the lines that represent the same
# (project, page, timestamp)
def partitionMapper(kv_records):
for k, group in itertools.groupby(kv_records, key=itemgetter(0)):
project, page, timestamp = k
values = [group_value for group_key, group_value in group]
values_aggr = list(functools.reduce(sum_counts_and_bytes, values))
yield (project, page, timestamp, values_aggr[0], values_aggr[1])
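    # For instance (added comment, rows are hypothetical), two lines of the
    # same file such as ('en', 'Foo', ts, 2, 100) and ('en', 'Foo', ts, 3, 50)
    # collapse into ('en', 'Foo', ts, 5, 150) before the shuffle.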
rdds_list = [
rdd
        .map(lambda record: (record[:3], record[3:]))  # key by (project, page, timestamp)
.sortByKey(numPartitions=1)
.mapPartitions(partitionMapper)
for rdd in rdds_list
]
sample_rdd = rdds_list[0]
sample_rdd.persist(StorageLevel.MEMORY_AND_DISK)
range_partitioner = get_partitioner(
sample_rdd.map(lambda record: (record[:2], )), # key by proj, page
output_partitions,
)
# Create a single rdd
union_rdd = sc.union(rdds_list)
# Aggregate if requested
# if args.aggregate:
# key_fn = get_key_for_aggregator(args.aggregate)
# union_rdd_kv = (
# union_rdd
# .map(lambda record: (key_fn(record), record[3:]))
# .reduceByKey(sum_counts_and_bytes)
# )
# else:
# union_rdd_kv = (
# union_rdd
# .map(lambda record: (key_with_ts(record), record[3:]))
# )
#
# def record_ppt_to_pp(record):
# (project, page, timestamp_aggr), (count, bytes_trans) = record
# return (project, page), (timestamp_aggr, count, bytes_trans)
#
# # Sort the rdd by key, which is (project, page)
# union_rdd_kv = (
# union_rdd_kv
# .map(record_ppt_to_pp)
# )
def record_to_kv_without_timestamp(record):
project, page, timestamp, count, bytes_trans = record
return (project, page), (timestamp, count, bytes_trans)
# union_rdd_kv = (
# union_rdd
# .map(record_to_kv_without_timestamp)
# .sortByKey(numPartitions=output_partitions)
# )
#
# print('------ count ------')
# print('count:', union_rdd_kv.count())
# return
union_rdd_kv = (
union_rdd
.map(record_to_kv_without_timestamp)
.repartitionAndSortWithinPartitions(
numPartitions=output_partitions,
partitionFunc=range_partitioner,
ascending=True,
)
# .partitionBy(
# numPartitions=output_partitions,
# partitionFunc=range_partitioner,
# )
)
# print('------ count ------')
# def partCounter(index, partition):
# c = 0
# for element in partition:
# c += 1
# yield (index, c)
#
# print('partitioCount:', union_rdd_kv.mapPartitionsWithIndex(partCounter).collect())
# print('count:', union_rdd_kv.count())
# return
# TIMESTAMP_PATTERN = '%Y%m%d-%H%M%S'
# def kv_tuple_to_kv(record):
# (project, page), (timestamp, counts, bytes_trans) = record
# return project, page, timestamp.strftime(TIMESTAMP_PATTERN), counts, bytes_trans
# # TODO: Check if not using strftime speeds up the saving
# union_rdd_kv.map(kv_tuple_to_kv).saveAsSequenceFile(
# args.output_file,
# compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
# )
# print('---------------- saving as csv text file ------------')
def kv_tuple_to_text(kv_tuple):
(project, page), (timestamp, counts, bytes_trans) = kv_tuple
return u'{} {} {} {} {}'.format(
project,
quote(page),
timestamp.strftime('%Y%m%d-%H%M%S'),
counts,
bytes_trans,
)
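    # The emitted text lines then look like (added comment, values are
    # hypothetical):
    #   en Albert_Einstein 20150101-030000 5 150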
union_rdd_kv.map(kv_tuple_to_text).saveAsTextFile(
args.output_file + '.csv',
compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
)
# # Group by key
# def grouper(kv_records):
# for k, group in itertools.groupby(kv_records, key=itemgetter(0)):
# yield k, sorted(v for k, v in group)
# union_rdd_kv_grouped = union_rdd_kv.mapPartitions(grouper)
#
# print('---------------- saving as pickle file ------------')
# union_rdd_kv_grouped.saveAsPickleFile(args.output_file + '.pickle')
#
# print('---------------- saving as text file ------------')
# union_rdd_kv_grouped.map(str).saveAsTextFile(
# args.output_file + '.txt',
# compressionCodecClass='org.apache.hadoop.io.compress.GzipCodec',
# )
if __name__ == '__main__':
main()

youtux commented Jan 21, 2016

This file sorts raw Wikimedia pagecounts (up to 2015) by (project, page, timestamp).
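A usage sketch (the file name, paths and options below are placeholders, not from the gist): save the script as e.g. wiki_pagecounts_sort.py and hand it to spark-submit with one or more input dumps followed by the output prefix, for instance

spark-submit wiki_pagecounts_sort.py pagecounts-20150101-000000.gz pagecounts-20150101-010000.gz sorted-pagecounts --projects en,it

The positional arguments match the argparse definition above (INPUT_FILE..., then OUTPUT_FILE); the sorted records are written as gzip'd text parts under OUTPUT_FILE + '.csv'.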
