Skip to content

Instantly share code, notes, and snippets.

@joshlk
Last active May 15, 2023 13:48
Show Gist options
  • Star 86 You must be signed in to star a gist
  • Fork 16 You must be signed in to fork a gist
  • Save joshlk/871d58e01417478176e7 to your computer and use it in GitHub Desktop.
Save joshlk/871d58e01417478176e7 to your computer and use it in GitHub Desktop.
PySpark faster toPandas using mapPartitions
import pandas as pd
def _map_to_pandas(rdds):
""" Needs to be here due to pickling issues """
return [pd.DataFrame(list(rdds))]
def toPandas(df, n_partitions=None):
"""
Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
repartitioned if `n_partitions` is passed.
:param df: pyspark.sql.DataFrame
:param n_partitions: int or None
:return: pandas.DataFrame
"""
if n_partitions is not None: df = df.repartition(n_partitions)
df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
df_pand = pd.concat(df_pand)
df_pand.columns = df.columns
return df_pand
Copy link

ghost commented Apr 9, 2016

Not only faster but fixed a job-crashing out-of-memory issue for me! Much appreciated!

@gopalatumuluri
Copy link

gopalatumuluri commented Oct 13, 2016

I am running into the memory problem. This works on about 500,000 rows, but runs out of memory with anything larger. I am partitioning the spark data frame by two columns, and then converting 'toPandas(df)' using above. Any ideas on best way to use this? I want each individual partition to be a pandas data frame.

@joshlk
Copy link
Author

joshlk commented Oct 25, 2016

Its likely that your partitions are too big for each executor. You probably need to increase the number of partitions before using toPandas.

@cs4026
Copy link

cs4026 commented Jun 15, 2017

Hey Josh,

I was looking to use the code to create a pandas data frame from a pyspark data frame of 10mil+ records. I saved the above code to a file (faster_toPandas.py) and attempted to import this into my main program. While attempting to call the toPandas() function on my Pyspark dataframe, I kept receiving an Import Error: Module "faster_toPandas" not found. It appears that pickle.loads() was the last call made before an error was thrown. Do you have any ideas on what may be the cause of this error?

Any help would be greatly appreciated!

@NegatioN
Copy link

Why isn't this the default implementation? Are there any drawbacks here that I'm not seeing?
Great gist btw, really appreciate it.

@kiranreddyrebel
Copy link

thanx , it's a great way of using toPandas() func.

@dgouju
Copy link

dgouju commented Apr 6, 2018

I was using this great code with pandas 0.18.1, but now with 0.21.1 it fails this way:

<ipython-input-5-63082a0c1bfc> in toPandas(df, n_partitions)
     30     """
     31     if n_partitions is not None: df = df.repartition(n_partitions)
---> 32     df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
     33     df_pand = pd.concat(df_pand)
     34     df_pand.columns = df.columns

/local/adhoc/spark/python/pyspark/rdd.py in collect(self)
    770         with SCCallSiteSync(self.context) as css:
    771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
--> 772         return list(_load_from_socket(port, self._jrdd_deserializer))
    773 
    774     def reduce(self, f):

/local/adhoc/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
    140     try:
    141         rf = sock.makefile("rb", 65536)
--> 142         for item in serializer.load_stream(rf):
    143             yield item
    144     finally:

/local/adhoc/spark/python/pyspark/serializers.py in load_stream(self, stream)
    137         while True:
    138             try:
--> 139                 yield self._read_with_length(stream)
    140             except EOFError:
    141                 return

/local/adhoc/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
    162         if len(obj) < length:
    163             raise EOFError
--> 164         return self.loads(obj)
    165 
    166     def dumps(self, obj):

/local/adhoc/spark/python/pyspark/serializers.py in loads(self, obj, encoding)
    417     if sys.version >= '3':
    418         def loads(self, obj, encoding="bytes"):
--> 419             return pickle.loads(obj, encoding=encoding)
    420     else:
    421         def loads(self, obj, encoding=None):

ImportError: No module named 'pandas.indexes'

Any workaround as I cannot change any other code / lib version? I saw this https://stackoverflow.com/questions/37371451/importerror-no-module-named-pandas-indexes but I cannot apply it.

Thanks!

@SuvroBaner
Copy link

Hi, I get this error when I try to convert a large spark dataframe to Pandas dataframe-

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
traceback:
[
"Traceback (most recent call last):\n",
"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:933)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n\n"
]

@maddurup
Copy link

I am getting the below error:

Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 367, in
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 360, in
exec(code, _zcUserQueryNameSpace)
File "", line 1, in
File "", line 14, in toPandas
File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 11.0 failed 4 times, most recent failure: Lost task 10.3 in stage 11.0 (TID 870, ip-172-31-33-241.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
import(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
import(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

@emir-munoz
Copy link

@maddurup seems you do not have pandas installed in your environment

ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))

@gravityrahul
Copy link

I am getting this mismatch on length


ValueError Traceback (most recent call last)
in ()
----> 1 testPrePeriodDataWeekPandas=toPandas(testPrePeriodDataWeek, n_partitions=1000)

in toPandas(df, n_partitions)
16 df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
17 df_pand = pd.concat(df_pand)
---> 18 df_pand.columns = df.columns
19 return df_pand

~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in setattr(self, name, value)
4387 try:
4388 object.getattribute(self, name)
-> 4389 return object.setattr(self, name, value)
4390 except AttributeError:
4391 pass

pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.set()

~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in _set_axis(self, axis, labels)
644
645 def _set_axis(self, axis, labels):
--> 646 self._data.set_axis(axis, labels)
647 self._clear_item_cache()
648

~/.anaconda/lib/python3.7/site-packages/pandas/core/internals.py in set_axis(self, axis, new_labels)
3321 raise ValueError(
3322 'Length mismatch: Expected axis has {old} elements, new '
-> 3323 'values have {new} elements'.format(old=old_len, new=new_len))
3324
3325 self.axes[axis] = new_labels

ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements

@kmanojus
Copy link

Do we have solution available for ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements?

@mvcduarte
Copy link

Hi, I get this error when I try to convert a large spark dataframe to Pandas dataframe-

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
traceback:
[
"Traceback (most recent call last):\n",
"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:933)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n\n"
]

I had the same problem but no idea how to solve it.

@Stanpol
Copy link

Stanpol commented May 3, 2019

If you use Spark 2.3.* just turn on Arrow spark.conf.set("spark.sql.execution.arrow.enabled", "true") and it will significantly speed up toPandas method.

@cerlymarco
Copy link

don't forget reset_index(drop=True) of your pandas dataframe

@gabriellm1
Copy link

thank you very much for this solution!

@sagargoyal96
Copy link

I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

@patebel
Copy link

patebel commented Nov 25, 2020

I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)

You can increase your spark.driver.maxResultSize in your Spark config. It seems like you df is larger than the four gigabyte. Set it to 15g or something similar and try to run it again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment