@BryanCutler
Last active September 16, 2020 02:30
How to create a Spark DataFrame from Pandas or NumPy with Arrow
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# How to Create a Spark DataFrame efficiently from Pandas using Arrow\n",
"This notebook will demonstrate how to enable Arrow to quickly and efficiently create a Spark DataFrame from an existing Pandas DataFrame.\n",
"\n",
"The following assumes you have started a Jupyter Notebook with a PySpark kernel that creates a default SparkSession `spark`. For a quick method to start this up, see [here](https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21).\n",
"\n",
"\n",
"## Generate a Pandas DataFrame\n",
"First let's make a function to generate sample data from NumPy and wrap it in a Pandas DataFrame. The function will take an integer `num_records` and create a 2D array of doubles that translates to a DataFrame of 10 columns by `num_records` rows."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"# 10 columns of random doubles by num_records rows\n",
"def gen_pdf(num_records):\n",
"    return pd.DataFrame(np.random.rand(num_records, 10), columns=list(\"abcdefghij\"))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Without Arrow, Life is Painful!\n",
"Let's first try to create a DataFrame without Arrow. To avoid too much waiting around, we will only use 100,000 records and time a single call to create the DataFrame (this takes ~6-7 s running locally on my laptop)."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 1: 6.92 s per loop\n"
]
}
],
"source": [
"spark.conf.set(\"spark.sql.execution.arrow.enabled\", \"false\")\n",
"\n",
"pdf = gen_pdf(100000)\n",
"\n",
"%timeit -r1 spark.createDataFrame(pdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This really wasn't much data, but it is still extremely slow! This is mostly because Spark must iterate over each row, type checking and converting every value from Python to Java, which in turn forces NumPy to convert its data to plain Python objects and serialize them to the JVM."
]
},
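{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for where the time goes, here is a rough, hypothetical sketch of what that per-row conversion amounts to. This is just an illustration, not Spark's actual code path: every NumPy value becomes a plain Python object before the rows are serialized to the JVM."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustration only: turn each row into a tuple of plain Python floats,\n",
"# roughly the per-value conversion work done when Arrow is disabled.\n",
"rows = [tuple(float(v) for v in row) for row in pdf.itertuples(index=False)]\n",
"len(rows)"
]
},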
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enable Arrow with a Spark Conf\n",
"Now enable Arrow at runtime. This can also be done by adding a line to `spark-defaults.conf`, as shown after the next cell."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"spark.conf.set(\"spark.sql.execution.arrow.enabled\", \"true\")"
]
},
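{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you would rather have this enabled for every session, the equivalent entry in `spark-defaults.conf` should be something like the following (assuming Spark 2.3+, where this conf is available):\n",
"```\n",
"spark.sql.execution.arrow.enabled true\n",
"```"
]
},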
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create the Spark DataFrame using Arrow\n",
"With Arrow enabled, let's increase the size of the DataFrame to 1,000,000 records (not to worry, it should complete in the sub-second range)."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 249 ms per loop\n"
]
}
],
"source": [
"pdf = gen_pdf(1000000)\n",
"\n",
"%timeit spark.createDataFrame(pdf)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That's more like it! Even with ten times the data, it is still faster by a huge factor. Arrow allows the NumPy data to be sent to the JVM in columnar batches that can be consumed directly, without per-value conversions, while still preserving accurate type information. A short, optional sketch of what those record batches look like follows below.\n",
"\n",
"Then, just to be sure nothing fishy is going on, we can take a look at the data and make sure it checks out."
]
},
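{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is that optional sketch. It assumes `pyarrow` is installed (it has to be for the Arrow code path to work at all) and is not what Spark calls internally; it just shows how pyarrow represents the same Pandas data as columnar record batches:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch only: build an Arrow Table from the Pandas DataFrame and look\n",
"# at its schema and record batches.\n",
"import pyarrow as pa\n",
"\n",
"table = pa.Table.from_pandas(pdf)\n",
"print(table.schema)\n",
"print(len(table.to_batches()))"
]
},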
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+\n",
"|summary| a|\n",
"+-------+--------------------+\n",
"| count| 1000000|\n",
"| mean| 0.5001517817905997|\n",
"| stddev| 0.28866041226417954|\n",
"| min|2.761536613493653...|\n",
"| 25%| 0.250523879866264|\n",
"| 50%| 0.5004081127153109|\n",
"| 75%| 0.7499817669739669|\n",
"| max| 0.9999999361166477|\n",
"+-------+--------------------+\n",
"\n"
]
}
],
"source": [
"spark.createDataFrame(pdf) \\\n",
" .select(\"a\").summary().show()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@aschmu

aschmu commented Jul 9, 2019

Hi! There seem to be some missing imports!
e.g. where does the `spark` object come from?
Otherwise, nice gist!

@BryanCutler
Author

Thanks @aschmu! The variable `spark` is a default SparkSession. I forgot to mention that you should be running Jupyter with a PySpark kernel. I put a sample script showing how I do this here: https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21 in case it helps. Thanks for the feedback!!
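
For reference, here is a minimal sketch of creating that session yourself when you are not using a PySpark kernel (the app name is just illustrative):

```python
# Minimal sketch: build the SparkSession manually instead of relying on
# the one a PySpark kernel provides.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("arrow-gist-demo") \
    .getOrCreate()
```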
