Comparing pandas_udf to create_map
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Vectorized mappings of PySpark columns"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## How to run?\n",
"\n",
"Easiest would be to use the [PySpark docker](https://hub.docker.com/r/jupyter/pyspark-notebook/).\n",
"First, place this notebook in `/some/local/dir/`.\n",
"Next, simply run:\n",
"\n",
"```bash\n",
"docker run --rm -it -p <SOME_PORT>:8888 -v /some/local/dir/:/home/jovyan/work/ jupyter/pyspark-notebook\n",
"```\n",
"\n",
"Where `SOME_PORT` is the one you want to use on the host machine.\n",
"Lastly, copy the the token from the console and provide it in `localhost:<SOME_PORT>`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initialize the notebook"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"from itertools import chain\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession \\\n",
" .builder \\\n",
" .appName(\"Pysparkexample\") \\\n",
" .config(\"spark.some.config.option\", \"some-value\") \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"sc = spark.sparkContext"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Dummy data to be used"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+-----+\n",
"|feat1|feat2|\n",
"+-----+-----+\n",
"| MID|SMALL|\n",
"| MID|LARGE|\n",
"| MID|LARGE|\n",
"+-----+-----+\n",
"only showing top 3 rows\n",
"\n"
]
}
],
"source": [
"def fresh_df(N=100000, seed=None):\n",
" np.random.seed(seed)\n",
" feat1 = np.random.choice([\"HI\", \"LO\", \"MID\"], size=N)\n",
" feat2 = np.random.choice([\"SMALL\", \"MEDIUM\", \"LARGE\"], size=N)\n",
"\n",
" pdf = pd.DataFrame({\n",
" \"feat1\": feat1,\n",
" \"feat2\": feat2\n",
" })\n",
" return spark.createDataFrame(pdf)\n",
"\n",
"fresh_df(N=100).show(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Objective\n",
"\n",
"Assume you have categorical and ordinal data, as given in the dummy data above.\n",
"Now, as a preprocessing phase, you want to encode the strings as integers.\n",
"This is rather straightforward in the case of a categorical data.\n",
"It becomes tricky when the data is ordinal; like in the case of `feat1`.\n",
"To that end, a manual mapping has to be defined.\n",
"\n",
"## The mappings to be applied\n",
"Next, define the mappings as follows:\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"feat1_dict = {\"HI\": 1, \"MID\": 2, \"LO\": 3}\n",
"feat2_dict = {\"SMALL\": 0, \"MEDIUM\": 1, \"LARGE\": 2}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Naive approach\n",
"\n",
"For each column in the data, we have a key in `mappings`."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"mappings = {\n",
" \"feat1\": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),\n",
" \"feat2\": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])\n",
"}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the created maps, and by iterating over the columns, we can map the strings to integers."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+------------+------------+\n",
"|feat1| feat2|feat1_mapped|feat2_mapped|\n",
"+-----+------+------------+------------+\n",
"| MID|MEDIUM| 2| 1|\n",
"| MID| SMALL| 2| 0|\n",
"| LO| LARGE| 3| 2|\n",
"| LO| SMALL| 3| 0|\n",
"| MID| LARGE| 2| 2|\n",
"| LO|MEDIUM| 3| 1|\n",
"| MID| LARGE| 2| 2|\n",
"| LO|MEDIUM| 3| 1|\n",
"| MID| SMALL| 2| 0|\n",
"| MID| SMALL| 2| 0|\n",
"+-----+------+------------+------------+\n",
"\n"
]
}
],
"source": [
"df = fresh_df(N=10)\n",
"for col in df.columns:\n",
" col_map = mappings[col]\n",
" df = df.withColumn(col+\"_mapped\", col_map[df[col]])\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Vectorized approach\n",
"\n",
"### Modify the strings\n",
"\n",
"This is based on [this blog post from databricks](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+-------+\n",
"|feat1| feat2| papply|\n",
"+-----+------+-------+\n",
"| LO| SMALL| LO_foo|\n",
"| MID| SMALL|MID_foo|\n",
"| HI| LARGE| HI_foo|\n",
"| HI| LARGE| HI_foo|\n",
"| HI|MEDIUM| HI_foo|\n",
"| MID|MEDIUM|MID_foo|\n",
"| HI| SMALL| HI_foo|\n",
"| HI|MEDIUM| HI_foo|\n",
"| LO| LARGE| LO_foo|\n",
"| MID| SMALL|MID_foo|\n",
"+-----+------+-------+\n",
"\n"
]
}
],
"source": [
"df = fresh_df(N=10)\n",
"\n",
"@F.pandas_udf('string', F.PandasUDFType.SCALAR)\n",
"def add_foo_as_string(col):\n",
" return col.apply(str) + \"_foo\"\n",
"\n",
"df.withColumn('papply', add_foo_as_string(df.feat1)).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, as the object we have inside the udf is `pandas.core.series.Series`, we can enjoy vector mappings!"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+------+\n",
"|feat1| feat2|papply|\n",
"+-----+------+------+\n",
"| HI| LARGE| 1|\n",
"| HI| SMALL| 1|\n",
"| LO|MEDIUM| 3|\n",
"| MID| LARGE| 2|\n",
"| LO|MEDIUM| 3|\n",
"| MID|MEDIUM| 2|\n",
"| HI| LARGE| 1|\n",
"| LO|MEDIUM| 3|\n",
"| LO| LARGE| 3|\n",
"| LO| LARGE| 3|\n",
"+-----+------+------+\n",
"\n"
]
}
],
"source": [
"df = fresh_df(N=10)\n",
"\n",
"@F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
"def map_map(col):\n",
" return col.map(feat1_dict)\n",
"\n",
"df.withColumn('papply', map_map(df['feat1'])).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The problem with the last approach is that the mapping applied is always the same one; it is hard-coded into the definition of `map_map`."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Column-aware applying of a `pandas_udf`"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"feats_dict = {\n",
" \"feat1\": feat1_dict,\n",
" \"feat2\": feat2_dict\n",
"}\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+------+------------+------------+\n",
"|feat1| feat2|feat1_mapped|feat2_mapped|\n",
"+-----+------+------------+------------+\n",
"| LO| LARGE| 3| 2|\n",
"| MID| SMALL| 2| 0|\n",
"| HI| SMALL| 1| 0|\n",
"| LO|MEDIUM| 3| 1|\n",
"| MID|MEDIUM| 2| 1|\n",
"| LO|MEDIUM| 3| 1|\n",
"| HI| SMALL| 1| 0|\n",
"| LO|MEDIUM| 3| 1|\n",
"| MID| LARGE| 2| 2|\n",
"| MID| LARGE| 2| 2|\n",
"+-----+------+------------+------------+\n",
"\n"
]
}
],
"source": [
"df = fresh_df(N=10)\n",
"for col_name in df.columns:\n",
" @F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
" def map_map(col):\n",
" return col.map(feats_dict[col_name])\n",
" df = df.withColumn(col_name + \"_mapped\", map_map(df[col_name]))\n",
" \n",
"df.show()"
]
},
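{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that `map_map` closes over the loop variable `col_name`; since the UDF is defined and applied within the same iteration, each column ends up with its own dictionary (as the output above confirms).\n",
"To make that capture explicit, one could use a small factory function instead (a sketch, assuming the same `feats_dict` and Spark session as above):\n",
"\n",
"```python\n",
"def make_mapper(mapping):\n",
"    # Bind the dictionary explicitly instead of relying on the loop variable.\n",
"    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
"    def mapper(col):\n",
"        return col.map(mapping)\n",
"    return mapper\n",
"\n",
"df = fresh_df(N=10)\n",
"for col_name in df.columns:\n",
"    df = df.withColumn(col_name + \"_mapped\", make_mapper(feats_dict[col_name])(df[col_name]))\n",
"df.show()\n",
"```"
]
},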
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compare running times"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.88 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"df = fresh_df(N=100000)\n",
"for col in df.columns:\n",
" col_map = mappings[col]\n",
" df = df.withColumn(col+\"_mapped\", col_map[df[col]])"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2.89 s ± 38.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"df = fresh_df(N=100000)\n",
"for col_name in df.columns:\n",
" @F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
" def map_map(col):\n",
" return col.map(feats_dict[col_name])\n",
" df = df.withColumn(col_name + \"_mapped\", map_map(df[col_name]))"
]
},
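{
"cell_type": "markdown",
"metadata": {},
"source": [
"One caveat about the measurements: `withColumn` is a lazy transformation, so neither loop above actually executes the mappings; the measured time is probably dominated by `spark.createDataFrame` and query-plan construction.\n",
"A sketch of a benchmark that forces evaluation (assuming the objects defined above; `df.count()` is just one possible action):\n",
"\n",
"```python\n",
"%%timeit\n",
"df = fresh_df(N=100000)\n",
"for col in df.columns:\n",
"    df = df.withColumn(col + \"_mapped\", mappings[col][df[col]])\n",
"df.count()  # an action forces Spark to actually apply the mappings\n",
"```\n",
"\n",
"The same action can be appended to the `pandas_udf` variant for a like-for-like comparison."
]
},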
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I was under the impression that the vectorized approach using `pandas_udf` would be much faster than the `create_map`.\n",
"However, I'm afraid this is not the case. Why? I don't know, maybe you have an idea?"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}