PySpark vectorized UDFs with Arrow
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# PySpark Vectorized UDFs using Arrow\n",
"Using Arrow, it is possible to perform vectorized evaluation of Python UDFs that will accept one or more `Pandas.Series` as input and return a single `Pandas.Series` of equal length. Using vectorized functions will offer a performance boost over the current way PySpark evaluates using a loop that iterates over 1 row at a time.\n",
"\n",
"The following assumes you have started a Jupyter Notebook with a PySpark kernel that creates a default SparkSession `spark`. For a quick method to start this up, see [here](https://gist.github.com/BryanCutler/b7f10167c4face19e03330a07b24ce21).\n",
"\n",
"\n",
"## Where to get it\n",
"This functionality is currently pending review and has not yet been merged into Spark, see [SPARK-21404](https://issues.apache.org/jira/browse/SPARK-21404). Until then, a patch for this can be downloaded from the branch in the PR [here](https://patch-diff.githubusercontent.com/raw/apache/spark/pull/18659.diff)."
]
},
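{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are not using the notebook setup linked above, the `SparkSession` can also be created manually. A minimal sketch, assuming a local master and that the patched PySpark build is on your `PYTHONPATH` (the app name is arbitrary):\n",
"\n",
"```python\n",
"from pyspark.sql import SparkSession\n",
"\n",
"# create (or reuse) the SparkSession that the examples below refer to as `spark`\n",
"spark = SparkSession.builder \\\n",
"    .master(\"local[*]\") \\\n",
"    .appName(\"vectorized-udf-demo\") \\\n",
"    .getOrCreate()\n",
"```"
]
},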
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PySpark API\n",
"A new API has been added in pyspark to declare a vectorized UDF. As with normal UDFs you can wrap a function or use a decorator:\n",
"\n",
"```python\n",
"# Wrap the function \"func\"\n",
"pandas_udf(func, DoubleType())\n",
"\n",
"# Use a decorator\n",
"@pandas(returnType=DoubleType())\n",
"def func(x):\n",
" # do something with \"x\" (pandas.Series) and return \"y\" (also a pandas.Series)\n",
" return y\n",
"```"
]
},
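{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a self-contained sketch of the decorator form, assuming the `pandas_udf` API from the PR above (the function name `times_two` is just an illustration):\n",
"\n",
"```python\n",
"from pyspark.sql.functions import pandas_udf\n",
"from pyspark.sql.types import DoubleType\n",
"\n",
"@pandas_udf(returnType=DoubleType())\n",
"def times_two(x):\n",
"    # \"x\" is a pandas.Series; the arithmetic is vectorized and returns a pandas.Series of equal length\n",
"    return x * 2.0\n",
"```"
]
},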
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example Usage\n",
"Let's go through a simple example with first evaluating a UDF without vectorization, then the same UDF with vectorization enabled. This will define some sample data:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from pyspark.sql.functions import col, udf, mean, rand\n",
"from pyspark.sql.types import *\n",
"\n",
"df = spark.range(1 << 24, numPartitions=16).toDF(\"id\") \\\n",
" .withColumn(\"p1\", rand()).withColumn(\"p2\", rand())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### First define the function *without vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from math import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
" w = 0.5\n",
" return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate it as a UDF (using `filter()` to force evaluation)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 13.6 s per loop\n"
]
}
],
"source": [
"my_udf = udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Now define the function *with vectorization*"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from numpy import log, exp\n",
"\n",
"def my_func(p1, p2):\n",
" w = 0.5\n",
" return exp(log(p1) + log(p2) - log(w))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"and evaluate the UDF again, this time making use of Arrow to evaluate `my_func` with `p1` and `p2` as `Pandas.Series`, which will also cause the expression to return a `Pandas.Series` of the same size.\n",
"\n",
"NOTE: Spark will not accept Numpy types as return values, which is why we need to redefine the function. This is an known issue from [SPARK-12157](https://issues.apache.org/jira/browse/SPARK-12157)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1 loop, best of 3: 2 s per loop\n"
]
}
],
"source": [
"from pyspark.sql.functions import pandas_udf\n",
"\n",
"my_udf = pandas_udf(my_func, DoubleType())\n",
"\n",
"result = df.withColumn(\"p\", my_udf(col(\"p1\"), col(\"p2\")))\n",
"\n",
"%timeit result.filter(\"p < 1.0\").count()"
]
},
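{
"cell_type": "markdown",
"metadata": {},
"source": [
"Because the vectorized `my_func` is plain pandas/NumPy code, it can also be exercised locally without Spark, which is handy for debugging. A minimal sketch (the series length of 5 is arbitrary):\n",
"\n",
"```python\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"# pass pandas.Series in and a pandas.Series of the same length comes back,\n",
"# which is exactly what pandas_udf expects from the function\n",
"p1 = pd.Series(np.random.rand(5))\n",
"p2 = pd.Series(np.random.rand(5))\n",
"print(my_func(p1, p2))\n",
"```"
]
},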
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"## Make better use of Pandas and Numpy\n",
"\n",
"Since the inputs to your UDF are `Pandas.Series`, you can use Pandas and Numpy operations on the data and also return a series or numpy array. For example, say we want to draw samples from a random distribution for data points with a specific label."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"scrolled": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+\n",
"|label|counts|\n",
"+-----+------+\n",
"| 0| 7.0|\n",
"| 1| 0.0|\n",
"| 2| 0.0|\n",
"| 0| 8.0|\n",
"| 1| 0.0|\n",
"| 2| 0.0|\n",
"| 0| 7.0|\n",
"+-----+------+\n",
"only showing top 7 rows\n",
"\n"
]
}
],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"df = spark.range(1 << 20).toDF(\"id\") \\\n",
" .selectExpr(\"(id % 3) AS label\")\n",
"\n",
"def sample(label):\n",
" \"\"\" \n",
" Sample selected data from a Poisson distribution\n",
" :param label: Pandas.Series of data labels\n",
" \"\"\"\n",
"\n",
" # use numpy to initialze an empty array\n",
" p = pd.Series(np.zeros(len(label)))\n",
"\n",
" # use pandas to select data matching label \"0\"\n",
" idx0 = label == 0\n",
"\n",
" # sample from numpy and assign to the selected data\n",
" p[idx0] = np.random.poisson(7, len(idx0))\n",
"\n",
" # return the pandas series\n",
" return p\n",
"\n",
"sample_udf = pandas_udf(sample, DoubleType())\n",
"\n",
"result = df.withColumn(\"counts\", sample_udf(col(\"label\")))\n",
"result.show(n=7)"
]
},
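{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, the sampled counts for label \"0\" should average close to 7, while the other labels keep their initialized value of 0. A sketch, assuming the `mean` function imported earlier is still in scope:\n",
"\n",
"```python\n",
"# mean of the Poisson(7) samples drawn for label 0\n",
"result.filter(\"label = 0\").select(mean(\"counts\")).show()\n",
"\n",
"# rows with other labels were left at 0\n",
"result.filter(\"label != 0\").select(mean(\"counts\")).show()\n",
"```"
]
},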
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@davoscollective commented Apr 19, 2018

Looks like the Jira ticket is closed and this is part of Spark 2.3, thanks!

@rajatdeshpande commented Oct 17, 2018

Hi Bryan,

I have been able to make pandas_udfs work using an IPython notebook, but this does not work with spark-submit:

from pyspark.sql.functions import PandasUDFType

@pandas_udf("id int", PandasUDFType.GROUPED_MAP)
def featurize_udf(dfp):
    return pd.DataFrame({'id': [1, 2]})

gdf = df.groupBy('username').apply(featurize_udf)

Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:333)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:322)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:177)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:121)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:158)
... 24 more

Is this ready for production?
