PySpark faster toPandas using mapPartitions
import pandas as pd

def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is
    repartitioned if `n_partitions` is passed.
    :param df: pyspark.sql.DataFrame
    :param n_partitions: int or None
    :return: pandas.DataFrame
    """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand
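
For reference, a minimal usage sketch (the SparkSession name `spark`, the columns and the partition count are assumptions, not part of the gist):

# Hypothetical usage of the helper above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(1000000).selectExpr("id", "id * 2 as doubled")

# Repartitioning via n_partitions keeps each chunk small enough for the executors.
pdf = toPandas(sdf, n_partitions=32)
print(pdf.shape)   # (1000000, 2)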
@ghost commented Apr 9, 2016

Not only faster but fixed a job-crashing out-of-memory issue for me! Much appreciated!

@gopalatumuluri commented Oct 13, 2016

I am running into the memory problem. This works on about 500,000 rows, but runs out of memory with anything larger. I am partitioning the Spark data frame by two columns and then converting it with toPandas(df) as above. Any ideas on the best way to use this? I want each individual partition to be a pandas data frame.
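
A possible direction (a sketch only, not part of the gist; the per-partition summary is hypothetical) is to keep the pandas work on the executors instead of collecting everything to the driver, so each partition becomes its own pandas DataFrame and only small results come back:

import pandas as pd

def summarise_partition(rows):
    # Build one pandas DataFrame per partition on the executor.
    pdf = pd.DataFrame(list(rows))
    if pdf.empty:
        return []
    return [len(pdf)]          # e.g. return only a row count per partition

# df is the Spark DataFrame already partitioned by the two columns.
per_partition_counts = df.rdd.mapPartitions(summarise_partition).collect()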

@joshlk (owner, author) commented Oct 25, 2016

It's likely that your partitions are too big for each executor. You probably need to increase the number of partitions before using toPandas.
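
For instance (a sketch; the target of roughly 100,000 rows per partition is an arbitrary assumption to tune against your executor memory):

# Pick a partition count so each chunk stays small, then convert.
n_rows = df.count()
current = df.rdd.getNumPartitions()
target = max(current, n_rows // 100000 + 1)   # assumed ~100k rows per partition

pdf = toPandas(df, n_partitions=target)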

@cs4026 commented Jun 15, 2017

Hey Josh,

I was looking to use the code to create a pandas data frame from a PySpark data frame of 10M+ records. I saved the above code to a file (faster_toPandas.py) and attempted to import it into my main program. While calling the toPandas() function on my PySpark dataframe, I kept receiving an ImportError: Module "faster_toPandas" not found. It appears that pickle.loads() was the last call made before the error was thrown. Do you have any ideas on what may be the cause of this error?

Any help would be greatly appreciated!
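
One workaround that is commonly suggested for this kind of unpickling ImportError (offered as an assumption, not verified against this exact setup) is to ship the module to the executors so they can import it when deserialising _map_to_pandas:

# Make faster_toPandas.py importable on the executors as well as the driver.
spark.sparkContext.addPyFile("faster_toPandas.py")

from faster_toPandas import toPandas
pdf = toPandas(my_spark_df, n_partitions=200)   # my_spark_df is a placeholder name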

@NegatioN commented Feb 19, 2018

Why isn't this the default implementation? Are there any drawbacks here that I'm not seeing?
Great gist btw, really appreciate it.

@kiranreddyrebel commented Feb 28, 2018

Thanks, it's a great way of using the toPandas() function.

@dgouju commented Apr 6, 2018

I was using this great code with pandas 0.18.1, but now with 0.21.1 it fails this way:

<ipython-input-5-63082a0c1bfc> in toPandas(df, n_partitions)
     30     """
     31     if n_partitions is not None: df = df.repartition(n_partitions)
---> 32     df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
     33     df_pand = pd.concat(df_pand)
     34     df_pand.columns = df.columns

/local/adhoc/spark/python/pyspark/rdd.py in collect(self)
    770         with SCCallSiteSync(self.context) as css:
    771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
--> 772         return list(_load_from_socket(port, self._jrdd_deserializer))
    773 
    774     def reduce(self, f):

/local/adhoc/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
    140     try:
    141         rf = sock.makefile("rb", 65536)
--> 142         for item in serializer.load_stream(rf):
    143             yield item
    144     finally:

/local/adhoc/spark/python/pyspark/serializers.py in load_stream(self, stream)
    137         while True:
    138             try:
--> 139                 yield self._read_with_length(stream)
    140             except EOFError:
    141                 return

/local/adhoc/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
    162         if len(obj) < length:
    163             raise EOFError
--> 164         return self.loads(obj)
    165 
    166     def dumps(self, obj):

/local/adhoc/spark/python/pyspark/serializers.py in loads(self, obj, encoding)
    417     if sys.version >= '3':
    418         def loads(self, obj, encoding="bytes"):
--> 419             return pickle.loads(obj, encoding=encoding)
    420     else:
    421         def loads(self, obj, encoding=None):

ImportError: No module named 'pandas.indexes'

Any workaround, as I cannot change any other code / lib version? I saw this https://stackoverflow.com/questions/37371451/importerror-no-module-named-pandas-indexes but I cannot apply it.

Thanks!

@SuvroBaner commented Apr 19, 2018

Hi, I get this error when I try to convert a large Spark dataframe to a pandas dataframe:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

@maddurup commented Nov 13, 2018

I am getting the below error:

Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 367, in
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-8579817590870380218.py", line 360, in
exec(code, _zcUserQueryNameSpace)
File "", line 1, in
File "", line 14, in toPandas
File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 11.0 failed 4 times, most recent failure: Lost task 10.3 in stage 11.0 (TID 870, ip-172-31-33-241.us-west-2.compute.internal, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
__import__(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 166, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
File "/mnt/yarn/usercache/zeppelin/appcache/application_1542066116522_0002/container_1542066116522_0002_01_000010/pyspark.zip/pyspark/cloudpickle.py", line 711, in subimport
__import__(name)
ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more

@emir-munoz commented Nov 16, 2018

@maddurup it seems you do not have pandas installed in your environment:

ImportError: ('No module named pandas', <function subimport at 0x7fa2ee31ec80>, ('pandas',))
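
A quick way to check which side is missing pandas (a hypothetical diagnostic, assuming a SparkContext named `sc`) is to import it inside a task, so the ImportError, if any, is raised on an executor:

def pandas_version(_):
    # Runs on an executor; fails there if pandas is not installed on the workers.
    import pandas
    return pandas.__version__

print(sc.parallelize(range(4), 4).map(pandas_version).collect())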

@gravityrahul commented Dec 3, 2018

I am getting this length mismatch:


ValueError Traceback (most recent call last)
in ()
----> 1 testPrePeriodDataWeekPandas=toPandas(testPrePeriodDataWeek, n_partitions=1000)

in toPandas(df, n_partitions)
16 df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
17 df_pand = pd.concat(df_pand)
---> 18 df_pand.columns = df.columns
19 return df_pand

~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in __setattr__(self, name, value)
4387 try:
4388 object.__getattribute__(self, name)
-> 4389 return object.__setattr__(self, name, value)
4390 except AttributeError:
4391 pass

pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.__set__()

~/.anaconda/lib/python3.7/site-packages/pandas/core/generic.py in _set_axis(self, axis, labels)
644
645 def _set_axis(self, axis, labels):
--> 646 self._data.set_axis(axis, labels)
647 self._clear_item_cache()
648

~/.anaconda/lib/python3.7/site-packages/pandas/core/internals.py in set_axis(self, axis, new_labels)
3321 raise ValueError(
3322 'Length mismatch: Expected axis has {old} elements, new '
-> 3323 'values have {new} elements'.format(old=old_len, new=new_len))
3324
3325 self.axes[axis] = new_labels

ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements

@kmanojus commented Mar 20, 2019

Is there a solution available for ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements?
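
One plausible cause, judging from how the helper is written (this is an assumption, not a confirmed diagnosis): if every partition is empty, pd.concat produces a frame with zero columns, and assigning the original nine column names then fails with exactly this message. A guarded variant could look like this:

def toPandas_safe(df, n_partitions=None):
    """ Variant of toPandas() that copes with empty results. """
    if n_partitions is not None:
        df = df.repartition(n_partitions)
    parts = df.rdd.mapPartitions(_map_to_pandas).collect()
    parts = [p for p in parts if not p.empty]      # drop empty per-partition frames
    if not parts:
        return pd.DataFrame(columns=df.columns)    # keep the schema, zero rows
    df_pand = pd.concat(parts)
    df_pand.columns = df.columns
    return df_pand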

@mvcduarte commented Mar 22, 2019

Hi, I get this error when I try to convert a large Spark dataframe to a pandas dataframe:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

I had the same problem but no idea how to solve it.

@Stanpol commented May 3, 2019

If you use Spark 2.3.x, just turn on Arrow with spark.conf.set("spark.sql.execution.arrow.enabled", "true") and it will significantly speed up the toPandas method.
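
A minimal sketch of that suggestion (assuming Spark 2.3+ with PyArrow installed on both driver and executors; `sdf` stands for your Spark DataFrame):

# Enable Arrow-based columnar transfer, then use the built-in toPandas().
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = sdf.toPandas()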

@cerlymarco commented Sep 25, 2019

Don't forget to call reset_index(drop=True) on your pandas dataframe.
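
For example (a small sketch of that tip; because the per-partition frames keep their own 0-based indices, the concatenated result has duplicate index values until it is reset):

pdf = toPandas(sdf, n_partitions=100)
pdf = pdf.reset_index(drop=True)   # give the combined frame a clean 0..n-1 index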

@gabriellm1 commented Nov 15, 2019

thank you very much for this solution!

@sagargoyal96 commented Apr 17, 2020

I tried all of the above-mentioned optimisations and still keep running into memory issues, even after setting the number of partitions to a large value.
I get the following error:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
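
This particular failure is the driver-side limit on collected results rather than executor memory. One common mitigation (a sketch with illustrative values, assuming the driver machine can actually hold the full dataset) is to raise spark.driver.maxResultSize when building the session:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.driver.maxResultSize", "8g")   # illustrative value
         .config("spark.driver.memory", "16g")         # the driver must fit the collected data
         .getOrCreate())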
