@lucidyan
Forked from joshlk/faster_toPandas.py
Last active October 13, 2024 03:04
Faster PySpark DataFrame to pandas DataFrame conversion using mapPartitions
from typing import Optional

import pandas as pd
from pyspark.sql import DataFrame


# Wrapper so that Spark can serialise the helper functions cleanly
def spark_to_pandas(spark_df: DataFrame) -> pd.DataFrame:
    """
    PySpark toPandas implementation using mapPartitions;
    much faster than the vanilla version.

    fork: https://gist.github.com/lucidyan/1e5d9e490a101cdc1c2ed901568e082b
    origin: https://gist.github.com/joshlk/871d58e01417478176e7

    :param spark_df: Spark DataFrame to convert
    :return: local pandas DataFrame with the same column names
    """

    def _map_to_pandas(rdds) -> list:
        """ Needs to be here due to pickling issues """
        # Build one pandas DataFrame from all rows of this partition
        return [pd.DataFrame(list(rdds))]

    def _to_pandas(df: DataFrame, n_partitions: Optional[int] = None) -> pd.DataFrame:
        """
        Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion.
        The DataFrame is repartitioned if `n_partitions` is passed.

        :param df: Spark DataFrame to convert
        :param n_partitions: number of partitions to repartition into (optional)
        :return: local pandas DataFrame
        """
        if n_partitions is not None:
            df = df.repartition(n_partitions)
        # collect() returns a list with one pandas DataFrame per partition
        df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
        df_pand = pd.concat(df_pand)
        df_pand.columns = df.columns
        return df_pand

    return _to_pandas(spark_df)
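
The per-partition step can be tried out locally without a Spark cluster. The sketch below is only an illustration: plain Python iterators stand in for Spark partitions, and the sample rows, the `columns` list and the `map_to_pandas` name are made up for the example. It also shows that `pd.concat` without `ignore_index=True` keeps each partition's own row index, so index values repeat in the combined frame.

```python
import pandas as pd

# Stand-ins for Spark partitions: each one is an iterator of row tuples.
# The rows and column names are hypothetical sample data.
partitions = [
    iter([(1, "a"), (2, "b")]),
    iter([(3, "c")]),
]
columns = ["id", "label"]


def map_to_pandas(rows):
    # Same shape as _map_to_pandas: one DataFrame per partition, wrapped in a list
    return [pd.DataFrame(list(rows))]


# Flatten the per-partition lists, as collect() would after mapPartitions
frames = [frame for part in partitions for frame in map_to_pandas(part)]

# Plain concat keeps each partition's own 0..n-1 index
repeated_index = pd.concat(frames).index.tolist()

# ignore_index=True renumbers the rows of the combined frame
result = pd.concat(frames, ignore_index=True)
result.columns = columns
```

Whether to pass `ignore_index=True` depends on whether downstream code relies on a unique row index; the function above leaves the index as `pd.concat` produces it.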

nyou70 commented Jul 12, 2021

I am getting this error when using your code to convert DataFrames:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 65 tasks (1034.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
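
The message says that the serialized results returned to the driver by `collect()` (1034.9 MB across 65 tasks) exceeded the `spark.driver.maxResultSize` limit of 1024 MB. One possible workaround, assuming the driver has enough memory to hold the whole result, is to raise that limit before the session is created. The sketch below is not part of the gist: `SPARK_CONF`, `build_session`, the app name and the `4g` value are illustrative choices.

```python
# Hypothetical settings; pick a value that fits the driver's available memory.
SPARK_CONF = {"spark.driver.maxResultSize": "4g"}


def build_session(conf=SPARK_CONF):
    """Create (or reuse) a SparkSession with the given config applied."""
    # Imported lazily so this module can be loaded without Spark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName("spark-to-pandas")
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Raising the limit only moves the ceiling. The full pandas DataFrame still has to fit in driver memory, so selecting fewer columns or filtering rows before the conversion may be the better fix for large tables.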
