Parallel tree building with threads
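
This script compares three ways of training a batch of scikit-learn decision
trees on the covertype dataset in parallel: a `ThreadPoolExecutor` thread
pool, joblib `Parallel` worker processes that read the training set through a
shared read-only memory map, and the same worker processes with memmapping
disabled (`max_nbytes=None`) so that the array is copied to each worker. Each
variant prints its wall-clock time and its speedup relative to training the
trees sequentially.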
from __future__ import division, print_function
# Authors: Peter Prettenhofer <peter.prettenhofer@gmail.com>
#          Olivier Grisel <olivier.grisel@ensta.org>
# License: BSD 3 clause
import os
from time import time
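# concurrent.futures is in the standard library on Python 3; on Python 2
# it is provided by the 'futures' backport package.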
from concurrent.futures import ThreadPoolExecutor
import numpy as np
from sklearn.datasets import fetch_covtype
from sklearn.tree import DecisionTreeClassifier
from joblib import Memory
from joblib import Parallel, delayed
# from memory_profiler import Timestamper
# Memoize the data extraction and memory map the resulting
# train / test splits in readonly mode
bench_folder = os.path.dirname(__file__)
original_archive = os.path.join(bench_folder, 'covtype.data.gz')
joblib_cache_folder = os.path.join(bench_folder, 'bench_covertype_data')
m = Memory(joblib_cache_folder, mmap_mode='r')
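# With mmap_mode='r', arrays reloaded from this cache come back as
# read-only numpy memmaps, so the OS can share the underlying pages
# between worker processes instead of duplicating the training data.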
# profile = Timestamper()
# Load the data, then cache and memmap the train/test split
@m.cache
def load_data(dtype=np.float32, order='C'):
    ######################################################################
    ## Load dataset
    print("Loading dataset...")
    data = fetch_covtype(download_if_missing=True, shuffle=True,
                         random_state=2)
    X, y = data['data'], data['target']
    X = np.asarray(X, dtype=dtype)
    if order.lower() == 'f':
        X = np.asfortranarray(X)

    # class 1 vs. all others.
    y[np.where(y != 1)] = -1

    ######################################################################
    ## Create train-test split (as [Joachims, 2006])
    # n_train = 522911
    n_train = 100000
    X_train = X[:n_train]
    y_train = y[:n_train]
    X_test = X[n_train:]
    y_test = y[n_train:]

    ######################################################################
    ## Standardize first 10 features (the numerical ones)
    mean = X_train.mean(axis=0)
    std = X_train.std(axis=0)
    mean[10:] = 0.0
    std[10:] = 1.0
    X_train = (X_train - mean) / std
    X_test = (X_test - mean) / std
    return X_train, X_test, y_train, y_test
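
# On cache hits the splits are reloaded from disk as read-only memory
# maps; a minimal sanity check (sketch) after the call below would be:
#   assert isinstance(X_train, np.memmap) and not X_train.flags.writeable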
X_train, X_test, y_train, y_test = load_data()
######################################################################
## Print dataset statistics
print("Dataset statistics:")
print("===================")
print("%s %d" % ("number of features:".ljust(25),
X_train.shape[1]))
print("%s %d" % ("number of classes:".ljust(25),
np.unique(y_train).shape[0]))
print("%s %s" % ("data type:".ljust(25), X_train.dtype))
print("%s %d (pos=%d, neg=%d, size=%dMB)"
% ("number of train samples:".ljust(25),
X_train.shape[0], np.sum(y_train == 1),
np.sum(y_train == -1), int(X_train.nbytes / 1e6)))
print("%s %d (pos=%d, neg=%d, size=%dMB)"
% ("number of test samples:".ljust(25),
X_test.shape[0], np.sum(y_test == 1),
np.sum(y_test == -1), int(X_test.nbytes / 1e6)))
def train_one_tree(X_train, y_train):
    tic = time()
    tree = DecisionTreeClassifier().fit(X_train, y_train)
    d = time() - tic
    print("-> trained one tree in {0:.3}s".format(d))
    # Return the learned tree to get realistic pickling overhead in
    # the multiprocessing case
    return d, tree
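
# The thread pool below can only deliver real speedups because tree
# fitting runs inside scikit-learn's Cython tree builder, which releases
# the GIL; a pure-Python fit would be serialized by the interpreter lock.
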
# @profile
def run_benchmarks(n_trees, max_workers):
    print("Training one tree:")
    one_tree_time, tree = train_one_tree(X_train, y_train)

    print("Training {0} trees with {1} threads".format(
        n_trees, max_workers))
    # with profile.timestamp("threading"):
    with ThreadPoolExecutor(max_workers=max_workers) as e:
        tic = time()
        tasks = [e.submit(train_one_tree, X_train, y_train)
                 for i in range(n_trees)]
        sum(r.result()[0] for r in tasks)  # block until all trees are done
        parallel_time = time() - tic
    print("Time to train {0} trees in parallel: {1:.3}s".format(
        n_trees, parallel_time))
    print("Speedup: {0:.1f}x".format(one_tree_time * n_trees
                                     / parallel_time))
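
    # Unlike the process-based runs below, the threads all fit against
    # the same X_train buffer: no serialization of the training set and
    # no per-worker copy of the data.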
print("Training {0} trees with {1} processes and memory mapping".format(
n_trees, max_workers))
# with profile.timestamp("multiprocessing_memmap"):
tic = time()
Parallel(n_jobs=max_workers)(delayed(train_one_tree)(X_train, y_train)
for i in range(n_trees))
parallel_time = time() - tic
print("Time to train {0} trees in parallel: {1:.3}s".format(
n_trees, parallel_time))
print("Speedup: {0:.1f}x".format(one_tree_time * n_trees
/ parallel_time))
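
    # joblib pickles an existing numpy memmap by reference to its backing
    # file, so in the run above each worker re-opens X_train from disk
    # rather than receiving a serialized copy; only the fitted trees are
    # shipped back to the parent process.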
print("Training {0} trees with {1} processes w/o memory mapping".format(
n_trees, max_workers))
# In-memory array for the source data instead of mmap backed data
X_train_array = X_train.copy()
# with profile.timestamp("multiprocessing_no_memmap"):
tic = time()
# disable memmaping of large arrays with max_nbytes=None
Parallel(n_jobs=max_workers, max_nbytes=None
)(delayed(train_one_tree)(X_train_array, y_train)
for i in range(n_trees))
parallel_time = time() - tic
print("Time to train {0} trees in parallel: {1:.3}s".format(
n_trees, parallel_time))
print("Speedup: {0:.1f}x".format(one_tree_time * n_trees
/ parallel_time))

if __name__ == "__main__":
    # Guard the entry point so that spawned worker processes (on platforms
    # without fork) do not re-execute the benchmark on import.
    run_benchmarks(n_trees=8, max_workers=4)
# with open("mprofile_bench_tree.dat", 'wb') as f:
#     profile.show_results(stream=f)