-
-
Save chumo/aa649aa16a0efe9f5dd0d650fb07c0c7 to your computer and use it in GitHub Desktop.
# Third-party imports.
import pandas as pd
import pyspark

# Driver-side Spark context; applyParallel uses it to distribute the groups.
sc = pyspark.SparkContext()
# apply parallel | |
def applyParallel(dfGrouped, func): | |
# rdd with the group of dataframes | |
groups = [group for name, group in dfGrouped] | |
names = [name for name, group in dfGrouped] | |
dummy_rdd = sc.parallelize(groups) | |
# assuming that func(pandas dataframe) returns a series, the following is a list of pandas series | |
ret_list = dummy_rdd.map(func).collect() | |
# concatenate them in a pandas dataframe and return | |
result = pd.concat([S.to_frame().transpose() for S in ret_list]) | |
result.index = names | |
return result | |
# Example: | |
########## | |
def f(g):
    """Example per-group function: report the group's row and column counts."""
    nrows, ncols = g.shape
    return pd.Series({'nrows': nrows, 'ncols': ncols})
# Example: count rows/columns of each 'a'-group in parallel.
pepe = pd.DataFrame({
    'a': ['q1', 'q1', 'q2', 'q3', 'q4', 'q4', 'q4', 'q3'],
    'b': [3, 5, 3, 6, 2, 4, 3, 5],
})
juan = applyParallel(pepe.groupby('a'), f)
Tested with Spark 2.2.0.
Tried running this and got this error:
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2455, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2388, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2374, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\serializers.py", line 460, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 704, in dumps
cp.dump(obj)
File "C:\ProgramData\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 162, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Tried running this and got this error:
TypeError: 'GroupedData' object is not iterable
Thoughts?