Skip to content

Instantly share code, notes, and snippets.

@svenvanhal
Last active November 16, 2020 16:02
Show Gist options
  • Save svenvanhal/5e9593b1f3f065cc286a0d079b194af7 to your computer and use it in GitHub Desktop.
Parallel predict / score_samples for Scikit-Learn IsolationForest benchmark. Parallelizes the scoring function across multiple processes for a significant speed-up on multicore machines, as by default only a single core is used.
from multiprocessing import Pool
from os import sched_getaffinity
from timeit import timeit
import numpy as np
from joblib import Parallel, delayed
from sklearn.datasets import make_classification
from sklearn.ensemble import IsolationForest
def main():
    """Compare baseline vs. parallel IsolationForest scoring across input sizes.

    Prints one timing row per sample count. Note the cross-over point
    after which parallel processing becomes faster than the baseline.
    """
    print(' Baseline mp.Pool joblib')

    # Same row format for every size: three right-aligned seconds columns.
    row_fmt = '{:>8.2f}s {:>8.2f}s {:>8.2f}s'

    for n_samples in (1_000, 10_000, 100_000, 1_000_000):
        print(row_fmt.format(*run_comparison(n_samples)))

    # Sample output:
    #
    # Baseline mp.Pool joblib
    # 0.08s 0.14s 1.34s
    # 0.24s 0.26s 0.30s
    # 2.06s 0.99s 1.03s
    # 21.40s 9.83s 10.51s
def run_comparison(n_samples, n_chunks=None):
    """Time baseline, multiprocessing and joblib scoring on ``n_samples`` rows.

    Fits an IsolationForest on a tiny 10-row training set (model quality is
    irrelevant here — only scoring speed is benchmarked), then times three
    scoring strategies on the remaining ``n_samples`` rows.

    Parameters
    ----------
    n_samples : int
        Number of test observations to score.
    n_chunks : int, optional
        Number of parallel workers / array chunks. Defaults to the number
        of CPU cores available to this process (``os.sched_getaffinity``).

    Returns
    -------
    tuple of float
        ``(delta_baseline, delta_mp, delta_joblib)`` — wall-clock seconds
        for each strategy, each measured with a single ``timeit`` run.
    """
    X, _ = make_classification(n_samples + 10)
    X_train, X_test = X[:10], X[10:]

    iso = IsolationForest()
    iso.fit(X_train)

    # Default: use all cores available to this process.
    if n_chunks is None:
        n_chunks = len(sched_getaffinity(0))

    delta_baseline = timeit(lambda: baseline(iso, X_test), number=1)

    # Split test array into one chunk per worker.
    chunks = np.array_split(X_test, n_chunks)

    # Predict in parallel
    delta_mp = timeit(lambda: parallel_mp(iso, chunks), number=1)
    delta_joblib = timeit(lambda: parallel_joblib(iso, chunks), number=1)

    return delta_baseline, delta_mp, delta_joblib
def parallel_mp(iso, chunks):
    """Option 1: multiprocessing.Pool

    Scores each chunk in a separate worker process (one worker per chunk)
    and concatenates the partial results in the original chunk order.
    """
    n_workers = len(chunks)
    with Pool(n_workers) as pool:
        partial_scores = pool.map(iso.score_samples, chunks)
    return np.concatenate(partial_scores)
def parallel_joblib(iso, chunks):
    """Option 2: joblib.Parallel

    Dispatches one scoring task per chunk (one job per chunk; large-array
    memmapping threshold raised to 8 GB) and concatenates the results.
    """
    executor = Parallel(n_jobs=len(chunks), max_nbytes='8G')
    tasks = (delayed(iso.score_samples)(chunk) for chunk in chunks)
    return np.concatenate(executor(tasks))
def baseline(iso, X_test):
    """Baseline scoring (no parallelization): one single-process call."""
    return iso.score_samples(X_test)
# Run the benchmark only when executed as a script (not on import).
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment