@chumo
Created June 17, 2016 13:14
Parallelize apply after pandas groupby using PySpark
import pandas as pd

# Spark context
import pyspark
sc = pyspark.SparkContext()

# apply parallel
def applyParallel(dfGrouped, func):
    # rdd with the group of dataframes
    groups = [group for name, group in dfGrouped]
    names = [name for name, group in dfGrouped]
    dummy_rdd = sc.parallelize(groups)
    # assuming that func(pandas dataframe) returns a series, the following is a list of pandas series
    ret_list = dummy_rdd.map(func).collect()
    # concatenate them in a pandas dataframe and return
    result = pd.concat([S.to_frame().transpose() for S in ret_list])
    result.index = names
    return result

# Example:
##########
def f(g):
    return pd.Series({'nrows': g.shape[0], 'ncols': g.shape[1]})

pepe = pd.DataFrame({'a': ['q1', 'q1', 'q2', 'q3', 'q4', 'q4', 'q4', 'q3'],
                     'b': [3, 5, 3, 6, 2, 4, 3, 5]})

juan = applyParallel(pepe.groupby('a'), f)
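
For comparison, the parallel version is intended to match the result of a plain pandas groupby-apply. A minimal sequential sketch, assuming pepe and f as defined above (column order, and whether the grouping column is passed to f, can differ between pandas versions):

# Sequential pandas equivalent of applyParallel, for comparison.
# Assumes pepe and f from the gist above.
juan_sequential = pepe.groupby('a').apply(f)

# juan and juan_sequential should agree up to index/column ordering:
# print(juan.sort_index())
# print(juan_sequential.sort_index())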
@eric-valente

Tried running this - getting this error:

TypeError: 'GroupedData' object is not iterable

Thoughts?
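
One likely cause of this TypeError (an assumption, not confirmed in the thread): applyParallel expects the result of a pandas groupby, which is iterable, whereas PySpark's DataFrame.groupBy returns a GroupedData object that cannot be iterated. A minimal sketch of the two call patterns, assuming applyParallel, f, and sc from the gist above, plus a SparkSession named spark:

import pandas as pd

pdf = pd.DataFrame({'a': ['q1', 'q1', 'q2'], 'b': [3, 5, 3]})

# works: pandas DataFrameGroupBy is iterable, which applyParallel relies on
juan = applyParallel(pdf.groupby('a'), f)

# fails with "TypeError: 'GroupedData' object is not iterable":
# sdf = spark.createDataFrame(pdf)
# juan = applyParallel(sdf.groupBy('a'), f)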

@geosmart

geosmart commented Apr 2, 2018

With Spark 2.2.0, tried running this - getting this error:

File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2455, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2388, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2374, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\serializers.py", line 460, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 704, in dumps
cp.dump(obj)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 162, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
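
This PicklingError means cloudpickle found a reference to the SparkContext inside the closure being shipped to the workers (SPARK-5063). One common cause, offered only as a hedged guess about this setup: the function passed to applyParallel (or an object it closes over, such as self in a class method) holds a reference to sc. A minimal sketch of the problematic pattern versus a safer one, using hypothetical names and assuming applyParallel and pepe from the gist above:

import pandas as pd

# Problematic pattern (sketch): the per-group function is a bound method of an
# object that stores sc, so serializing it drags the SparkContext along.
class Job:
    def __init__(self, sc):
        self.sc = sc
    def per_group(self, g):   # closes over self, and self.sc -> PicklingError
        return pd.Series({'nrows': g.shape[0], 'ncols': g.shape[1]})

# Safer pattern: a free function that only touches the pandas DataFrame;
# all Spark calls (parallelize, map, collect) stay on the driver inside applyParallel.
def per_group(g):
    return pd.Series({'nrows': g.shape[0], 'ncols': g.shape[1]})

# juan = applyParallel(pepe.groupby('a'), per_group)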
