Skip to content

Instantly share code, notes, and snippets.

@mheilman
Last active July 8, 2022 12:53
Show Gist options
  • Save mheilman/6ce261549b55bf4997ec102ad4e8d643 to your computer and use it in GitHub Desktop.
Save mheilman/6ce261549b55bf4997ec102ad4e8d643 to your computer and use it in GitHub Desktop.
example for "Prediction at Scale with scikit-learn and PySpark Pandas UDFs" (https://medium.com/civis-analytics/prediction-at-scale-with-scikit-learn-and-pyspark-pandas-udfs-51d5ebfb2cd8)
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set things up and generate some data"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"python: 3.6.9\n",
"pyspark: 2.4.3\n",
"scikit-learn: 0.21.3\n",
"pandas: 0.21.1\n",
"pyarrow: 0.8.0\n"
]
}
],
"source": [
"import sys\n",
"\n",
"import numpy as np\n",
"import sklearn\n",
"from sklearn.datasets import make_classification\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.ensemble import RandomForestClassifier\n",
"from sklearn.model_selection import GridSearchCV\n",
"import pandas as pd\n",
"import pyarrow\n",
"import pyspark\n",
"import pyspark.sql.functions as F\n",
"from pyspark.sql.types import DoubleType, StringType, ArrayType\n",
"\n",
"# Print the versions of packages, etc.\n",
"print(f\"python: {sys.version.split()[0]}\")\n",
"print(f\"pyspark: {pyspark.__version__}\")\n",
"print(f\"scikit-learn: {sklearn.__version__}\")\n",
"# pyspark versions after 2.4.4 should better support pandas and pyarrow versions (https://github.com/apache/spark/pull/24867)\n",
"print(f\"pandas: {pd.__version__}\")\n",
"# https://github.com/apache/spark/blob/v2.4.3/python/setup.py#L106\n",
"print(f\"pyarrow: {pyarrow.__version__}\")\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Make some fake data and train a model.\n",
"n_samples_test = 100000\n",
"n_samples_train = 1000\n",
"n_samples_all = n_samples_train + n_samples_test\n",
"n_features = 50\n",
"\n",
"X, y = make_classification(n_samples=n_samples_all, n_features=n_features, random_state=123)\n",
"X_train, X_test, y_train, y_test = \\\n",
" train_test_split(X, y, test_size=n_samples_test, random_state=45)\n",
"\n",
"# Use pandas to put the test data in parquet format to illustrate how to load it up later.\n",
"# In real usage, the data might be on S3, Azure Blog Storage, HDFS, etc.\n",
"column_names = [f'feature{i}' for i in range(n_features)]\n",
"(\n",
" pd.DataFrame(X_test, columns=column_names)\n",
" .reset_index()\n",
" .rename(columns={'index': 'id'})\n",
" .to_parquet('unlabeled_data')\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Train a model with scikit-learn"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ROC AUC: 0.959\n"
]
}
],
"source": [
"param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}\n",
"gs_rf = GridSearchCV(\n",
" RandomForestClassifier(random_state=42),\n",
" param_grid=param_grid,\n",
" scoring='roc_auc',\n",
" cv=3\n",
").fit(X_train, y_train)\n",
"print('ROC AUC: %.3f' % gs_rf.best_score_)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Set up a spark environment"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"sc = pyspark.SparkContext(appName=\"foo\")\n",
"sqlContext = pyspark.SQLContext(sc)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Now load the data and make predictions.\n",
"\n",
"In real usage, we might be doing a bunch of ETL after reading raw data, but here, we'll just load it up."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"DataFrame[id: bigint, feature0: double, feature1: double, feature2: double, feature3: double, feature4: double, feature5: double, feature6: double, feature7: double, feature8: double, feature9: double, feature10: double, feature11: double, feature12: double, feature13: double, feature14: double, feature15: double, feature16: double, feature17: double, feature18: double, feature19: double, feature20: double, feature21: double, feature22: double, feature23: double, feature24: double, feature25: double, feature26: double, feature27: double, feature28: double, feature29: double, feature30: double, feature31: double, feature32: double, feature33: double, feature34: double, feature35: double, feature36: double, feature37: double, feature38: double, feature39: double, feature40: double, feature41: double, feature42: double, feature43: double, feature44: double, feature45: double, feature46: double, feature47: double, feature48: double, feature49: double, __index_level_0__: bigint]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_unlabeled = sqlContext.read.parquet('unlabeled_data')\n",
"df_unlabeled"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions with a regular UDF\n",
"\n",
"First, we'll try a regular UDF. This will deserialize one row (i.e., instance, sample, record) at a time, make a prediction with the, and return a prediction, which will be serialized and sent back to Spark to combine with all the other predictions."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction=0.96),\n",
" Row(id=1, prediction=0.13),\n",
" Row(id=2, prediction=0.95),\n",
" Row(id=3, prediction=0.43),\n",
" Row(id=4, prediction=0.95)]"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@F.udf(returnType=DoubleType())\n",
"def predict_udf(*cols):\n",
" # cols will be a tuple of floats here.\n",
" return float(gs_rf.predict_proba((cols,))[0, 1])\n",
"\n",
"df_pred_a = df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_udf(*column_names).alias('prediction')\n",
")\n",
"df_pred_a.take(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make predictions with a Pandas UDF\n",
"\n",
"Now we'll use a Pandas UDF (i.e., vectorized UDF). In this case, Spark will send a tuple of pandas Series objects with multiple rows at a time. The tuple will have one Series per column/feature, in the order they are passed to the UDF. Note that one of these Series objects won't contain features for all rows at once because Spark partitions datasets across workers. The partition size can be tuned, but we'll just use defaults here."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction=0.96),\n",
" Row(id=1, prediction=0.13),\n",
" Row(id=2, prediction=0.95),\n",
" Row(id=3, prediction=0.43),\n",
" Row(id=4, prediction=0.95)]"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@F.pandas_udf(returnType=DoubleType())\n",
"def predict_pandas_udf(*cols):\n",
" # cols will be a tuple of pandas.Series here.\n",
" X = pd.concat(cols, axis=1)\n",
" return pd.Series(gs_rf.predict_proba(X)[:, 1])\n",
"\n",
"df_pred_b = df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_pandas_udf(*column_names).alias('prediction')\n",
")\n",
"df_pred_b.take(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Making multiclass predictions\n",
"\n",
"Above, we're just returning a single series of predictions for the positive class, which works for single binary or dependent variables. One can also put multiclass or multilabel models in Pandas UDFs. One just returns a series of lists of numbers instead of a series of numbers."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[Row(id=0, prediction_0=0.04, prediction_1=0.96),\n",
" Row(id=1, prediction_0=0.87, prediction_1=0.13),\n",
" Row(id=2, prediction_0=0.05, prediction_1=0.95),\n",
" Row(id=3, prediction_0=0.57, prediction_1=0.43),\n",
" Row(id=4, prediction_0=0.05, prediction_1=0.95)]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@F.pandas_udf(returnType=ArrayType(DoubleType()))\n",
"def predict_pandas_udf(*cols):\n",
" X = pd.concat(cols, axis=1)\n",
" return pd.Series(row.tolist() for row in gs_rf.predict_proba(X))\n",
"\n",
"df_pred_multi = (\n",
" df_unlabeled.select(\n",
" F.col('id'),\n",
" predict_pandas_udf(*column_names).alias('predictions')\n",
" )\n",
" # Select each item of the prediction array into its own column.\n",
" .select(\n",
" F.col('id'),\n",
" *[F.col('predictions')[i].alias(f'prediction_{c}')\n",
" for i, c in enumerate(gs_rf.classes_)]\n",
" )\n",
")\n",
"df_pred_multi.take(5)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
@a3agalyan
Copy link

Hi, im getting some kind of pyspark error on your code. I guess it's because of libs incompatibility.
Could you pls provide python, pandas and pyspark versions, which worked for u, thx

@mheilman
Copy link
Author

mheilman commented Dec 2, 2019

Hi, sorry about not including version numbers in there. I added them just now.

I suspect the pandas or pyarrow version was causing trouble because I had to use some older versions of those to get this notebook to run just now. I didn't look into it deeply, but it looks like the latest version of pyspark at the moment (2.4.4) doesn't work well with the latest versions of pandas and pyarrow. This apparently has been addressed for future releases (see, e.g., apache/spark#24867).

@a3agalyan
Copy link

thank you so much for your quick response, it worked for me as i downgraded pyarrow to 0.8.0

@jamesonl
Copy link

jamesonl commented Dec 3, 2019

Hi - this code is helpful for applying an already trained model at scale... but is it possible to train a model at scale using pandas_UDF functions?

Another way of asking the same question: Is it possible to include the section called "Train a model with scikit-learn" within a pandas_UDF?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment