Optimized implementation of multiple-model inference on a dataframe of values, using the mapPartitions function to achieve batch inference
This need not be so complicated. Here's a concrete example of how you can do this.

First load all the models into a list and broadcast the list; on the executors, access the broadcast variable's contents through its `value` attribute. You can concatenate your features into an array column, as done below, and then run inference on samples one by one.

You can achieve batch semantics by using the mapPartitions function on an RDD and converting the result back to a dataframe, as shown in the full script below.
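In outline, the pattern is: load the models once on the driver, broadcast them, run one batched predict per partition, and rebuild a dataframe from the result. Here is a minimal, standalone sketch of just that skeleton (the "models" here are stand-in weight vectors, purely illustrative, so it runs without Keras):

import numpy as np
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
df = spark.createDataFrame([([1.0, 2.0],), ([3.0, 4.0],)], ["features_concat"])

toy_models = [np.array([0.5, 0.5]), np.array([1.0, 0.0])]  # stand-ins for real models
bc_models = spark.sparkContext.broadcast(toy_models)

def infer_partition(rows):
    batch = np.array([r.features_concat for r in rows])  # whole partition = one batch
    if batch.size == 0:
        return iter([])  # guard against empty partitions
    preds = np.mean([batch @ w for w in bc_models.value], axis=0)
    return iter([(f.tolist(), float(p)) for f, p in zip(batch, preds)])

out = df.rdd.mapPartitions(infer_partition).toDF(["features_concat", "avg_score"])

The full script with the actual Keras models follows.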
import joblib
import numpy as np
import pandas as pd
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from sklearn.datasets import load_iris
from keras.layers import Dense
from keras.models import Sequential

sc = SparkContext('local')
sqlContext = SQLContext(sc)
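## Note: SQLContext is deprecated in recent PySpark releases; the modern
## equivalent (a sketch, not used in the rest of this script) would be:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local").getOrCreate()
# initial_df = spark.read.option("inferSchema", "true").csv(input_file, header=True)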
## Get the data_iris.csv from this location: https://github.com/dehaoterryzhang/Iris_Classification/tree/master/data
input_file = "../data/data_iris.csv"
initial_df = sqlContext.read.option("inferSchema", "true").csv(input_file, header=True)
initial_df.show(n=10, truncate=False)

X_train_df = initial_df.select(['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])
y_train_df = initial_df.select("species")

print("X_train_df features info")
X_train_df.show(n=3, truncate=False)
print("y_train_df label info")
y_train_df.show(n=3, truncate=False)
print("distinct classes in the dataset")
y_train_df.distinct().show(n=100, truncate=False)

all_columns = X_train_df.columns

## Concatenate the four feature columns into a single array column, e.g. [5.1, 3.5, 1.4, 0.2]
X_train_df = X_train_df.withColumn("features_concat", F.array(all_columns))

## Train a small Keras network on the sklearn copy of the iris data
X, y = load_iris(return_X_y=True)
model = Sequential([
    Dense(16, activation='relu'),
    Dense(32, activation='relu'),
    Dense(1)])
model.compile(loss='mean_absolute_error', optimizer='adam')
model.fit(X, y, epochs=10, verbose=0)
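## Note: load_iris returns integer class codes (0/1/2) as y, so this network is
## fit as a regressor; the averaged "prediction scores" further below are
## regression outputs, not class probabilities.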
keras_model_export_path = '../model_exported/keras_nn_iris_model.pkl'
## Uncomment below to export the trained model
# joblib.dump(model, keras_model_export_path, compress=9)
keras_model_loaded_from_path = joblib.load(keras_model_export_path)
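## Note (assumption): pickling a Keras model with joblib can break across Keras
## versions; the Keras-native alternative (path is illustrative) would be:
# model.save('../model_exported/keras_nn_iris_model.keras')
# from keras.models import load_model
# keras_model_loaded_from_path = load_model('../model_exported/keras_nn_iris_model.keras')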
model_clone_list = [keras_model_loaded_from_path] * 10  ## simulate a list of 10 models; they are just copies of one model

## Broadcast the models so that they are available on the executors
broadcasted_model_clone_list = sc.broadcast(model_clone_list)
def different_inference(features_array):
    ## Row-at-a-time inference: build a one-row frame from the feature array
    X = pd.DataFrame([features_array])
    X.columns = all_columns
    prediction_classification_scores_list = []
    # inference over all 10 models in a for loop
    for model_ii in broadcasted_model_clone_list.value:
        curr_result = model_ii.predict(X)
        prediction_classification_scores_list.append(curr_result[:, 0])
    prob_scores = np.array(prediction_classification_scores_list)
    final_result = pd.Series(prob_scores.mean(axis=0)).tolist()
    return final_result

different_inference_udf = F.udf(different_inference)
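## Note: F.udf without an explicit return type defaults to StringType; a typed
## variant (sketch) would be:
# from pyspark.sql.types import ArrayType, DoubleType
# different_inference_udf = F.udf(different_inference, ArrayType(DoubleType()))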
### Inferencing over only 10 rows, one model call per row (the slow baseline)
# predicted_df = X_train_df.limit(10).withColumn("prediction_scores", different_inference_udf(F.col("features_concat")))
# print("Inference result from the 10 models")
# predicted_df.show(n=10, truncate=False)
def mapPartition_inference(partitioned_rows):
    ## Collect the whole partition into one batch
    features_array_list = []
    for row in partitioned_rows:
        features_array_list.append(row.features_concat)
    if not features_array_list:
        return iter([])  # guard against empty partitions
    X = pd.DataFrame(features_array_list)
    X.columns = all_columns
    prediction_classification_scores_list = []
    # inference over all 10 models in a for loop, one batched predict() per model
    for model_ii in broadcasted_model_clone_list.value:
        curr_result = model_ii.predict(X)
        prediction_classification_scores_list.append(curr_result[:, 0])
    prob_scores = np.array(prediction_classification_scores_list)
    final_result = pd.Series(prob_scores.mean(axis=0)).tolist()
    ## Pair each feature array with its averaged prediction
    constructed_result = []
    for jj in range(len(features_array_list)):
        constructed_result.append([features_array_list[jj], final_result[jj]])
    return iter(constructed_result)

partitioned_df = X_train_df.limit(30).repartition(10)
partition_predicted_df = partitioned_df.rdd.mapPartitions(mapPartition_inference).toDF(["features_concat", "prediction_avg_scores"])
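## Each partition becomes one predict() batch, so the partition count sets the
## batch size; for fewer, larger batches one could coalesce first (sketch):
# partition_predicted_df = partitioned_df.coalesce(2).rdd.mapPartitions(mapPartition_inference).toDF(["features_concat", "prediction_avg_scores"])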
print("Inference result from the 10 models using mapPartitions (optimized version)") | |
partition_predicted_df.show(n=30, truncate=False) | |
Output is as follows:

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|5.1         |3.5        |1.4         |0.2        |setosa |
|4.9         |3.0        |1.4         |0.2        |setosa |
|4.7         |3.2        |1.3         |0.2        |setosa |
|4.6         |3.1        |1.5         |0.2        |setosa |
|5.0         |3.6        |1.4         |0.2        |setosa |
|5.4         |3.9        |1.7         |0.4        |setosa |
|4.6         |3.4        |1.4         |0.3        |setosa |
|5.0         |3.4        |1.5         |0.2        |setosa |
|4.4         |2.9        |1.4         |0.2        |setosa |
|4.9         |3.1        |1.5         |0.1        |setosa |
+------------+-----------+------------+-----------+-------+
only showing top 10 rows

X_train_df features info
+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|5.1         |3.5        |1.4         |0.2        |
|4.9         |3.0        |1.4         |0.2        |
|4.7         |3.2        |1.3         |0.2        |
+------------+-----------+------------+-----------+
only showing top 3 rows

y_train_df label info
+-------+
|species|
+-------+
|setosa |
|setosa |
|setosa |
+-------+
only showing top 3 rows

distinct classes in the dataset
+----------+
|species   |
+----------+
|virginica |
|versicolor|
|setosa    |
+----------+

Inference result from the 10 models using mapPartitions (optimized version)
+--------------------+---------------------+
|features_concat     |prediction_avg_scores|
+--------------------+---------------------+
|[5.2, 3.4, 1.4, 0.2]|0.6321244239807129   |
|[5.1, 3.8, 1.5, 0.3]|0.6290395855903625   |
|[4.3, 3.0, 1.1, 0.1]|0.5323924422264099   |
|[4.8, 3.4, 1.6, 0.2]|0.6239675879478455   |
|[4.9, 3.1, 1.5, 0.1]|0.6233397722244263   |
|[5.1, 3.5, 1.4, 0.3]|0.625523030757904    |
|[4.4, 2.9, 1.4, 0.2]|0.5821532607078552   |
|[5.0, 3.4, 1.5, 0.2]|0.627651035785675    |
|[4.6, 3.4, 1.4, 0.3]|0.5886028409004211   |
|[4.8, 3.0, 1.4, 0.1]|0.6058898568153381   |
|[5.4, 3.9, 1.3, 0.4]|0.6334523558616638   |
|[5.1, 3.5, 1.4, 0.2]|0.6212030053138733   |
|[4.6, 3.6, 1.0, 0.2]|0.5346620678901672   |
|[4.6, 3.1, 1.5, 0.2]|0.6045092344284058   |
|[4.8, 3.4, 1.9, 0.2]|0.6606692671775818   |
|[5.0, 3.4, 1.6, 0.4]|0.6499260663986206   |
|[4.9, 3.0, 1.4, 0.2]|0.618742823600769    |
|[5.1, 3.7, 1.5, 0.4]|0.6369576454162598   |
|[5.4, 3.7, 1.5, 0.2]|0.6512832045555115   |
|[5.2, 3.5, 1.5, 0.2]|0.640670657157898    |
|[5.8, 4.0, 1.2, 0.2]|0.6450895667076111   |
|[5.7, 4.4, 1.5, 0.4]|0.6659770607948303   |
|[5.0, 3.6, 1.4, 0.2]|0.6101197004318237   |
|[4.7, 3.2, 1.3, 0.2]|0.5853733420372009   |
|[5.0, 3.0, 1.6, 0.2]|0.6513381004333496   |
|[5.4, 3.9, 1.7, 0.4]|0.679506242275238    |
|[4.7, 3.2, 1.6, 0.2]|0.6218041181564331   |
|[5.7, 3.8, 1.7, 0.3]|0.7012595534324646   |
|[5.4, 3.4, 1.7, 0.2]|0.6839534640312195   |
|[5.1, 3.3, 1.7, 0.5]|0.6780367493629456   |
+--------------------+---------------------+
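As an aside, a similar batching effect is possible without dropping to the RDD API by using a pandas UDF, which receives whole Arrow batches as pandas Series. A minimal sketch, assuming Spark 3.x with PyArrow installed (the function name is illustrative):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def batch_inference_udf(features: pd.Series) -> pd.Series:
    # Each invocation receives one Arrow batch; stack the arrays into a 2-D matrix
    X = np.stack(features.to_numpy())
    scores = [m.predict(X)[:, 0] for m in broadcasted_model_clone_list.value]
    return pd.Series(np.mean(scores, axis=0))

# predicted_df = X_train_df.withColumn("prediction_avg_scores", batch_inference_udf("features_concat"))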