-
-
Save misbah-uddin/4dff0d9a387cb17de0dcc30c1d87370c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Databricks notebook source | |
# MAGIC %md | |
# MAGIC | |
# MAGIC Benchmarking "sk-dist" against mllib and basic sklearn. For more details see [Train sklearn 100x faster](https://medium.com/building-ibotta/train-sklearn-100x-faster-bec530fc1f45) and [Ibotta/sk-dist](https://github.com/Ibotta/sk-dist). | |
# COMMAND ---------- | |
import os | |
from timeit import timeit | |
import numpy as np | |
import sklearn | |
from skdist.distribute.search import DistGridSearchCV | |
from sklearn import datasets | |
from sklearn.metrics import f1_score | |
from sklearn.model_selection import GridSearchCV | |
from sklearn.tree import DecisionTreeClassifier as SklearnDecisionTreeClassifier | |
from pyspark.sql import SparkSession | |
from pyspark.ml import Pipeline | |
from pyspark.ml.evaluation import MulticlassClassificationEvaluator | |
from pyspark.ml.feature import StringIndexer | |
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder | |
from pyspark.ml.classification import DecisionTreeClassifier as PySparkDecisionTreeClassifier | |
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC #### Parameters | |
# COMMAND ---------- | |
# Shared benchmark settings used by all three strategies below.
SKLEARN_SCORING = "f1_weighted"  # scoring metric for GridSearchCV / DistGridSearchCV
NUM_FOLDS = 10                   # cross-validation folds for every strategy
N_JOBS = 10                      # local parallelism for plain sklearn GridSearchCV
NUM_TRIALS = 5                   # repetitions per model in the timing comparison
RANDOM_STATE = 1234              # fixed seed so tree fits are reproducible
MAX_DEPTH_LIMIT = 10             # inclusive upper bound of the max_depth grid
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC ### Helper functions | |
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC #### Miscellaneous helper functions
# COMMAND ---------- | |
def get_params(max_depth_limit=MAX_DEPTH_LIMIT):
    """Build the hyper-parameter grid shared by all three search strategies.

    Args:
        max_depth_limit: inclusive upper bound for the candidate tree depths.

    Returns:
        dict mapping "max_depth" to the list [2, 3, ..., max_depth_limit].
    """
    # list(range(...)) is the idiomatic form of the original
    # [_ for _ in range(...)] copy-comprehension.
    return {"max_depth": list(range(2, max_depth_limit + 1))}
# COMMAND ---------- | |
def print_stats(model_name, stats, trials=NUM_TRIALS):
    """Report total and per-trial wall-clock time for one benchmarked model.

    Args:
        model_name: human-readable label for the model being reported.
        stats: total elapsed seconds across all trials (from `timeit`).
        trials: number of trials the total covers.
    """
    average = stats / trials
    print(model_name)
    print(f"Took {stats} seconds for {trials} trials, averaging {average} seconds.")
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Data read helpers | |
# COMMAND ---------- | |
def get_mnist_data_databricks(is_train_data=True, cache=True):
    """Load the Databricks-hosted MNIST digits dataset as a Spark DataFrame.

    Args:
        is_train_data: when True load the train split, otherwise the test split.
        cache: when True, cache the DataFrame before counting/returning it.

    Returns:
        Spark DataFrame with libsvm-style (label, features) columns.
    """
    base_dir = "/databricks-datasets/mnist-digits/data-001/"
    if is_train_data:
        file_name = "mnist-digits-train.txt"
    else:
        file_name = "mnist-digits-test.txt"
    full_path = os.path.join(base_dir, file_name)
    # `spark` is the SparkSession injected by the Databricks runtime.
    images = spark.read.format("libsvm").option("numFeatures", "784").load(full_path)
    if cache:
        images.cache()
    print(f"There are {images.count()} images")
    return images
def get_numpy_array_from_sparse_vector(data):
    """Convert a pandas Series of Spark ML vectors into a dense 2-D array.

    Args:
        data: pandas Series whose elements expose ``toArray()`` (e.g.
            pyspark.ml.linalg SparseVector / DenseVector); all vectors are
            assumed to have the same length.

    Returns:
        numpy array of shape (n_rows, n_features).
    """
    # np.stack replaces the original reshape(-1, 1) + np.apply_along_axis
    # dance: densify each vector once, then stack rows into one matrix.
    return np.stack([np.asarray(vector.toArray()) for vector in data])
def get_data_as_numpy_array(data, cache=True):
    """Materialize a Spark DataFrame of (features, label) rows as numpy arrays.

    Args:
        data: Spark DataFrame with "features" (ML vectors) and "label" columns.
        cache: accepted for interface compatibility.
            NOTE(review): this flag is never used in the body — confirm
            whether caching was intended here.

    Returns:
        Tuple (X, y): dense feature matrix and label vector.
    """
    pandas_frame = data.toPandas()
    features = get_numpy_array_from_sparse_vector(pandas_frame["features"])
    labels = pandas_frame["label"].to_numpy()
    return features, labels
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Model helper functions | |
# COMMAND ---------- | |
def get_pyspark_validator(num_folds=NUM_FOLDS, params=get_params):
    """Assemble a CrossValidator over a StringIndexer + DecisionTree pipeline.

    Args:
        num_folds: number of cross-validation folds.
        params: zero-argument callable returning the grid dict; its
            "max_depth" entry feeds the maxDepth search grid.

    Returns:
        An unfitted pyspark.ml.tuning.CrossValidator.
    """
    label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
    tree = PySparkDecisionTreeClassifier(labelCol="indexedLabel")
    pipeline = Pipeline(stages=[label_indexer, tree])
    scorer = MulticlassClassificationEvaluator(labelCol="indexedLabel")
    depth_grid = (
        ParamGridBuilder()
        .addGrid(tree.maxDepth, params()["max_depth"])
        .build()
    )
    return CrossValidator(
        estimator=pipeline,
        evaluator=scorer,
        estimatorParamMaps=depth_grid,
        numFolds=num_folds,
    )
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC ### Main flow | |
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Data | |
# COMMAND ---------- | |
# Load the train split (Spark DataFrame of libsvm rows) and preview it.
# `display` is a Databricks notebook builtin.
train_data_pyspark = get_mnist_data_databricks()
display(train_data_pyspark)
# COMMAND ----------
# Load the held-out test split.
test_data_pyspark = get_mnist_data_databricks(is_train_data=False)
display(test_data_pyspark)
# COMMAND ----------
# Densify the Spark data into numpy arrays for the sklearn-based runs.
X_train, y_train = get_data_as_numpy_array(train_data_pyspark)
print (X_train.shape, y_train.shape)
# COMMAND ----------
X_test, y_test = get_data_as_numpy_array(test_data_pyspark)
print (X_test.shape, y_test.shape)
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC #### Testing PySpark modeling | |
# COMMAND ---------- | |
# Cross-validated PySpark mllib decision tree: fit on the train split and take
# the best average CV metric as the train score.
pyspark_model_name = f"PySpark mllib"
validator = get_pyspark_validator()
pyspark_model = validator.fit(train_data_pyspark)
pyspark_model_train_score = max(pyspark_model.avgMetrics)
# NOTE(review): the train score above comes from the CV evaluator's default
# metric, while the test score below uses weightedPrecision — confirm the two
# metrics are meant to differ from each other and from SKLEARN_SCORING.
pyspark_evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="weightedPrecision")
pyspark_model_test_score = pyspark_evaluator.evaluate(pyspark_model.transform(test_data_pyspark))
print (f"{pyspark_model_name}: train_score {pyspark_model_train_score}, test_score {pyspark_model_test_score}")
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Testing basic sklearn modeling | |
# COMMAND ---------- | |
# Plain scikit-learn grid search, parallelized across N_JOBS local workers.
basic_model_name = f"Basic sklearn, multithreaded {N_JOBS}"
basic_model = GridSearchCV(
    estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
    param_grid=get_params(),
    cv=NUM_FOLDS,
    scoring=SKLEARN_SCORING,
    n_jobs=N_JOBS,
)
basic_model.fit(X=X_train, y=y_train)
basic_model_train_score = basic_model.best_score_
# Fix: f1_score expects (y_true, y_pred) — ground truth first, predictions
# second. (Micro averaging happens to be symmetric, but the order was wrong.)
# NOTE(review): test score uses average="micro" while CV uses f1_weighted —
# confirm the mismatch is intentional.
basic_model_test_score = f1_score(y_test, basic_model.predict(X_test), average="micro")
print(f"{basic_model_name}: train_score {basic_model_train_score}, test_score {basic_model_test_score}")
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Testing sk-dist modeling | |
# COMMAND ---------- | |
# sk-dist grid search: same sklearn estimator, but folds/candidates are
# distributed over the Spark cluster via the SparkContext.
sk_dist_model_name = "sk-dist model"
sk_dist_model = DistGridSearchCV(
    estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
    param_grid=get_params(),
    cv=NUM_FOLDS,
    scoring=SKLEARN_SCORING,
    sc=spark.sparkContext,
)
sk_dist_model.fit(X=X_train, y=y_train)
sk_dist_model_train_score = sk_dist_model.best_score_
# Fix: f1_score expects (y_true, y_pred) — ground truth first, predictions
# second. (Micro averaging happens to be symmetric, but the order was wrong.)
sk_dist_model_test_score = f1_score(y_test, sk_dist_model.predict(X_test), average="micro")
print(f"{sk_dist_model_name}: train_score {sk_dist_model_train_score}, test_score {sk_dist_model_test_score}")
# COMMAND ---------- | |
# MAGIC %md | |
# MAGIC | |
# MAGIC #### Testing run times | |
# COMMAND ---------- | |
def run_sk_dist_model(X=X_train, y=y_train):
    """Fit one sk-dist grid search end-to-end (used as a timing target)."""
    search = DistGridSearchCV(
        estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
        param_grid=get_params(),
        cv=NUM_FOLDS,
        scoring=SKLEARN_SCORING,
        sc=spark.sparkContext,
    )
    search.fit(X, y)
def run_basic_model(X=X_train, y=y_train):
    """Fit one plain-sklearn grid search end-to-end (used as a timing target)."""
    search = GridSearchCV(
        estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
        param_grid=get_params(),
        cv=NUM_FOLDS,
        scoring=SKLEARN_SCORING,
        n_jobs=N_JOBS,
    )
    search.fit(X, y)
def run_pyspark_model(data=train_data_pyspark):
    """Fit one PySpark cross-validated pipeline (used as a timing target)."""
    validator = get_pyspark_validator()
    validator.fit(data)
# Map each display label to its training routine, then time every routine
# NUM_TRIALS times and collect the totals for reporting.
runs = {
    sk_dist_model_name: run_sk_dist_model,
    basic_model_name: run_basic_model,
    pyspark_model_name: run_pyspark_model,
}
results = []
for name, runner in runs.items():
    elapsed = timeit(runner, number=NUM_TRIALS)
    results.append({"model_name": name, "stats": elapsed})
# COMMAND ---------- | |
# Summarize the total / average runtime recorded for each model.
for entry in results:
    print_stats(**entry)
    print()
# COMMAND ---------- | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment