Skip to content

Instantly share code, notes, and snippets.

@tam17aki
Last active April 3, 2022 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tam17aki/a840708858637d078eb8f9bdff397020 to your computer and use it in GitHub Desktop.
Save tam17aki/a840708858637d078eb8f9bdff397020 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Benchmark script for outlier detection based on k-Nearest Neighbors (kNN).
Copyright (C) 2022 by Akira TAMAMORI
Copyright (C) 2018, Yue Zhao
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import os
import warnings
import numpy as np
from pyod.models.knn import KNN
from scipy.io import loadmat
from sklearn.metrics import precision_recall_fscore_support, roc_auc_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import optuna
# Suppress all warnings so the benchmark output stays clean.
warnings.filterwarnings("ignore")

# Label encoding used for the ground truth and the thresholded predictions.
NORMAL, ANOMALY = 0, 1

# Fraction of each dataset held out as test data.
TEST_SIZE = 0.4

# Fraction of the training split held out as validation data for Optuna.
SPLIT_RATIO = 0.1

# Number of search trials for Optuna.
N_TRIALS = 50

# Number of evaluation iterations per dataset.
N_ITER = 10

# Data files to benchmark; each one is loaded from the "data" directory.
MAT_FILE_LIST = [
    "arrhythmia.mat",
    "cardio.mat",
    "glass.mat",
    "ionosphere.mat",
    "letter.mat",
    "mnist.mat",
    "musk.mat",
    "optdigits.mat",
    "pendigits.mat",
    "pima.mat",
    "satellite.mat",
    "satimage-2.mat",
    "shuttle.mat",
    "vertebral.mat",
    "vowels.mat",
    "wbc.mat",
]

# Benchmark scores, filled in by benchmark() and read by print_result().
SCORES = dict()

# Scratch storage for the intermediate values of one benchmark iteration.
VARS = {
    # Data splits, fitted objects and score arrays start out unset.
    **dict.fromkeys(
        (
            "X_train",
            "X_test",
            "y_train",
            "y_test",
            "x_train",
            "x_valid",
            "t_valid",
            "X_train_norm",
            "X_test_norm",
            "clf",
            "study",
            "anomaly_scores",
            "anomaly_threshold",
        ),
        None,
    ),
    # Scalar metrics start at zero.
    "prec": 0.0,
    "recall": 0.0,
    "fscore": 0.0,
}
class Objective:
    """Optuna objective that scores a kNN detector by validation ROC AUC.

    An instance is callable. For each trial it asks the trial for
    ``n_neighbors`` and ``leaf_size``, fits a ``KNN`` detector on the
    training samples, and returns the ROC AUC of the detector's anomaly
    scores on the validation samples.
    """

    def __init__(self, X_train, X_valid, y_valid, config):
        """Store the data used by every trial.

        Args:
            X_train: Samples the detector is fit on.
            X_valid: Samples scored to evaluate a trial.
            y_valid: Ground-truth labels for ``X_valid``.
            config: Mapping with a ``"contamination"`` entry that is
                forwarded to ``KNN``.

        Raises:
            KeyError: If ``config`` has no ``"contamination"`` entry.
        """
        self.X_train = X_train
        self.X_valid = X_valid
        self.y_valid = y_valid
        self.contamination = config["contamination"]

    def __call__(self, trial):
        """Evaluate one trial and return its validation ROC AUC."""
        # ``step`` is passed by keyword: recent Optuna releases deprecate
        # passing it positionally, and the keyword form works on older
        # releases as well.
        clf = KNN(
            n_neighbors=trial.suggest_int("n_neighbors", 1, 16, step=1),
            leaf_size=trial.suggest_int("leaf_size", 2, 50, step=2),
            contamination=self.contamination,
        )
        clf.fit(self.X_train)
        anomaly_scores = clf.decision_function(self.X_valid)
        return roc_auc_score(self.y_valid, anomaly_scores)

    def print_contamination(self):
        """Print the contamination rate.

        Dummy method kept to suppress a pylint warning.
        """
        print(f"contamination={self.contamination:.2f}")
def print_result():
    """Print one summary line per dataset.

    Reads the mean ("ave") and standard deviation ("std") of each metric
    from ``SCORES`` for every file in ``MAT_FILE_LIST`` and prints them
    with four decimal places.
    """
    # (printed label, key in SCORES[dataset]) in output order.
    columns = (
        ("AUC", "auc"),
        ("Prec", "prec"),
        ("Recall", "recall"),
        ("F1-score", "fscore"),
    )
    print("\nBenchmark Results\n")
    for dataset in MAT_FILE_LIST:
        stats = SCORES[dataset]
        summary = ", ".join(
            f"{label}={stats[key]['ave']:.4f} ± {stats[key]['std']:.4f}"
            for label, key in columns
        )
        print(f"{dataset}: {summary}")
def benchmark():
    """Evaluate the tuned kNN detector on every benchmark dataset.

    For each file in ``MAT_FILE_LIST`` the data is loaded from the ``data``
    directory and evaluated ``N_ITER`` times. One iteration:

    1. makes a stratified train/test split (``TEST_SIZE``),
    2. standardizes the features with a scaler fit on the training split,
    3. holds out ``SPLIT_RATIO`` of the training split as validation data
       and tunes ``n_neighbors`` / ``leaf_size`` with Optuna
       (``N_TRIALS`` trials, maximizing validation ROC AUC),
    4. refits a ``KNN`` detector with the best parameters,
    5. scores the test split and computes ROC AUC, precision, recall and
       F1, thresholding the test scores at the dataset's outlier fraction.

    The mean and standard deviation of each metric over the iterations are
    stored in ``SCORES[file][metric]["ave"]`` and ``["std"]``.

    Both splits and the Optuna sampler of iteration ``i`` are seeded from
    ``i``, so a run can be repeated with the same splits and the same
    search sequence. Intermediate values are kept in the module-level
    ``VARS`` dict.
    """
    metrics = ("auc", "prec", "recall", "fscore")

    # Create an entry for every dataset up front.
    for file in MAT_FILE_LIST:
        SCORES[file] = {metric: {} for metric in metrics}

    for mat_file in MAT_FILE_LIST:
        mat = loadmat(os.path.join("data", mat_file))
        X = mat["X"]
        y = mat["y"].ravel()  # 0: normal, 1: anomaly
        outliers_fraction = np.count_nonzero(y) / len(y)
        scaler = StandardScaler()
        scores = {metric: [] for metric in metrics}

        for i in range(N_ITER):
            print("\n... Processing", mat_file, "...", "Iteration", i + 1)

            # One generator per iteration drives both splits below.
            random_state = np.random.RandomState(i)
            (
                VARS["X_train"],
                VARS["X_test"],
                VARS["y_train"],
                VARS["y_test"],
            ) = train_test_split(
                X, y, test_size=TEST_SIZE, random_state=random_state, stratify=y
            )

            # Standardize with statistics from the training split only.
            VARS["X_train_norm"] = scaler.fit_transform(VARS["X_train"])
            VARS["X_test_norm"] = scaler.transform(VARS["X_test"])

            # Split training data further for hyperparameter search. The
            # generator is passed here too; without it this split would
            # differ from run to run.
            (
                VARS["x_train"],
                VARS["x_valid"],
                _,
                VARS["t_valid"],
            ) = train_test_split(
                VARS["X_train_norm"],
                VARS["y_train"],
                test_size=SPLIT_RATIO,
                random_state=random_state,
                stratify=VARS["y_train"],
            )

            # Hyperparameter search via Optuna, with a seeded sampler so
            # the sequence of suggested parameters is repeatable.
            VARS["study"] = optuna.create_study(
                direction="maximize",
                sampler=optuna.samplers.TPESampler(seed=i),
            )
            VARS["study"].optimize(
                Objective(
                    X_train=VARS["x_train"],
                    X_valid=VARS["x_valid"],
                    y_valid=VARS["t_valid"],
                    config={
                        "contamination": outliers_fraction,
                    },
                ),
                n_trials=N_TRIALS,
            )

            # Re-fit with the best parameters.
            # NOTE(review): the final model is fit on the search subset
            # (x_train), not on the full normalized training split --
            # confirm this is intended.
            best_params = VARS["study"].best_params
            VARS["clf"] = KNN(
                n_neighbors=best_params["n_neighbors"],
                leaf_size=best_params["leaf_size"],
                contamination=outliers_fraction,
            )
            VARS["clf"].fit(VARS["x_train"])

            # Score the test split; the threshold is the score percentile
            # that matches the dataset's outlier fraction.
            VARS["anomaly_scores"] = VARS["clf"].decision_function(
                VARS["X_test_norm"]
            )
            VARS["anomaly_threshold"] = np.percentile(
                VARS["anomaly_scores"], 100 * (1 - outliers_fraction)
            )

            # Calculate precision, recall, and F1-score.
            (
                VARS["prec"],
                VARS["recall"],
                VARS["fscore"],
                _,
            ) = precision_recall_fscore_support(
                VARS["y_test"],
                np.where(
                    VARS["anomaly_scores"] >= VARS["anomaly_threshold"],
                    ANOMALY,
                    NORMAL,
                ),
                average="binary",
            )

            # Store this iteration's scores.
            scores["auc"].append(
                roc_auc_score(VARS["y_test"], VARS["anomaly_scores"])
            )
            scores["prec"].append(VARS["prec"])
            scores["recall"].append(VARS["recall"])
            scores["fscore"].append(VARS["fscore"])

        # Average and standard deviation over the iterations.
        for score in metrics:
            values = np.array(scores[score])
            SCORES[mat_file][score]["ave"] = np.average(values)
            SCORES[mat_file][score]["std"] = np.std(values)
if __name__ == "__main__":
    # Script entry point: run the benchmark first so that SCORES is filled
    # in, then print the per-dataset summary.
    benchmark()
    print_result()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment