# Demo notebook and helpers for distributed model selection with IPython.parallel and scikit-learn
import os
from IPython.parallel import interactive
def persist_cv_splits(name, X, y, n_cv_iter=5, suffix="_cv_%03d.pkl",
                      test_size=0.25, random_state=None):
    """Materialize randomized train test splits of a dataset.

    Each split is dumped with joblib as a
    ``(X_train, y_train, X_test, y_test)`` tuple in its own file.

    Parameters
    ----------
    name : str
        Path prefix of the generated files.
    X, y : indexable arrays
        Data and targets; ``X`` must expose ``shape`` and both must accept
        index arrays (assumes numpy-like arrays -- TODO confirm for sparse
        input).
    n_cv_iter : int
        Number of random splits to generate.
    suffix : str
        ``%``-format pattern appended to ``name``; it receives the split
        index.
    test_size, random_state
        Forwarded unchanged to ``ShuffleSplit``.

    Returns
    -------
    list of str
        Absolute paths of the dumped split files, in split order.
    """
    from sklearn.externals import joblib
    from sklearn.cross_validation import ShuffleSplit
    import os

    cv = ShuffleSplit(X.shape[0], n_iter=n_cv_iter,
                      test_size=test_size, random_state=random_state)
    cv_split_filenames = []

    for i, (train, test) in enumerate(cv):
        cv_fold = (X[train], y[train], X[test], y[test])
        cv_split_filename = os.path.abspath(name + (suffix % i))
        joblib.dump(cv_fold, cv_split_filename)
        # Record the file so that callers actually get the list of splits
        # back (it used to be returned empty).
        cv_split_filenames.append(cv_split_filename)

    return cv_split_filenames
import os
def warm_mmap_on_cv_splits(client, cv_split_filenames):
    """Trigger a disk load on all the arrays of the CV splits.

    Assume the files are shared on all the hosts using NFS.

    Parameters
    ----------
    client : IPython.parallel client
        Indexed to build views on the engines (``client[:]`` and
        ``client[list_of_engine_ids]``).
    cv_split_filenames : iterable of str
        Paths of joblib-dumped CV splits; they are made absolute before
        being sent to the engines.
    """
    # First step: query cluster to fetch one engine id per host
    all_engines = client[:]

    # ``interactive`` makes the shipped function resolve its globals in the
    # engine namespace rather than in this module, which the engines are not
    # guaranteed to be able to import.
    @interactive
    def hostname():
        import socket
        return socket.gethostname()

    hostnames = all_engines.apply(hostname).get_dict()
    # Keying by host name keeps a single engine id for every distinct host.
    one_engine_per_host = dict((host, engine_id)
                               for engine_id, host
                               in hostnames.items())
    # Index with a real list: on Python 3 ``dict.values()`` is a view object.
    hosts_view = client[list(one_engine_per_host.values())]

    # Second step: for each data file and host, mmap the arrays of the file
    # and trigger a sequential read of all the arrays' data
    @interactive
    def load_in_memory(filenames):
        from sklearn.externals import joblib
        for filename in filenames:
            arrays = joblib.load(filename, mmap_mode='r')
            for array in arrays:
                array.sum()  # trigger the disk read

    cv_split_filenames = [os.path.abspath(f) for f in cv_split_filenames]
    hosts_view.apply_sync(load_in_memory, cv_split_filenames)
from collections import namedtuple
from collections import defaultdict
from IPython.parallel import interactive
from IPython.parallel import TaskAborted
from scipy.stats import sem
import numpy as np
from sklearn.utils import check_random_state
# ParameterGrid was called IterGrid before scikit-learn 0.14: import whichever
# name is available and expose it under the newer one.
try:
    # sklearn 0.14+
    from sklearn.grid_search import ParameterGrid
except ImportError:
    # sklearn 0.13
    from sklearn.grid_search import IterGrid as ParameterGrid
from mmap_utils import warm_mmap_on_cv_splits
def is_aborted(task):
    """Tell whether ``task`` holds a ``TaskAborted`` exception.

    Objects without an ``_exception`` attribute are reported as not aborted.
    """
    failure = getattr(task, '_exception', None)
    return isinstance(failure, TaskAborted)
@interactive
def compute_evaluation(model, cv_split_filename, params=None,
                       train_fraction=1.0, mmap_mode='r'):
    """Function executed on a worker to evaluate a model on a given CV split.

    Parameters
    ----------
    model : estimator
        Object exposing ``set_params``, ``fit`` and ``score``.
    cv_split_filename : str
        Path of a joblib dump holding
        ``(X_train, y_train, X_test, y_test)``.
    params : dict or None
        Parameters applied with ``model.set_params`` before fitting; when
        ``None`` the model is fitted as received.
    train_fraction : float
        Fraction of the training rows (taken from the start) used to fit.
    mmap_mode : str or None
        Forwarded to ``joblib.load``.

    Returns
    -------
    tuple
        ``(test_score, train_score, train_time, train_fraction, params)``
        where ``train_time`` is the duration of ``fit`` in seconds.
    """
    # All module imports should be executed in the worker namespace
    from time import time
    from sklearn.externals import joblib

    X_train, y_train, X_test, y_test = joblib.load(
        cv_split_filename, mmap_mode=mmap_mode)

    # Slice a subset of the training set for plotting learning curves
    n_samples_train = int(train_fraction * X_train.shape[0])
    X_train = X_train[:n_samples_train]
    y_train = y_train[:n_samples_train]

    # Configure the model; ``params`` defaults to None and cannot be
    # unpacked with ``**`` in that case.
    if params is not None:
        model.set_params(**params)

    # Fit model and measure training time
    t0 = time()
    model.fit(X_train, y_train)
    train_time = time() - t0

    # Compute score on training set
    train_score = model.score(X_train, y_train)

    # Compute score on test set
    test_score = model.score(X_test, y_test)

    # Wrap evaluation results in a simple tuple datastructure
    return (test_score, train_score, train_time,
            train_fraction, params)
# Named tuple to collect evaluation results. The field order mirrors the
# tuple returned by compute_evaluation so that Evaluation(*result) works.
Evaluation = namedtuple('Evaluation', (
    'validation_score',
    'train_score',
    'train_time',
    'train_fraction',
    'parameters',
))
class RandomizedGridSeach(object):
    """Async randomized parameter search.

    Every parameter combination of a grid is evaluated on every CV split
    by tasks submitted through ``load_balanced_view``. Scores are
    aggregated on demand from the tasks that are finished (and were not
    aborted) at the time of the call.

    Parameters
    ----------
    load_balanced_view : view
        Object whose ``apply`` method schedules a task and returns a handle
        exposing ``ready()``, ``get()``, ``wait()`` and ``abort()``
        (presumably an IPython.parallel load-balanced view).
    random_state : int, RandomState instance or None
        Passed to ``check_random_state`` to shuffle the parameter grid.
    """

    def __init__(self, load_balanced_view, random_state=0):
        self.task_groups = []
        # Initialised here so that repr() / find_bests() are usable before
        # the first launch.
        self.all_parameters = []
        self.parameter_grid = {}
        self.lb_view = load_balanced_view
        self.random_state = random_state

    def map_tasks(self, f):
        """Apply ``f`` to every scheduled task; return a flat result list."""
        return [f(task) for task_group in self.task_groups
                for task in task_group]

    def abort(self):
        """Abort every task that is not finished yet and return ``self``."""
        for task_group in self.task_groups:
            for task in task_group:
                if not task.ready():
                    try:
                        task.abort()
                    except AssertionError:
                        # Best effort: presumably the task completed between
                        # the ready() check and the abort() call.
                        pass
        return self

    def wait(self):
        """Block until every scheduled task is done and return ``self``."""
        self.map_tasks(lambda t: t.wait())
        return self

    def completed(self):
        """Return the number of tasks that finished without being aborted."""
        return sum(self.map_tasks(lambda t: t.ready() and not is_aborted(t)))

    def total(self):
        """Return the number of scheduled tasks."""
        return sum(self.map_tasks(lambda t: 1))

    def progress(self):
        """Return the fraction of completed tasks as a float in [0, 1]."""
        c = self.completed()
        if c == 0:
            # Also covers the "nothing scheduled" case (no division by zero).
            return 0.0
        return float(c) / self.total()

    def reset(self):
        """Abort previously scheduled tasks and forget about them."""
        # Abort any other previously scheduled tasks; go through abort() so
        # that finished tasks are skipped instead of raising.
        self.abort()

        # Schedule a new batch of evaluation tasks
        self.task_groups, self.all_parameters = [], []

    def launch_for_splits(self, model, parameter_grid, cv_split_filenames,
                          pre_warm=True):
        """Launch a Grid Search on precomputed CV splits.

        Parameters
        ----------
        model : estimator
            Passed as is to ``compute_evaluation`` for every task.
        parameter_grid : dict
            Mapping of parameter name to the list of values to explore.
        cv_split_filenames : sequence of str
            Files holding the precomputed CV splits; one task is scheduled
            per (parameter combination, split file) pair.
        pre_warm : bool
            Currently unused: the cache warming step is disabled below.

        Returns
        -------
        self
        """
        # Abort any existing processing and erase previous state
        self.reset()
        self.parameter_grid = parameter_grid

        # Warm the OS disk cache on each host with sequential reads
        # XXX: fix me: interactive namespace issues to resolve
        # if pre_warm:
        #     warm_mmap_on_cv_splits(self.lb_view.client, cv_split_filenames)

        # Randomize the grid order
        random_state = check_random_state(self.random_state)
        self.all_parameters = list(ParameterGrid(parameter_grid))
        random_state.shuffle(self.all_parameters)

        # task_groups[i] holds the tasks evaluating all_parameters[i]
        for params in self.all_parameters:
            task_group = []
            for cv_split_filename in cv_split_filenames:
                task = self.lb_view.apply(compute_evaluation,
                                          model, cv_split_filename,
                                          params=params)
                task_group.append(task)
            self.task_groups.append(task_group)

        # Make it possible to chain method calls
        return self

    def find_bests(self, n_top=5):
        """Compute the mean score of the completed tasks.

        Returns at most ``n_top`` tuples ``(mean validation score,
        validation SEM, mean train score, train SEM, params)`` sorted from
        best to worst. Parameter combinations without any completed task
        are left out.
        """
        mean_scores = []

        for params, task_group in zip(self.all_parameters, self.task_groups):
            evaluations = [Evaluation(*t.get())
                           for t in task_group
                           if t.ready() and not is_aborted(t)]

            if not evaluations:
                continue
            val_scores = [e.validation_score for e in evaluations]
            train_scores = [e.train_score for e in evaluations]
            mean_scores.append((np.mean(val_scores), sem(val_scores),
                                np.mean(train_scores), sem(train_scores),
                                params))

        # Rank on the numeric statistics only: the trailing params dict
        # cannot be ordered on Python 3 when the statistics tie.
        return sorted(mean_scores, key=lambda s: s[:4],
                      reverse=True)[:n_top]

    def report(self, n_top=5):
        """Return a text summary: progress line plus the ``n_top`` bests."""
        bests = self.find_bests(n_top=n_top)
        output = "Progress: {0:02d}% ({1:03d}/{2:03d})\n".format(
            int(100 * self.progress()), self.completed(), self.total())
        for i, best in enumerate(bests):
            output += ("\nRank {0}: validation: {1:.5f} (+/-{2:.5f})"
                       " train: {3:.5f} (+/-{4:.5f}):\n {5}".format(
                           i + 1, *best))
        return output

    def __repr__(self):
        return self.report()

    def boxplot_parameters(self, display_train=False):
        """Plot boxplot for each parameters independently.

        One subplot is drawn per entry of the parameter grid, with one box
        of validation scores per candidate value. When ``display_train`` is
        true the train scores are drawn next to them.
        """
        import pylab as pl
        results = [Evaluation(*task.get())
                   for task_group in self.task_groups
                   for task in task_group
                   if task.ready() and not is_aborted(task)]

        n_rows = len(self.parameter_grid)
        for i, (param_name, param_values) in enumerate(
                self.parameter_grid.items()):
            pl.subplot(n_rows, 1, i + 1)
            val_scores_per_value = []
            train_scores_per_value = []
            for param_value in param_values:
                train_scores = [r.train_score for r in results
                                if r.parameters[param_name] == param_value]
                train_scores_per_value.append(train_scores)

                val_scores = [r.validation_score for r in results
                              if r.parameters[param_name] == param_value]
                val_scores_per_value.append(val_scores)

            widths = 0.25
            positions = np.arange(len(param_values)) + 1
            offset = 0
            if display_train:
                # Shift both series so that the boxes sit side by side
                offset = 0.175
                pl.boxplot(train_scores_per_value, widths=widths,
                           positions=positions - offset)

            pl.boxplot(val_scores_per_value, widths=widths,
                       positions=positions + offset)

            pl.xticks(np.arange(len(param_values)) + 1, param_values)
            pl.xlabel(param_name)
            pl.ylabel("Val. Score")
