Fast PySpark DataFrame to pandas DataFrame conversion using mapPartitions
from typing import Optional

import pandas as pd
from pyspark.sql import DataFrame


# Wrapper so Spark can pickle and ship the mapping function cleanly
def spark_to_pandas(spark_df: DataFrame, n_partitions: Optional[int] = None) -> pd.DataFrame:
    """
    PySpark toPandas() implementation using mapPartitions,
    much faster than the vanilla version.
    fork: https://gist.github.com/lucidyan/1e5d9e490a101cdc1c2ed901568e082b
    origin: https://gist.github.com/joshlk/871d58e01417478176e7
    :param spark_df: Spark DataFrame to convert
    :param n_partitions: optional number of partitions to repartition to first
    :return: local pandas DataFrame with the same contents
    """
    def _map_to_pandas(rows) -> list:
        """Needs to be defined here due to pickling issues."""
        # Build one pandas DataFrame per partition on the executors
        return [pd.DataFrame(list(rows))]

    def _to_pandas(df: DataFrame, n_partitions: Optional[int] = None) -> pd.DataFrame:
        """
        Returns the contents of `df` as a local `pandas.DataFrame` in a speedy fashion.
        The DataFrame is repartitioned first if `n_partitions` is passed.
        :param df: Spark DataFrame to convert
        :param n_partitions: optional number of partitions to repartition to
        :return: local pandas DataFrame
        """
        if n_partitions is not None:
            df = df.repartition(n_partitions)
        # Collect one pandas DataFrame per partition to the driver, then
        # concatenate; ignore_index gives a clean 0..n-1 index, matching
        # what the built-in toPandas() would return
        partition_frames = df.rdd.mapPartitions(_map_to_pandas).collect()
        df_pand = pd.concat(partition_frames, ignore_index=True)
        df_pand.columns = df.columns
        return df_pand

    return _to_pandas(spark_df, n_partitions)
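A minimal usage sketch; the SparkSession setup and the `people.parquet` input path are illustrative assumptions, not part of the gist:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("to-pandas-demo").getOrCreate()
sdf = spark.read.parquet("people.parquet")  # hypothetical input path
pdf = spark_to_pandas(sdf, n_partitions=64)  # repartitioning is optional
print(pdf.head())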
I am getting this error when using your code to convert the DataFrames:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 65 tasks (1034.9 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
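That exception comes from the collect() call: the serialized per-partition results exceed the driver's spark.driver.maxResultSize cap, which defaults to 1g. A minimal sketch of one common workaround, raising the cap when building the session; the "4g" value is an assumed example, not a recommendation from the gist, and setting it to "0" removes the limit entirely at the risk of driver out-of-memory errors:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("to-pandas-demo")
    .config("spark.driver.maxResultSize", "4g")  # assumed example value; default is 1g
    .getOrCreate()
)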