@BryanCutler
Last active February 17, 2022 13:57

PySpark vectorized UDFs with Arrow
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PySpark Vectorized UDFs using Arrow\n",
"Using Arrow, it is possible to perform vectorized evaluation of Python UDFs that will accept one or more `Pandas.Series` as input and return a single `Pandas.Series` of equal length. Using vectorized functions will offer a performance boost over the current way PySpark evaluates using a loop that iterates over 1 row at a time.\n",
"\n",
"The following assumes you have started a Jupyter Notebook with a PySpark kernel that creates a default SparkSession `spark`. For a quick method to start this up, see [here](https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21).\n",
"\n",
"\n",
"## Where to get it\n",
"This functionality is currently pending review and has not yet been merged into Spark, see [SPARK-21404](https://issues.apache.org/jira/browse/SPARK-21404). Until then, a patch for this can be downloaded from the branch in the PR [here](https://patch-diff.githubusercontent.com/raw/apache/spark/pull/18659.diff)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PySpark API\n",
"A new API has been added in pyspark to declare a vectorized UDF. As with normal UDFs you can wrap a function or use a decorator:\n",
"\n",
"```python\n",
"# Wrap the function \"func\"\n",
"pandas_udf(func, DoubleType())\n",
"\n",
"# Use a decorator\n",
"@pandas(returnType=DoubleType())\n",
"def func(x):\n",
" # do something with \"x\" (pandas.Series) and return \"y\" (also a pandas.Series)\n",
" return y\n",
"```"
]
},
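{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the Series-in/Series-out contract described in the introduction concrete, here is a minimal sketch in plain pandas (no Spark required; the data and function name are illustrative only) of what such a function receives and must return: one or more `Pandas.Series` in, a single `Pandas.Series` of equal length out.\n",
"\n",
"```python\n",
"import pandas as pd\n",
"\n",
"# illustrative inputs: two Series of equal length, as the worker would pass them\n",
"p1 = pd.Series([0.1, 0.5, 0.9])\n",
"p2 = pd.Series([0.2, 0.4, 0.6])\n",
"\n",
"def scaled_product(a, b):\n",
"    # operate element-wise on the whole Series, not one row at a time\n",
"    return a * b / 0.5\n",
"\n",
"out = scaled_product(p1, p2)\n",
"assert len(out) == len(p1)  # the result must have the same length as the inputs\n",
"```"
]
},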
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example Usage\n",
"Let's go through a simple example with first evaluating a UDF without vectorization, then the same UDF with vectorization enabled. This will define some sample data:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import col, udf, mean, rand\n",
"from pyspark.sql.types import *\n",
"\n",
"df = spark.range(1 << 24, numPartitions=16).toDF(\"id\") \\\n",
" .withColumn(\"p1\", rand()).withColumn(\"p2\", rand())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First define the function *without vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from math import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
" w = 0.5\n",
" return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate it as a UDF (using `filter()` to force evaluation)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 13.6 s per loop\n"
]
}
],
"source": [
"my_udf = udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Now define the function *with vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
" w = 0.5\n",
" return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate the UDF again, this time making use of Arrow to evaluate `my_func` with `p1` and `p2` as `Pandas.Series`, which will also cause the expression to return a `Pandas.Series` of the same size.\n",
"\n",
"NOTE: Spark will not accept Numpy types as return values, which is why we need to redefine the function. This is an known issue from [SPARK-12157](https://issues.apache.org/jira/browse/SPARK-12157)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 2 s per loop\n"
]
}
],
"source": [
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"my_udf = pandas_udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
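{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a side note on the redefinition above, a minimal illustration in plain Python and pandas (no Spark; the values are illustrative): `math.log` only accepts scalars, so the first definition cannot operate on a whole `Pandas.Series`, while the NumPy functions are element-wise; and for scalar inputs NumPy returns `numpy.float64` rather than a Python `float`, which is the issue the SPARK-12157 note refers to for the row-at-a-time UDF.\n",
"\n",
"```python\n",
"import math\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"s = pd.Series([0.2, 0.5, 0.8])  # illustrative values\n",
"\n",
"# math.log expects a scalar, so it fails on a whole Series...\n",
"try:\n",
"    math.log(s)\n",
"except TypeError as e:\n",
"    print(\"math.log on a Series: %s\" % e)\n",
"\n",
"# ...while numpy.log is element-wise and returns a Series of equal length\n",
"print(np.log(s))\n",
"\n",
"# on a scalar, numpy returns numpy.float64 rather than a Python float\n",
"print(type(np.log(0.5)))\n",
"```"
]
},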
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"## Make better use of Pandas and Numpy\n",
"\n",
"Since the inputs to your UDF are `Pandas.Series`, you can use Pandas and Numpy operations on the data and also return a series or numpy array. For example, say we want to draw samples from a random distribution for data points with a specific label."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+\n",
"|label|counts|\n",
"+-----+------+\n",
"| 0| 7.0|\n",
"| 1| 0.0|\n",
"| 2| 0.0|\n",
"| 0| 8.0|\n",
"| 1| 0.0|\n",
"| 2| 0.0|\n",
"| 0| 7.0|\n",
"+-----+------+\n",
"only showing top 7 rows\n",
"\n"
]
}
],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"df = spark.range(1 << 20).toDF(\"id\") \\\n",
" .selectExpr(\"(id % 3) AS label\")\n",
"\n",
"def sample(label):\n",
" \"\"\" \n",
" Sample selected data from a Poisson distribution\n",
" :param label: Pandas.Series of data labels\n",
" \"\"\"\n",
"\n",
" # use numpy to initialze an empty array\n",
" p = pd.Series(np.zeros(len(label)))\n",
"\n",
" # use pandas to select data matching label \"0\"\n",
" idx0 = label == 0\n",
"\n",
" # sample from numpy and assign to the selected data\n",
" p[idx0] = np.random.poisson(7, len(idx0))\n",
"\n",
" # return the pandas series\n",
" return p\n",
"\n",
"sample_udf = pandas_udf(sample, DoubleType())\n",
"\n",
"result = df.withColumn(\"counts\", sample_udf(col(\"label\")))\n",
"result.show(n=7)"
]
},
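{
"cell_type": "markdown",
"metadata": {},
"source": [
"As noted above, the function may also return a NumPy array instead of a `Pandas.Series`. Below is a minimal variant of `sample` (illustrative only, reusing `df`, `pandas_udf`, `col`, and `DoubleType` from the cells above) that builds and returns the result purely with NumPy:\n",
"\n",
"```python\n",
"def sample_np(label):\n",
"    \"\"\"\n",
"    Same sampling as above, but builds and returns a numpy array.\n",
"    :param label: Pandas.Series of data labels\n",
"    \"\"\"\n",
"    p = np.zeros(len(label))\n",
"    idx0 = (label == 0).values                   # boolean numpy mask for label \"0\"\n",
"    p[idx0] = np.random.poisson(7, idx0.sum())   # one sample per selected row\n",
"    return p                                     # numpy array of the same length\n",
"\n",
"sample_np_udf = pandas_udf(sample_np, DoubleType())\n",
"result = df.withColumn(\"counts\", sample_np_udf(col(\"label\")))\n",
"result.show(n=7)\n",
"```"
]
},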
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

ghost commented Apr 19, 2018

Looks like the Jira ticket is closed and this is part of Spark 2.3, thanks!

rajatdeshpande commented Oct 17, 2018

Hi Bryan,

I have been able to make pandas_udfs work in an IPython notebook, but this does not work with spark-submit:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id int", PandasUDFType.GROUPED_MAP)
def featurize_udf(dfp):
    return pd.DataFrame({'id': [1, 2]})

gdf = df.groupBy('username').apply(featurize_udf)

Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
... 24 more

Is this ready for production?
