Created July 8, 2019 08:20
Comparing pandas_udf to create_map
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Vectorized mappings of PySpark columns"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## How to run?\n",
    "\n",
    "The easiest way is to use the [PySpark docker](https://hub.docker.com/r/jupyter/pyspark-notebook/).\n",
    "First, place this notebook in `/some/local/dir/`.\n",
    "Next, simply run:\n",
    "\n",
    "```bash\n",
    "docker run --rm -it -p <SOME_PORT>:8888 -v /some/local/dir/:/home/jovyan/work/ jupyter/pyspark-notebook\n",
    "```\n",
    "\n",
    "where `SOME_PORT` is the port you want to use on the host machine.\n",
    "Lastly, copy the token from the console output and provide it at `localhost:<SOME_PORT>`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Initialize the notebook"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "from itertools import chain\n",
    "from pyspark.sql import functions as F\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"Pysparkexample\") \\\n",
    "    .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = spark.sparkContext"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dummy data to be used"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+-----+\n",
      "|feat1|feat2|\n",
      "+-----+-----+\n",
      "|  MID|SMALL|\n",
      "|  MID|LARGE|\n",
      "|  MID|LARGE|\n",
      "+-----+-----+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "def fresh_df(N=100000, seed=None):\n",
    "    np.random.seed(seed)\n",
    "    feat1 = np.random.choice([\"HI\", \"LO\", \"MID\"], size=N)\n",
    "    feat2 = np.random.choice([\"SMALL\", \"MEDIUM\", \"LARGE\"], size=N)\n",
    "\n",
    "    pdf = pd.DataFrame({\n",
    "        \"feat1\": feat1,\n",
    "        \"feat2\": feat2\n",
    "    })\n",
    "    return spark.createDataFrame(pdf)\n",
    "\n",
    "fresh_df(N=100).show(3)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Objective\n",
    "\n",
    "Assume you have categorical and ordinal data, as in the dummy data above.\n",
    "As a preprocessing step, you want to encode the strings as integers.\n",
    "This is straightforward for categorical data.\n",
    "It becomes tricky when the data is ordinal, as in the case of `feat1`.\n",
    "To that end, a manual mapping has to be defined.\n",
    "\n",
    "## The mappings to be applied\n",
    "Next, define the mappings as follows:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "feat1_dict = {\"HI\": 1, \"MID\": 2, \"LO\": 3}\n",
    "feat2_dict = {\"SMALL\": 0, \"MEDIUM\": 1, \"LARGE\": 2}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Naive approach\n",
    "\n",
    "For each column in the data, we define a corresponding `create_map` expression in `mappings`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "mappings = {\n",
    "    \"feat1\": F.create_map([F.lit(x) for x in chain(*feat1_dict.items())]),\n",
    "    \"feat2\": F.create_map([F.lit(x) for x in chain(*feat2_dict.items())])\n",
    "}"
   ]
  },
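  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what `create_map` receives, note that `chain(*feat1_dict.items())` flattens the dictionary into an alternating key/value sequence, which is the layout `create_map` expects. A quick, Spark-free sketch:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# chain(*d.items()) flattens {k1: v1, k2: v2} into [k1, v1, k2, v2]\n",
    "list(chain(*feat1_dict.items()))  # [\"HI\", 1, \"MID\", 2, \"LO\", 3]"
   ]
  },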
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Using the created maps, and by iterating over the columns, we can map the strings to integers."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------+------------+------------+\n",
      "|feat1| feat2|feat1_mapped|feat2_mapped|\n",
      "+-----+------+------------+------------+\n",
      "|  MID|MEDIUM|           2|           1|\n",
      "|  MID| SMALL|           2|           0|\n",
      "|   LO| LARGE|           3|           2|\n",
      "|   LO| SMALL|           3|           0|\n",
      "|  MID| LARGE|           2|           2|\n",
      "|   LO|MEDIUM|           3|           1|\n",
      "|  MID| LARGE|           2|           2|\n",
      "|   LO|MEDIUM|           3|           1|\n",
      "|  MID| SMALL|           2|           0|\n",
      "|  MID| SMALL|           2|           0|\n",
      "+-----+------+------------+------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = fresh_df(N=10)\n",
    "for col in df.columns:\n",
    "    col_map = mappings[col]\n",
    "    df = df.withColumn(col + \"_mapped\", col_map[df[col]])\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Vectorized approach\n",
    "\n",
    "### Modify the strings\n",
    "\n",
    "This is based on [this blog post from Databricks](https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------+-------+\n",
      "|feat1| feat2| papply|\n",
      "+-----+------+-------+\n",
      "|   LO| SMALL| LO_foo|\n",
      "|  MID| SMALL|MID_foo|\n",
      "|   HI| LARGE| HI_foo|\n",
      "|   HI| LARGE| HI_foo|\n",
      "|   HI|MEDIUM| HI_foo|\n",
      "|  MID|MEDIUM|MID_foo|\n",
      "|   HI| SMALL| HI_foo|\n",
      "|   HI|MEDIUM| HI_foo|\n",
      "|   LO| LARGE| LO_foo|\n",
      "|  MID| SMALL|MID_foo|\n",
      "+-----+------+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = fresh_df(N=10)\n",
    "\n",
    "@F.pandas_udf('string', F.PandasUDFType.SCALAR)\n",
    "def add_foo_as_string(col):\n",
    "    return col.apply(str) + \"_foo\"\n",
    "\n",
    "df.withColumn('papply', add_foo_as_string(df.feat1)).show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since the object we receive inside the UDF is a `pandas.Series`, we can enjoy vectorized mappings!"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------+------+\n",
      "|feat1| feat2|papply|\n",
      "+-----+------+------+\n",
      "|   HI| LARGE|     1|\n",
      "|   HI| SMALL|     1|\n",
      "|   LO|MEDIUM|     3|\n",
      "|  MID| LARGE|     2|\n",
      "|   LO|MEDIUM|     3|\n",
      "|  MID|MEDIUM|     2|\n",
      "|   HI| LARGE|     1|\n",
      "|   LO|MEDIUM|     3|\n",
      "|   LO| LARGE|     3|\n",
      "|   LO| LARGE|     3|\n",
      "+-----+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = fresh_df(N=10)\n",
    "\n",
    "@F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
    "def map_map(col):\n",
    "    return col.map(feat1_dict)\n",
    "\n",
    "df.withColumn('papply', map_map(df['feat1'])).show()"
   ]
  },
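  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `col.map(feat1_dict)` call inside the UDF is plain pandas: Spark hands the UDF a `pandas.Series` per batch, and `Series.map` performs an elementwise dictionary lookup. The same lookup can be checked locally, without Spark:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Series.map does an elementwise dictionary lookup, as on each Spark batch\n",
    "pd.Series([\"HI\", \"LO\", \"MID\"]).map(feat1_dict).tolist()  # [1, 3, 2]"
   ]
  },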
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The problem with the last approach is that the applied mapping is always the same one; it is hard-coded into the definition of `map_map`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Column-aware application of a `pandas_udf`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "feats_dict = {\n",
    "    \"feat1\": feat1_dict,\n",
    "    \"feat2\": feat2_dict\n",
    "}\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----+------+------------+------------+\n",
      "|feat1| feat2|feat1_mapped|feat2_mapped|\n",
      "+-----+------+------------+------------+\n",
      "|   LO| LARGE|           3|           2|\n",
      "|  MID| SMALL|           2|           0|\n",
      "|   HI| SMALL|           1|           0|\n",
      "|   LO|MEDIUM|           3|           1|\n",
      "|  MID|MEDIUM|           2|           1|\n",
      "|   LO|MEDIUM|           3|           1|\n",
      "|   HI| SMALL|           1|           0|\n",
      "|   LO|MEDIUM|           3|           1|\n",
      "|  MID| LARGE|           2|           2|\n",
      "|  MID| LARGE|           2|           2|\n",
      "+-----+------+------------+------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df = fresh_df(N=10)\n",
    "for col_name in df.columns:\n",
    "    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
    "    def map_map(col):\n",
    "        return col.map(feats_dict[col_name])\n",
    "    df = df.withColumn(col_name + \"_mapped\", map_map(df[col_name]))\n",
    "\n",
    "df.show()"
   ]
  },
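  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A word of caution on defining `map_map` inside the loop: plain-Python closures look up `col_name` late, so functions created in a loop can all end up seeing the last value. It works above, apparently because PySpark pickles the function at the point the UDF is applied, inside each iteration; still, binding the value explicitly (e.g. via a default argument) is the safer pattern. A plain-Python sketch of the pitfall and the fix:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Late binding: all the closures share the loop variable...\n",
    "funcs = [lambda: name for name in [\"feat1\", \"feat2\"]]\n",
    "print([f() for f in funcs])  # ['feat2', 'feat2']\n",
    "\n",
    "# ...while a default argument freezes the value per iteration.\n",
    "funcs = [lambda name=name: name for name in [\"feat1\", \"feat2\"]]\n",
    "print([f() for f in funcs])  # ['feat1', 'feat2']"
   ]
  },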
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Compare running times"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.88 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%%timeit\n",
    "df = fresh_df(N=100000)\n",
    "for col in df.columns:\n",
    "    col_map = mappings[col]\n",
    "    df = df.withColumn(col + \"_mapped\", col_map[df[col]])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "2.89 s ± 38.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
     ]
    }
   ],
   "source": [
    "%%timeit\n",
    "df = fresh_df(N=100000)\n",
    "for col_name in df.columns:\n",
    "    @F.pandas_udf('integer', F.PandasUDFType.SCALAR)\n",
    "    def map_map(col):\n",
    "        return col.map(feats_dict[col_name])\n",
    "    df = df.withColumn(col_name + \"_mapped\", map_map(df[col_name]))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "I was under the impression that the vectorized approach using `pandas_udf` would be much faster than the `create_map` one.\n",
    "However, this does not seem to be the case. Why? I don't know; maybe you have an idea?\n",
    "\n",
    "One caveat on the benchmark itself: `withColumn` is lazy, and neither timed block triggers an action (such as `count()` or `show()`), so the timings above are dominated by `createDataFrame` and query-plan construction rather than by the execution of the mappings."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
} |