Skip to content

Instantly share code, notes, and snippets.

@BryanCutler
Last active January 24, 2019 11:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save BryanCutler/4dac13a5b446b0906ec2c4fc6f8deb5c to your computer and use it in GitHub Desktop.
Save BryanCutler/4dac13a5b446b0906ec2c4fc6f8deb5c to your computer and use it in GitHub Desktop.
Spark to Pandas Conversion with Arrow Example
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a Sample DataFrame in Spark\n",
"To generate some sample data, we will make a DataFrame with 2 columns: 1 long and 1 double and 4,194,304 records"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- id: long (nullable = false)\n",
" |-- x: double (nullable = false)\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import rand\n",
"df = spark.range(1 << 22).toDF(\"id\").withColumn(\"x\", rand())\n",
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Conversion to a Pandas DataFrame without Arrow\n",
"\n",
"This uses the default Spark serializers to transfer the data and load it into Pandas 1 record at a time. It's a very inefficient process due to the high overhead of serialization and having to process individual scalar values."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 17.1 s, sys: 552 ms, total: 17.6 s\n",
"Wall time: 20.5 s\n"
]
}
],
"source": [
"%time pdf = df.toPandas()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enable Arrow with a Spark property\n",
"By default, Arrow is not enabled in Spark. You can enable by setting the following SQLConf or adding \"spark.sql.execution.arrow.enabled=true\" to your Spark configuration at `conf/spark-defaults.conf`"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"spark.conf.set(\"spark.sql.execution.arrow.enabled\", \"true\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run the conversion again, this time with Arrow\n",
"With Arrow enabled, the call `toPandas()` is optimized to use Arrow to transfer the data and avoid serialization costs. Arrow can then utilize zero-copy methods to produce a Pandas DataFrame on chunks of data at a time, making the entire process very efficient."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 20 ms, sys: 56 ms, total: 76 ms\n",
"Wall time: 692 ms\n"
]
}
],
"source": [
"%time pdf = df.toPandas()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment