-
-
Save joshlk/871d58e01417478176e7 to your computer and use it in GitHub Desktop.
import pandas as pd | |
def _map_to_pandas(rdds): | |
""" Needs to be here due to pickling issues """ | |
return [pd.DataFrame(list(rdds))] | |
def toPandas(df, n_partitions=None): | |
""" | |
Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion. The DataFrame is | |
repartitioned if `n_partitions` is passed. | |
:param df: pyspark.sql.DataFrame | |
:param n_partitions: int or None | |
:return: pandas.DataFrame | |
""" | |
if n_partitions is not None: df = df.repartition(n_partitions) | |
df_pand = df.rdd.mapPartitions(_map_to_pandas).collect() | |
df_pand = pd.concat(df_pand) | |
df_pand.columns = df.columns | |
return df_pand |
Hi, I get this error when I try to convert a large spark dataframe to Pandas dataframe-
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)
at org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)
at scala.Option.map(Option.scala:145)
at org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)
at org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)
at org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
traceback:
[
"Traceback (most recent call last):\n",
"Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 51 in stage 3.0 failed 4 times, most recent failure: Lost task 51.3 in stage 3.0 (TID 69, ip-172-31-63-125.us-west-2.compute.internal): java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:323)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:933)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException\n\tat org.apache.spark.sql.hive.HiveInspectors$class.unwrapperFor(HiveInspectors.scala:488)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.unwrapperFor(OrcRelation.scala:244)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1$$anonfun$6.apply(OrcRelation.scala:275)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)\n\tat scala.collection.immutable.List.foreach(List.scala:318)\n\tat scala.collection.TraversableLike$class.map(TraversableLike.scala:244)\n\tat scala.collection.AbstractTraversable.map(Traversable.scala:105)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:275)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject$1.apply(OrcRelation.scala:270)\n\tat scala.Option.map(Option.scala:145)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan.org$apache$spark$sql$hive$orc$OrcTableScan$$fillObject(OrcRelation.scala:270)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:325)\n\tat org.apache.spark.sql.hive.orc.OrcTableScan$$anonfun$execute$1.apply(OrcRelation.scala:323)\n\tat org.apache.spark.rdd.HadoopRDD$HadoopMapPartitionsWithSplitRDD.compute(HadoopRDD.scala:386)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:277)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n\n"
]
I had the same problem but no idea how to solve it.
If you use Spark 2.3.* just turn on Arrow spark.conf.set("spark.sql.execution.arrow.enabled", "true")
and it will significantly speed up toPandas method.
don't forget reset_index(drop=True)
of your pandas dataframe
thank you very much for this solution!
I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
I tried all the above mentioned optimisations and still keep running into memory issues. I even put the new partitions as big number.
I get the following error:An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 5450 tasks (4.0 GB) is bigger than spark.driver.maxResultSize (4.0 GB)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2302)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2346)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:997)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:392)
at org.apache.spark.rdd.RDD.collect(RDD.scala:996)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:247)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
You can increase your spark.driver.maxResultSize in your Spark config. It seems like you df is larger than the four gigabyte. Set it to 15g or something similar and try to run it again.
Do we have solution available for ValueError: Length mismatch: Expected axis has 0 elements, new values have 9 elements?