Last active: December 25, 2015 16:49
-
-
Save ogrisel/7008622 to your computer and use it in GitHub Desktop.
Parallel tree building with threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division, print_function | |
# Author: Peter Prettenhofer <peter.prettenhofer@gmail.com> | |
# Olivier Grisel <olivier.grisel@ensta.org> | |
# License: BSD 3 clause | |
import os | |
from time import time | |
from concurrent.futures import ThreadPoolExecutor | |
import numpy as np | |
from sklearn.datasets import fetch_covtype | |
from sklearn.tree import DecisionTreeClassifier | |
from joblib import Memory | |
from joblib import Parallel, delayed | |
# from memory_profiler import Timestamper | |
# Memoize the (slow) dataset extraction with joblib and memory-map the
# resulting train / test splits in read-only mode, so later calls to the
# cached loader reuse the stored arrays instead of recomputing them.
bench_folder = os.path.dirname(__file__)
# NOTE(review): original_archive is not referenced by the code below; it is
# kept only so the module-level name stays available.
original_archive, joblib_cache_folder = (
    os.path.join(bench_folder, file_name)
    for file_name in ('covtype.data.gz', 'bench_covertype_data')
)
m = Memory(joblib_cache_folder, mmap_mode='r')
# profile = Timestamper()
# Load the data, then cache and memmap the train/test split
@m.cache
def load_data(dtype=np.float32, order='C', n_train=100000):
    """Load covertype as a binary problem and return standardized splits.

    The dataset is fetched (downloaded if missing) and shuffled with a fixed
    seed. It is relabelled as class 1 (``1``) vs. all other classes (``-1``).
    The first ``n_train`` rows form the training set and the remaining rows
    the test set. The first 10 features are standardized using training-set
    statistics.

    Parameters
    ----------
    dtype : numpy dtype, default=np.float32
        Dtype of the returned feature matrices.
    order : str, default='C'
        ``'f'`` / ``'F'`` converts the features to Fortran order; any other
        value leaves them as returned by ``np.asarray``.
    n_train : int, default=100000
        Number of leading (shuffled) rows used for training. The split
        used in [Joachims, 2006] is 522911.

    Returns
    -------
    X_train, X_test, y_train, y_test
    """
    ######################################################################
    ## Load dataset
    print("Loading dataset...")
    data = fetch_covtype(download_if_missing=True, shuffle=True,
                         random_state=2)
    X, y = data['data'], data['target']
    X = np.asarray(X, dtype=dtype)
    if order.lower() == 'f':
        X = np.asfortranarray(X)

    # class 1 vs. all others.
    y[y != 1] = -1

    ######################################################################
    ## Create train-test split (as [Joachims, 2006])
    X_train, X_test = X[:n_train], X[n_train:]
    y_train, y_test = y[:n_train], y[n_train:]

    ######################################################################
    ## Standardize first 10 features (the numerical ones)
    mean = X_train.mean(axis=0)
    std = X_train.std(axis=0)
    # Only the first 10 columns are standardized; the others are left as-is.
    mean[10:] = 0.0
    std[10:] = 1.0
    # A constant column in a (small) training split would otherwise cause a
    # division by zero and fill the column with NaN / inf.
    std[std == 0] = 1.0
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    return X_train, X_test, y_train, y_test
X_train, X_test, y_train, y_test = load_data()


def _split_summary(label, X, y):
    """Return one report line: sample count, class balance and size in MB."""
    n_pos = np.sum(y == 1)
    n_neg = np.sum(y == -1)
    size_mb = int(X.nbytes / 1e6)
    return "%s %d (pos=%d, neg=%d, size=%dMB)" % (
        label.ljust(25), X.shape[0], n_pos, n_neg, size_mb)


def _print_dataset_statistics(X_train, X_test, y_train, y_test):
    """Print a short description of the train / test splits."""
    print("Dataset statistics:")
    print("===================")
    header_rows = [
        ("number of features:", "%s %d", X_train.shape[1]),
        ("number of classes:", "%s %d", np.unique(y_train).shape[0]),
        ("data type:", "%s %s", X_train.dtype),
    ]
    for label, template, value in header_rows:
        print(template % (label.ljust(25), value))
    print(_split_summary("number of train samples:", X_train, y_train))
    print(_split_summary("number of test samples:", X_test, y_test))


######################################################################
## Print dataset statistics
_print_dataset_statistics(X_train, X_test, y_train, y_test)
def train_one_tree(X_train, y_train):
    """Fit a single default DecisionTreeClassifier and time it.

    Returns
    -------
    (elapsed, tree)
        ``elapsed`` is the wall-clock fit time in seconds. The fitted tree is
        returned on purpose, so the multiprocessing runs pay a realistic
        result-pickling overhead.
    """
    start = time()
    fitted_tree = DecisionTreeClassifier()
    fitted_tree.fit(X_train, y_train)
    elapsed = time() - start
    print("-> trained one tree in {0:.3}s".format(elapsed))
    return elapsed, fitted_tree
def _report_parallel_time(n_trees, parallel_time, one_tree_time):
    """Print the wall-clock time of one parallel run and its speedup.

    The speedup compares ``parallel_time`` against ``n_trees`` sequential
    fits, each estimated to take ``one_tree_time`` seconds.
    """
    print("Time to train {0} trees in parallel: {1:.3}s".format(
        n_trees, parallel_time))
    print("Speedup: {0:.1f}x".format(one_tree_time * n_trees
                                      / parallel_time))


# @profile
def run_benchmarks(n_trees, max_workers):
    """Time fitting ``n_trees`` decision trees with threads and processes.

    A single tree is fitted first as the sequential reference. The same
    workload is then run three ways:

    - with a ``ThreadPoolExecutor``;
    - with joblib ``Parallel`` on the module-level training data;
    - with ``Parallel`` on in-memory copies, with automatic memmapping of
      large arrays disabled (``max_nbytes=None``).

    Each run prints its wall-clock time and its speedup.

    Parameters
    ----------
    n_trees : int
        Number of trees fitted in each parallel run.
    max_workers : int
        Number of worker threads / processes.
    """
    print("Training one:")
    one_tree_time, _ = train_one_tree(X_train, y_train)

    print("Training {0} trees with {1} threads".format(
        n_trees, max_workers))
    # with profile.timestamp("threading"):
    with ThreadPoolExecutor(max_workers=max_workers) as e:
        tic = time()
        tasks = [e.submit(train_one_tree, X_train, y_train)
                 for _ in range(n_trees)]
        # Block until every fit has finished before stopping the clock;
        # .result() also re-raises any exception from a worker thread.
        for task in tasks:
            task.result()
        parallel_time = time() - tic
    _report_parallel_time(n_trees, parallel_time, one_tree_time)

    print("Training {0} trees with {1} processes and memory mapping".format(
        n_trees, max_workers))
    # with profile.timestamp("multiprocessing_memmap"):
    tic = time()
    Parallel(n_jobs=max_workers)(delayed(train_one_tree)(X_train, y_train)
                                 for _ in range(n_trees))
    parallel_time = time() - tic
    _report_parallel_time(n_trees, parallel_time, one_tree_time)

    print("Training {0} trees with {1} processes w/o memory mapping".format(
        n_trees, max_workers))
    # In-memory arrays for the source data instead of mmap backed data.
    # y_train is returned by the same memmapping cache as X_train, so it is
    # copied too; otherwise only half of the input would be memmap-free.
    X_train_array = X_train.copy()
    y_train_array = y_train.copy()
    # with profile.timestamp("multiprocessing_no_memmap"):
    tic = time()
    # disable memmaping of large arrays with max_nbytes=None
    Parallel(n_jobs=max_workers, max_nbytes=None)(
        delayed(train_one_tree)(X_train_array, y_train_array)
        for _ in range(n_trees))
    parallel_time = time() - tic
    _report_parallel_time(n_trees, parallel_time, one_tree_time)
if __name__ == "__main__":
    # Only start the benchmark when run as a script. Worker processes that
    # re-import the main module (e.g. with the "spawn" start method) must
    # not launch the benchmark, and its process pools, a second time.
    run_benchmarks(n_trees=8, max_workers=4)
    # with open("mprofile_bench_tree.dat", 'wb') as f:
    #     profile.show_results(stream=f)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment