@drj42
Created March 15, 2016 18:45
PySpark Helper Function - perform reduceByKey on a dataframe
# Removes a lot of the boilerplate involved in converting a PySpark DataFrame
# to and from an RDD in order to do a reduceByKey operation.
#
# Lifted from:
# - http://codereview.stackexchange.com/questions/115082/generic-reduceby-or-groupby-aggregate-functionality-with-spark-dataframe
from collections import OrderedDict

from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import struct

def reduce_by(self, by, cols, f, schema=None):
"""
:param self DataFrame
:param by a list of grouping columns
:param cols a list of columns to aggregate
:param aggregation function Row => Row
:return DataFrame
"""
def merge_kv(kv):
key, value = kv
return Row(**OrderedDict(zip(
key.__fields__ + value.__fields__, key + value)
))
return (self
.select(struct(*by), struct(*cols))
.rdd
.reduceByKey(f)
.map(merge_kv)
.toDF(schema))
DataFrame.reduce_by = reduce_by # A quick monkey patch
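
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original helper): the sample data, column
# names, and the `add_rows` reducer below are illustrative assumptions, and
# the example assumes a Spark 2.x-style SparkSession is available.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 5)], ["key", "n"])

    def add_rows(x, y):
        # (Row, Row) => Row: sum the `n` field of two value Rows.
        return Row(n=x.n + y.n)

    # Expected result: one row per key -- ("a", 3) and ("b", 5).
    df.reduce_by(["key"], ["n"], add_rows).show()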