@misbah-uddin
Created January 2, 2021 01:44
# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC Benchmarking `sk-dist` against PySpark MLlib and plain scikit-learn for grid-searching a decision tree classifier on MNIST. For more details see [Train sklearn 100x faster](https://medium.com/building-ibotta/train-sklearn-100x-faster-bec530fc1f45) and [Ibotta/sk-dist](https://github.com/Ibotta/sk-dist).
# COMMAND ----------
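# MAGIC %md
# MAGIC `sk-dist` is not bundled with the Databricks Runtime. Assuming a runtime recent enough to support notebook-scoped `%pip` installs, one way to make it available is the cell below (otherwise attach `sk-dist` to the cluster as a library).
# COMMAND ----------
# MAGIC %pip install sk-dist
# COMMAND ----------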
import os
from timeit import timeit
import numpy as np
import sklearn
from skdist.distribute.search import DistGridSearchCV
from sklearn import datasets
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.tree import DecisionTreeClassifier as SklearnDecisionTreeClassifier
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.classification import DecisionTreeClassifier as PySparkDecisionTreeClassifier
# COMMAND ----------
# MAGIC %md
# MAGIC #### Parameters
# COMMAND ----------
SKLEARN_SCORING = "f1_weighted"
NUM_FOLDS = 10
N_JOBS = 10
NUM_TRIALS = 5
RANDOM_STATE = 1234
MAX_DEPTH_LIMIT = 10
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ### Helper functions
# COMMAND ----------
# MAGIC %md
# MAGIC #### Miscellaneous helper functions
# COMMAND ----------
def get_params(max_depth_limit=MAX_DEPTH_LIMIT):
    return {"max_depth": list(range(2, max_depth_limit + 1))}
# COMMAND ----------
def print_stats(model_name, stats, trials=NUM_TRIALS):
    print(model_name)
    print(f"Took {stats} seconds for {trials} trials, averaging {stats / trials} seconds.")
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC #### Data read helpers
# COMMAND ----------
def get_mnist_data_databricks(is_train_data=True, cache=True):
    data_path = "/databricks-datasets/mnist-digits/data-001/"
    filename = "mnist-digits-train.txt" if is_train_data else "mnist-digits-test.txt"
    path = os.path.join(data_path, filename)
    data = spark.read.format("libsvm").option("numFeatures", "784").load(path)
    if cache:
        data.cache()
    print(f"There are {data.count()} images")
    return data

def get_numpy_array_from_sparse_vector(data):
    # Convert a pandas Series of Spark SparseVectors into a dense 2-D numpy array
    dataset = data.apply(lambda _: np.array(_.toArray())).values.reshape(-1, 1)
    return np.apply_along_axis(lambda _: _[0], 1, dataset)

def get_data_as_numpy_array(data):
    converted_data = data.toPandas()
    X = get_numpy_array_from_sparse_vector(converted_data["features"])
    y = converted_data["label"].to_numpy()
    return X, y
# COMMAND ----------
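# MAGIC %md
# MAGIC The readers above assume the Databricks-hosted MNIST libsvm files and a live `spark` session. For a quick local sanity check of the sklearn/sk-dist code paths outside Databricks, a small stand-in such as scikit-learn's bundled digits dataset can be used; `get_digits_data_local` below is a hypothetical helper, not part of the benchmark.
# COMMAND ----------
def get_digits_data_local(test_fraction=0.2, random_state=RANDOM_STATE):
    # Hypothetical stand-in for the MNIST arrays when running outside Databricks:
    # 8x8 digits (1797 samples) instead of the full 784-feature MNIST.
    from sklearn.model_selection import train_test_split
    X, y = datasets.load_digits(return_X_y=True)
    return train_test_split(X, y, test_size=test_fraction, random_state=random_state)
# COMMAND ----------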
# MAGIC %md
# MAGIC
# MAGIC #### Model helper functions
# COMMAND ----------
def get_pyspark_validator(num_folds=NUM_FOLDS, params=get_params):
    indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
    classifier = PySparkDecisionTreeClassifier(labelCol="indexedLabel")
    estimator = Pipeline(stages=[indexer, classifier])
    evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel")
    max_depth = params()["max_depth"]
    grid = ParamGridBuilder().addGrid(classifier.maxDepth, max_depth).build()
    validator = CrossValidator(estimator=estimator,
                               evaluator=evaluator,
                               estimatorParamMaps=grid,
                               numFolds=num_folds)
    return validator
# COMMAND ----------
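# MAGIC %md
# MAGIC By default `CrossValidator` evaluates one parameter combination at a time, whereas the sklearn baseline below runs with `n_jobs=10`. Spark 2.3+ exposes a `parallelism` setting on `CrossValidator`; the cell below is an optional tweak, not part of the original benchmark configuration.
# COMMAND ----------
validator_parallel = get_pyspark_validator()
validator_parallel.setParallelism(4)  # hypothetical value; tune to the cluster size
# this validator could replace the default one in the PySpark modeling cell below
# COMMAND ----------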
# MAGIC %md
# MAGIC
# MAGIC ### Main flow
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC #### Data
# COMMAND ----------
train_data_pyspark = get_mnist_data_databricks()
display(train_data_pyspark)
# COMMAND ----------
test_data_pyspark = get_mnist_data_databricks(is_train_data=False)
display(test_data_pyspark)
# COMMAND ----------
X_train, y_train = get_data_as_numpy_array(train_data_pyspark)
print(X_train.shape, y_train.shape)
# COMMAND ----------
X_test, y_test = get_data_as_numpy_array(test_data_pyspark)
print(X_test.shape, y_test.shape)
# COMMAND ----------
# MAGIC %md
# MAGIC #### Testing PySpark modeling
# COMMAND ----------
pyspark_model_name = "PySpark MLlib"
validator = get_pyspark_validator()
pyspark_model = validator.fit(train_data_pyspark)
pyspark_model_train_score = max(pyspark_model.avgMetrics)
pyspark_evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="f1")
pyspark_model_test_score = pyspark_evaluator.evaluate(pyspark_model.transform(test_data_pyspark))
print(f"{pyspark_model_name}: train_score {pyspark_model_train_score}, test_score {pyspark_model_test_score}")
# COMMAND ----------
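# MAGIC %md
# MAGIC To see how the cross-validation scores vary with `maxDepth`, `avgMetrics` can be zipped with the grid (for a single-parameter `ParamGridBuilder` the order matches), and the winning tree can be pulled out of the best pipeline. This is an inspection sketch, not part of the original benchmark.
# COMMAND ----------
# Average CV metric per maxDepth candidate (grid order matches avgMetrics here)
for depth, metric in zip(get_params()["max_depth"], pyspark_model.avgMetrics):
    print(f"maxDepth={depth}: avg CV metric = {metric:.4f}")
best_tree = pyspark_model.bestModel.stages[-1]  # fitted DecisionTreeClassificationModel
print(f"Best tree depth: {best_tree.depth}, nodes: {best_tree.numNodes}")
# COMMAND ----------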
# MAGIC %md
# MAGIC
# MAGIC #### Testing basic sklearn modeling
# COMMAND ----------
basic_model_name = f"Basic sklearn, multithreaded {N_JOBS}"
basic_model = GridSearchCV(estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
                           param_grid=get_params(),
                           cv=NUM_FOLDS,
                           scoring=SKLEARN_SCORING,
                           n_jobs=N_JOBS)
basic_model.fit(X=X_train, y=y_train)
basic_model_train_score = basic_model.best_score_
basic_model_test_score = f1_score(y_test, basic_model.predict(X_test), average="weighted")
print(f"{basic_model_name}: train_score {basic_model_train_score}, test_score {basic_model_test_score}")
# COMMAND ----------
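# MAGIC %md
# MAGIC For a like-for-like look at what the sklearn grid search selected, `best_params_` and the per-candidate CV scores in `cv_results_` can be printed; again an inspection sketch rather than part of the benchmark.
# COMMAND ----------
print(f"Selected params: {basic_model.best_params_}")
# Mean CV score per max_depth candidate, comparable to the PySpark avgMetrics above
for depth, score in zip(basic_model.cv_results_["param_max_depth"], basic_model.cv_results_["mean_test_score"]):
    print(f"max_depth={depth}: mean CV {SKLEARN_SCORING} = {score:.4f}")
# COMMAND ----------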
# MAGIC %md
# MAGIC
# MAGIC #### Testing sk-dist modeling
# COMMAND ----------
sk_dist_model_name = "sk-dist model"
sk_dist_model = DistGridSearchCV(estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
                                 param_grid=get_params(),
                                 cv=NUM_FOLDS,
                                 scoring=SKLEARN_SCORING,
                                 sc=spark.sparkContext)
sk_dist_model.fit(X=X_train, y=y_train)
sk_dist_model_train_score = sk_dist_model.best_score_
sk_dist_model_test_score = f1_score(y_test, sk_dist_model.predict(X_test), average="weighted")
print(f"{sk_dist_model_name}: train_score {sk_dist_model_train_score}, test_score {sk_dist_model_test_score}")
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC #### Testing run times
# COMMAND ----------
def run_sk_dist_model(X=X_train, y=y_train):
    DistGridSearchCV(estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
                     param_grid=get_params(),
                     cv=NUM_FOLDS,
                     scoring=SKLEARN_SCORING,
                     sc=spark.sparkContext).fit(X, y)

def run_basic_model(X=X_train, y=y_train):
    GridSearchCV(estimator=SklearnDecisionTreeClassifier(random_state=RANDOM_STATE),
                 param_grid=get_params(),
                 cv=NUM_FOLDS,
                 scoring=SKLEARN_SCORING,
                 n_jobs=N_JOBS).fit(X, y)

def run_pyspark_model(data=train_data_pyspark):
    get_pyspark_validator().fit(data)

runs = {
    sk_dist_model_name: run_sk_dist_model,
    basic_model_name: run_basic_model,
    pyspark_model_name: run_pyspark_model,
}
results = []
for model_name, func in runs.items():
    stats = timeit(func, number=NUM_TRIALS)
    results.append({"model_name": model_name, "stats": stats})
# COMMAND ----------
for result in results:
    print_stats(model_name=result["model_name"], stats=result["stats"])
    print()
# COMMAND ----------
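# MAGIC %md
# MAGIC `timeit` only reports the total wall time over `NUM_TRIALS` runs. If per-trial spread is also of interest, `timeit.repeat` gives one measurement per trial; this is an optional extra pass that reruns every model, so it roughly doubles the benchmark time.
# COMMAND ----------
from timeit import repeat
per_trial_times = {name: repeat(func, number=1, repeat=NUM_TRIALS) for name, func in runs.items()}
for name, times in per_trial_times.items():
    print(f"{name}: min {min(times):.1f}s, max {max(times):.1f}s over {NUM_TRIALS} trials")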