Last active
November 16, 2020 16:02
-
-
Save svenvanhal/5e9593b1f3f065cc286a0d079b194af7 to your computer and use it in GitHub Desktop.
Parallel predict / score_samples for Scikit-Learn IsolationForest benchmark. Parallelizes the scoring function across multiple processes for a significant speed-up on multicore machines, as by default only a single core is used.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Pool | |
from os import sched_getaffinity | |
from timeit import timeit | |
import numpy as np | |
from joblib import Parallel, delayed | |
from sklearn.datasets import make_classification | |
from sklearn.ensemble import IsolationForest | |
def main():
    """Benchmark sequential vs. parallel IsolationForest scoring."""
    # Run tests with varying number of observations.
    # Notice the cross-over point after which parallel processing is faster.
    row = '{:>8.2f}s {:>8.2f}s {:>8.2f}s'

    print(' Baseline mp.Pool joblib')
    for n_samples in (1_000, 10_000, 100_000, 1_000_000):
        print(row.format(*run_comparison(n_samples)))

    # Sample output:
    #
    # Baseline mp.Pool joblib
    # 0.08s 0.14s 1.34s
    # 0.24s 0.26s 0.30s
    # 2.06s 0.99s 1.03s
    # 21.40s 9.83s 10.51s
def run_comparison(n_samples, n_train=10):
    """Time baseline vs. parallel scoring of `n_samples` observations.

    Fits an IsolationForest on a small training slice, then scores the
    remaining observations three ways: sequentially, with
    multiprocessing.Pool, and with joblib.Parallel.

    Args:
        n_samples: number of test observations to score.
        n_train: number of observations used to fit the model (default 10,
            kept small so fitting time does not dominate the benchmark).

    Returns:
        Tuple of wall-clock times in seconds:
        (baseline, multiprocessing.Pool, joblib.Parallel).
    """
    X, _ = make_classification(n_samples + n_train)
    X_train, X_test = X[:n_train], X[n_train:]

    iso = IsolationForest()
    iso.fit(X_train)

    # Use all cores: one chunk per core in the process's CPU affinity mask.
    # NOTE(review): os.sched_getaffinity is Linux-only; on other platforms
    # the module-level import already fails — confirm target platform.
    n_chunks = len(sched_getaffinity(0))

    # Each timing is a single run (number=1); this benchmarks throughput,
    # not micro-level variance.
    delta_baseline = timeit(lambda: baseline(iso, X_test), number=1)

    # Split test array so each worker scores one chunk.
    chunks = np.array_split(X_test, n_chunks)

    # Predict in parallel with both backends.
    delta_mp = timeit(lambda: parallel_mp(iso, chunks), number=1)
    delta_joblib = timeit(lambda: parallel_joblib(iso, chunks), number=1)

    return delta_baseline, delta_mp, delta_joblib
def parallel_mp(iso, chunks):
    """Option 1: multiprocessing.Pool"""
    # One worker process per chunk; each scores its chunk independently.
    n_workers = len(chunks)
    with Pool(n_workers) as pool:
        partial_scores = pool.map(iso.score_samples, chunks)
    # Stitch the per-chunk score arrays back into one result.
    return np.concatenate(partial_scores)
def parallel_joblib(iso, chunks):
    """Option 2: joblib.Parallel"""
    # max_nbytes='8G' raises joblib's memmapping threshold so the chunks
    # are passed to workers without being dumped to disk.
    runner = Parallel(n_jobs=len(chunks), max_nbytes='8G')
    jobs = (delayed(iso.score_samples)(chunk) for chunk in chunks)
    # Stitch the per-chunk score arrays back into one result.
    return np.concatenate(runner(jobs))
def baseline(iso, X_test):
    """Baseline scoring (no parallelization)"""
    # Single-process scoring of the whole test set in one call.
    return iso.score_samples(X_test)
# Run the benchmark only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment