Skip to content

Instantly share code, notes, and snippets.

@kittipatkampa
Last active December 1, 2018 22:45
Show Gist options
  • Save kittipatkampa/49d395176a478d7b8c9cb0e7767be3c6 to your computer and use it in GitHub Desktop.
Save kittipatkampa/49d395176a478d7b8c9cb0e7767be3c6 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import numpy as np\n",
"\n",
"from pyspark.sql import SparkSession\n",
"import pyspark.sql.functions as F\n",
"from pyspark.sql.functions import col, when, lit, coalesce\n",
"from pyspark.sql.types import IntegerType, DoubleType, ArrayType, FloatType, MapType, StringType"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create DataFrame"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Reuse the session already provided by the pyspark shell/kernel, or start one,\n",
"# so that `spark` is defined even on a plain Python kernel (Restart & Run All).\n",
"spark = SparkSession.builder.getOrCreate()\n",
"\n",
"# Sample rows: a map column `a` and two array columns `b` (numbers) and `c` (strings),\n",
"# including null columns, empty arrays, null elements and NaN.\n",
"columns = ['id', 'a', 'b', 'c']\n",
"vals = [\n",
"    (1, {25.0: 1, 50.0: 2}, [1,2,3], ['A','B','C']),\n",
"    (2, {25.0: 1, 50.0: 2}, [4,5,None], ['A',None,'B','C']),\n",
"    (3, None, None, None),\n",
"    (4, {25.0: 1, 50.0: 2}, [], []),\n",
"    (5, {25.0: 1, 50.0: 2}, [None], [None]),\n",
"    (6, {25.0: 1, 50.0: 2}, [4], ['A']),\n",
"    (7, {25.0: 1, 50.0: 2}, [4,None,None], []),\n",
"    (8, {25.0: 1, 50.0: 2}, [4,np.nan], [])\n",
"]\n",
"\n",
"# create DataFrame\n",
"df = spark.createDataFrame(vals, columns)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+\n",
"| id| a| b| c|\n",
"+---+--------------------+---------+----------+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]|\n",
"| 3| null| null| null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []|\n",
"+---+--------------------+---------+----------+\n",
"\n"
]
}
],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0---------------------\n",
" id | 1 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [1, 2, 3] \n",
" c | [A, B, C] \n",
"-RECORD 1---------------------\n",
" id | 2 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [4, 5,] \n",
" c | [A,, B, C] \n",
"-RECORD 2---------------------\n",
" id | 3 \n",
" a | null \n",
" b | null \n",
" c | null \n",
"-RECORD 3---------------------\n",
" id | 4 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [] \n",
" c | [] \n",
"-RECORD 4---------------------\n",
" id | 5 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [] \n",
" c | [] \n",
"-RECORD 5---------------------\n",
" id | 6 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [4] \n",
" c | [A] \n",
"-RECORD 6---------------------\n",
" id | 7 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [4,,] \n",
" c | [] \n",
"-RECORD 7---------------------\n",
" id | 8 \n",
" a | [50.0 -> 2, 25.0 -> 1] \n",
" b | [4,] \n",
" c | [] \n",
"\n"
]
}
],
"source": [
"# show without truncation\n",
"df.show(truncate=False, vertical=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Access complex type in DataFrame"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can access an array element with `col('col_name')[n]`, where indexing starts at 0. However, unlike in Python, `[-1]` does not return the last element; it returns `null` (see the `b_-1` and `c_-1` columns below). Accessing the last element is tricky, so I created the UDFs below to do it.\n",
"\n",
"You can access a dict/map with `col('col_name')[key]`, similar to Python, except that when `key` does not exist in the map, `null` is returned instead of raising `KeyError`."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"@F.udf(returnType=FloatType())\n",
"def getLastNumericalElement(x):\n",
"    \"\"\"Return the last element of array `x` as a float, or None.\n",
"\n",
"    None is returned when `x` is null, empty, not a list, or when its last element is null.\n",
"    A TypeError raised by `float()` is re-raised with the offending input appended.\n",
"    \"\"\"\n",
"    if x and isinstance(x, list):\n",
"        last = x[-1]\n",
"        try:\n",
"            # test against None explicitly: a plain truthiness test would turn a trailing 0 into null\n",
"            return float(last) if last is not None else None\n",
"        except TypeError as e:\n",
"            err_msg = \"something wrong with x: {}\".format(x)\n",
"            raise TypeError(str(e)+'\\n'+err_msg)\n",
"    return None"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"@F.udf(returnType=StringType())\n",
"def getLastStringElement(x):\n",
"    \"\"\"Return the last element of array `x` as a string, or None.\n",
"\n",
"    None is returned when `x` is null, empty, not a list, or when its last element is null.\n",
"    A TypeError raised by `str()` is re-raised with the offending input appended.\n",
"    \"\"\"\n",
"    if x and isinstance(x, list):\n",
"        last = x[-1]\n",
"        try:\n",
"            # test against None explicitly: a plain truthiness test would turn a trailing '' into null\n",
"            return str(last) if last is not None else None\n",
"        except TypeError as e:\n",
"            err_msg = \"something wrong with x: {}\".format(x)\n",
"            raise TypeError(str(e)+'\\n'+err_msg)\n",
"    return None"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+----+----+----+----+------+----+----+------+----+----+----+\n",
"| id| a| b| c| b_0| b_1| b_7|b_-1|b_last| c_0|c_-1|c_last|a_50|a_25| a_0|\n",
"+---+--------------------+---------+----------+----+----+----+----+------+----+----+------+----+----+----+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]| 1| 2|null|null| 3.0| A|null| C| 2| 1|null|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]| 4| 5|null|null| null| A|null| C| 2| 1|null|\n",
"| 3| null| null| null|null|null|null|null| null|null|null| null|null|null|null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []|null|null|null|null| null|null|null| null| 2| 1|null|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []|null|null|null|null| null|null|null| null| 2| 1|null|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]| 4|null|null|null| 4.0| A|null| A| 2| 1|null|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []| 4|null|null|null| null|null|null| null| 2| 1|null|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []| 4|null|null|null| null|null|null| null| 2| 1|null|\n",
"+---+--------------------+---------+----------+----+----+----+----+------+----+----+------+----+----+----+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"b_0\", col(\"b\")[0])\\\n",
" .withColumn(\"b_1\", col(\"b\")[1])\\\n",
" .withColumn(\"b_7\", col(\"b\")[7])\\\n",
" .withColumn(\"b_-1\", col(\"b\")[-1])\\\n",
" .withColumn(\"b_last\", getLastNumericalElement(\"b\"))\\\n",
" .withColumn(\"c_0\", col(\"c\")[0])\\\n",
" .withColumn(\"c_-1\", col(\"c\")[-1])\\\n",
" .withColumn(\"c_last\", getLastStringElement(\"c\"))\\\n",
" .withColumn(\"a_50\", col(\"a\")[50])\\\n",
" .withColumn(\"a_25\", col(\"a\")[25])\\\n",
" .withColumn(\"a_0\", col(\"a\")[0])\\\n",
" .show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Create User-Defined Function (UDF)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Summary:\n",
"\n",
"We show two ways to convert `your_python_func` into the UDF `yourUDFName` using `pyspark.sql.functions.udf`:\n",
"\n",
"`import pyspark.sql.functions as F`\n",
"\n",
"1) Convert your function to a UDF using `F.udf`:\n",
"\n",
"```\n",
"def your_python_func(x):\n",
"    return your_output_cast_to_YourReturnType\n",
"\n",
"yourUDFName = F.udf(your_python_func, YourReturnType())\n",
"```\n",
"\n",
"\n",
"2) Use `F.udf` as a decorator on your function; the function name then refers to the UDF:\n",
"\n",
"```\n",
"@F.udf(returnType=YourReturnType())\n",
"def yourUDFName(x):\n",
"    return your_output_cast_to_YourReturnType\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Approach 1: Convert max_in_array to maxInArray using pyspark.sql.functions.udf"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"20.0\n",
"None\n"
]
}
],
"source": [
"### Example 1: Finding max element in an array\n",
"def max_in_array(elements):\n",
"    \"\"\"Return the largest value in `elements` as a float, ignoring None and NaN.\n",
"\n",
"    Returns None when `elements` is None or empty, or holds no usable value.\n",
"    \"\"\"\n",
"    if elements:\n",
"        # use `not` rather than bitwise `~`: `~` only negates correctly on numpy booleans\n",
"        elements = [e for e in elements if (e is not None) and not np.isnan(e)]\n",
"        if len(elements)>0:\n",
"            return float(max(elements))\n",
"    return None\n",
"\n",
"### test if it works with python array first\n",
"print(max_in_array([9, -1, 20, 3, None, np.nan, 5]))\n",
"# out: 20.0\n",
"\n",
"print(max_in_array(None))\n",
"# out: None"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"### Now let's use it as a UDF in Spark DataFrame\n",
"### Let's name it maxInArray\n",
"maxInArray = F.udf(max_in_array, FloatType())"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+-----+\n",
"| id| a| b| c|max_b|\n",
"+---+--------------------+---------+----------+-----+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]| 3.0|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]| 5.0|\n",
"| 3| null| null| null| null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]| 4.0|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []| 4.0|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []| 4.0|\n",
"+---+--------------------+---------+----------+-----+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"max_b\", maxInArray(\"b\")).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Approach 2: Convert minInArray to UDF using decorator"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"### Example 2: another way to create UDF by using decorator\n",
"@F.udf(returnType=FloatType())\n",
"def minInArray(elements):\n",
"    \"\"\"Return the smallest value in `elements` as a float, ignoring None and NaN.\n",
"\n",
"    Returns None when `elements` is None or empty, or holds no usable value.\n",
"    \"\"\"\n",
"    if elements:\n",
"        # use `not` rather than bitwise `~`: `~` only negates correctly on numpy booleans\n",
"        elements = [e for e in elements if (e is not None) and not np.isnan(e)]\n",
"        if len(elements)>0:\n",
"            return float(min(elements))\n",
"    return None"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+-----+\n",
"| id| a| b| c|min_b|\n",
"+---+--------------------+---------+----------+-----+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]| 1.0|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]| 4.0|\n",
"| 3| null| null| null| null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]| 4.0|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []| 4.0|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []| 4.0|\n",
"+---+--------------------+---------+----------+-----+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"min_b\", minInArray(\"b\")).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Using try and except to help with debugging UDF"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Test your function in python first before making it a UDF"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It's always a good idea to test your function in plain Python before turning it into a Spark UDF. Most of the time, the error messages from your plain-Python test and the ones from Spark are the same.\n",
"\n",
"In this example, let's build a function that finds the median of an array column in a Spark DataFrame."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def too_simple_median_in_array(x):\n",
" return float(np.median(x))\n",
"\n",
"### Spoiler Alert: Some errors will be raised here!!!\n",
"\n",
"print(too_simple_median_in_array([-1,2,3,None,np.nan,4,5]))\n",
"# out: 3.0\n",
"\n",
"print(too_simple_median_in_array(None))\n",
"# out: TypeError: unsupported operand type(s) for /: 'NoneType' and 'int'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"So, I improve the function a bit to handle `None`"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3.0\n",
"None\n"
]
}
],
"source": [
"def simple_median_in_array(x):\n",
" if x:\n",
" return float(np.median(x))\n",
" return None\n",
"\n",
"print(simple_median_in_array([-1,2,3,None,np.nan,4,5]))\n",
"# out: 3.0\n",
"\n",
"print(simple_median_in_array(None))\n",
"# out: None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Alright, now the new function can take care of `None`. It's time to make it UDF and test in spark DataFrame"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"simpleMedianInArray = F.udf(simple_median_in_array, FloatType())\n",
"\n",
"df.withColumn(\"median_b\", simpleMedianInArray(\"b\")).show()\n",
"# out: TypeError: unsupported operand type(s) for /: 'NoneType' and 'int'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Oops, I got `TypeError`...what's going on? \n",
"The error message doesn't tell me which line or what kind of input triggers this error. \n",
"\n",
"How can we pinpoint the bad record?\n",
"\n",
"It's time to include `try` and `except` in the UDF. \n",
"\n",
"How about I do this..."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def better_median_in_array(x):\n",
"    \"\"\"Return the median of `x` as a float, or None when `x` is None or empty.\n",
"\n",
"    A TypeError from `np.median` is re-raised with the offending input appended,\n",
"    so the bad record can be identified from the Spark error message.\n",
"    \"\"\"\n",
"    if x:\n",
"        try:\n",
"            return float(np.median(x))\n",
"        except TypeError as e:\n",
"            err_msg = \"something wrong with x: {}\".format(x)\n",
"            raise TypeError(str(e)+'\\n'+err_msg)\n",
"    return None\n",
"\n",
"betterMedianInArray = F.udf(better_median_in_array, FloatType())\n",
"\n",
"df.withColumn(\"median_b\", betterMedianInArray(\"b\")).show()\n",
"# out: TypeError: unsupported operand type(s) for /: 'NoneType' and 'int'\n",
"# something wrong with x: [4, None, None]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now the error message is more informative and we know that `[4, None, None]` causes the problem, so we will fix it in the next iteration:"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+--------+\n",
"| id| a| b| c|median_b|\n",
"+---+--------------------+---------+----------+--------+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]| 2.0|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]| 4.5|\n",
"| 3| null| null| null| null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []| NaN|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]| 4.0|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []| 4.0|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []| 4.0|\n",
"+---+--------------------+---------+----------+--------+\n",
"\n"
]
}
],
"source": [
"def better2_median_in_array(x):\n",
"    \"\"\"Return the median of the non-null elements of `x` as a float.\n",
"\n",
"    Returns None when `x` is None or empty. When every element is null the\n",
"    filtered list is empty and the result is NaN (see id 5 in the output).\n",
"    A TypeError from `np.median` is re-raised with the offending input appended.\n",
"    \"\"\"\n",
"    if x:\n",
"        x2 = [e for e in x if e is not None]\n",
"        try:\n",
"            return float(np.median(x2))\n",
"        except TypeError as e:\n",
"            err_msg = \"something wrong with x: {}\".format(x)\n",
"            raise TypeError(str(e)+'\\n'+err_msg)\n",
"    return None\n",
"\n",
"better2MedianInArray = F.udf(better2_median_in_array, FloatType())\n",
"\n",
"df.withColumn(\"median_b\", better2MedianInArray(\"b\")).show()"
]
},
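{
"cell_type": "markdown",
"metadata": {},
"source": [
"Looks good, no error, but I don't like the `NaN` output for id 5, where `b` contains only nulls: once the nulls are removed the list is empty, and the median of an empty list is `NaN`. I want `None` instead. So, let's fix this next:"
]
},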
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------------+---------+----------+--------+\n",
"| id| a| b| c|median_b|\n",
"+---+--------------------+---------+----------+--------+\n",
"| 1|[50.0 -> 2, 25.0 ...|[1, 2, 3]| [A, B, C]| 2.0|\n",
"| 2|[50.0 -> 2, 25.0 ...| [4, 5,]|[A,, B, C]| 4.5|\n",
"| 3| null| null| null| null|\n",
"| 4|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 5|[50.0 -> 2, 25.0 ...| []| []| null|\n",
"| 6|[50.0 -> 2, 25.0 ...| [4]| [A]| 4.0|\n",
"| 7|[50.0 -> 2, 25.0 ...| [4,,]| []| 4.0|\n",
"| 8|[50.0 -> 2, 25.0 ...| [4,]| []| 4.0|\n",
"+---+--------------------+---------+----------+--------+\n",
"\n"
]
}
],
"source": [
"def median_in_array(x):\n",
"    \"\"\"Return the median of the usable elements of `x` as a float, or None.\n",
"\n",
"    None and NaN elements are ignored. Returns None when `x` is None or empty,\n",
"    or when no usable element remains. A TypeError raised while filtering or\n",
"    computing the median is re-raised with the offending input appended.\n",
"    \"\"\"\n",
"    if x:\n",
"        try:\n",
"            # filter inside the try so that non-numeric elements also get the informative message\n",
"            x2 = [v for v in x if (v is not None) and not np.isnan(v)]\n",
"            if len(x2)>0:\n",
"                return float(np.median(x2))\n",
"        except TypeError as err:\n",
"            err_msg = \"something wrong with x: {}\".format(x)\n",
"            raise TypeError(str(err)+'\\n'+err_msg)\n",
"    return None\n",
"\n",
"medianInArray = F.udf(median_in_array, FloatType())\n",
"\n",
"df.withColumn(\"median_b\", medianInArray(\"b\")).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Perfect!!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Work with complex types in UDF"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Calculate percentiles of array"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We build the UDF `percentilesOfArray` to calculate the percentiles of array `x`, where the list `a` gives the percentiles to compute (default `[25, 50, 75]`)."
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"@F.udf(returnType=ArrayType(DoubleType()))\n",
"def percentilesOfArray(x,a=[25,50,75]):\n",
"    \"\"\"Return the percentiles `a` of array `x` as a list of floats, ignoring None and NaN.\n",
"\n",
"    Returns None when `x` is None or empty, or holds no usable value.\n",
"    The default list `a` is only read, never modified.\n",
"    \"\"\"\n",
"    if x:\n",
"        # use `not` rather than bitwise `~`: `~` only negates correctly on numpy booleans\n",
"        x = [e for e in x if (e is not None) and not np.isnan(e)]\n",
"        if len(x)>0:\n",
"            # .tolist() converts the numpy result to a plain list to match the declared ArrayType\n",
"            return np.percentile(x,a).tolist()\n",
"    return None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It's very important to call `.tolist()` to convert the numpy array to a Python list, because we promised `returnType=ArrayType(DoubleType())` when declaring the UDF."
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---------+-----------------+\n",
"| id| b| pctl_b|\n",
"+---+---------+-----------------+\n",
"| 1|[1, 2, 3]| [1.5, 2.0, 2.5]|\n",
"| 2| [4, 5,]|[4.25, 4.5, 4.75]|\n",
"| 3| null| null|\n",
"| 4| []| null|\n",
"| 5| []| null|\n",
"| 6| [4]| [4.0, 4.0, 4.0]|\n",
"| 7| [4,,]| [4.0, 4.0, 4.0]|\n",
"| 8| [4,]| [4.0, 4.0, 4.0]|\n",
"+---+---------+-----------------+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"pctl_b\", percentilesOfArray(\"b\"))\\\n",
" .select(\"id\", \"b\", \"pctl_b\")\\\n",
" .show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To pass the array `[45,50,80]` as an argument to the UDF, we need to turn it into a column: wrap each value in `lit` and combine them with `F.array`. So, we do\n",
"```\n",
"F.array([lit(45), lit(50), lit(80)])\n",
"```\n",
"otherwise, we get an error complaining about the type."
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---------+----------------+\n",
"| id| b| pctl_b|\n",
"+---+---------+----------------+\n",
"| 1|[1, 2, 3]| [1.9, 2.0, 2.6]|\n",
"| 2| [4, 5,]|[4.45, 4.5, 4.8]|\n",
"| 3| null| null|\n",
"| 4| []| null|\n",
"| 5| []| null|\n",
"| 6| [4]| [4.0, 4.0, 4.0]|\n",
"| 7| [4,,]| [4.0, 4.0, 4.0]|\n",
"| 8| [4,]| [4.0, 4.0, 4.0]|\n",
"+---+---------+----------------+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"pctl_b\", percentilesOfArray(\"b\", F.array([lit(45), lit(50), lit(80)])))\\\n",
" .select(\"id\", \"b\", \"pctl_b\")\\\n",
" .show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we want to make the percentile more readable by returning Map from percentile (integer) to percentile value (double)."
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"@F.udf(returnType=MapType(IntegerType(),DoubleType()))\n",
"def percentilesMapOfArray(x,a=[25,50,75]):\n",
"    \"\"\"Return a dict mapping each percentile in `a` to its value for array `x`.\n",
"\n",
"    None and NaN elements of `x` are ignored. Returns None when `x` is None or\n",
"    empty, or holds no usable value. The default list `a` is only read, never modified.\n",
"    \"\"\"\n",
"    if x:\n",
"        # use `not` rather than bitwise `~`: `~` only negates correctly on numpy booleans\n",
"        x = [e for e in x if (e is not None) and not np.isnan(e)]\n",
"        if len(x)>0:\n",
"            # pair each requested percentile with its computed value\n",
"            return dict(zip(a,np.percentile(x,a).tolist()))\n",
"    return None"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---------+----------------------------------+\n",
"|id |b |pctl_b |\n",
"+---+---------+----------------------------------+\n",
"|1 |[1, 2, 3]|[80 -> 2.6, 50 -> 2.0, 45 -> 1.9] |\n",
"|2 |[4, 5,] |[80 -> 4.8, 50 -> 4.5, 45 -> 4.45]|\n",
"|3 |null |null |\n",
"|4 |[] |null |\n",
"|5 |[] |null |\n",
"|6 |[4] |[80 -> 4.0, 50 -> 4.0, 45 -> 4.0] |\n",
"|7 |[4,,] |[80 -> 4.0, 50 -> 4.0, 45 -> 4.0] |\n",
"|8 |[4,] |[80 -> 4.0, 50 -> 4.0, 45 -> 4.0] |\n",
"+---+---------+----------------------------------+\n",
"\n"
]
}
],
"source": [
"df.withColumn(\"pctl_b\", percentilesMapOfArray(\"b\", F.array([lit(45), lit(50), lit(80)])))\\\n",
" .select(\"id\", \"b\", \"pctl_b\")\\\n",
" .show(n=100, truncate=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment