import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
class IENN():
    """Ensemble of nearest-neighbour models used to score how isolated a point is.

    Each estimator is fitted on a random subsample of the training data.
    Every subsampled point ``c`` defines a ball centred on ``c`` whose radius
    is the distance from ``c`` to its nearest neighbour inside the subsample.
    A query point is scored per estimator from the smallest ball that covers
    it, and the per-estimator scores are averaged.
    """

    def __init__(self, n_estimator, sample_size, n_jobs=1):
        """Create the (unfitted) ensemble.

        Args:
            n_estimator: number of nearest-neighbour estimators to build.
            sample_size: number of training rows subsampled per estimator.
            n_jobs: stored on the instance; not used by this implementation.
        """
        self.n_estimator = n_estimator
        self.sample_size = sample_size
        self.n_jobs = n_jobs
        self.estimators = [
            NearestNeighbors(n_neighbors=2, algorithm='ball_tree')
            for _ in range(n_estimator)
        ]

    def fit(self, X):
        """Fit every estimator on its own random subsample of ``X``.

        Args:
            X: array-like of training rows (one sample per row).

        Raises:
            ValueError: if ``X`` does not contain more than ``sample_size``
                rows.
        """
        X = np.asarray(X)
        data_len = len(X)
        if data_len <= self.sample_size:
            # Raise instead of print()+exit(): library code must not
            # terminate the interpreter of whoever called it.
            raise ValueError(
                'sample_size (%d) must be smaller than the number of '
                'samples (%d)' % (self.sample_size, data_len)
            )
        for nn in self.estimators:
            index = np.random.choice(data_len, self.sample_size, replace=False)
            train_data = X[index]
            nn.fit(train_data)
            # n_neighbors=2 on the training rows themselves: column 0 is the
            # zero-distance self match, column 1 the nearest other row.
            distances, indices = nn.kneighbors(train_data)
            nn.train_data = train_data
            # Stored as 1-D arrays so that per-point lookups yield scalars.
            nn.nearest_distances_ = distances[:, 1]
            nn.nearest_indices_ = indices[:, 1]

    def predict(self, X):
        """Return the ensemble-averaged score for one point or many.

        Args:
            X: a single 1-D point, or a 2-D array-like with one point per row.

        Returns:
            A scalar for 1-D input, otherwise a 1-D array holding one score
            per row of ``X``.
        """
        X = np.asarray(X)
        if X.ndim == 1:
            return np.mean(self.predict_score(X))
        # A list comprehension is required here: in Python 3 ``map`` returns a
        # lazy iterator, which ``np.array`` would wrap as a 0-d object array.
        scores = np.array([self.predict_score(x) for x in X], dtype=float)
        # Explicit shape keeps the empty-input case well defined.
        scores = scores.reshape(len(X), len(self.estimators))
        return scores.mean(axis=1)

    def smallest_region(self, nn, x):
        """Find the smallest ball of estimator ``nn`` that covers ``x``.

        Args:
            nn: a fitted estimator carrying ``train_data`` and
                ``nearest_distances_``.
            x: the query point.

        Returns:
            ``(index, radius)`` of the covering ball with the smallest radius,
            or ``(-1, np.inf)`` when no ball covers ``x``. Among balls with
            equal smallest radius, the one with the highest index is chosen.
        """
        # ravel() also accepts radii stored as an (n, 1) column.
        radii = np.ravel(nn.nearest_distances_)
        dists = np.sqrt(((nn.train_data - x) ** 2).sum(axis=1))
        covering = np.flatnonzero(dists <= radii)
        if covering.size == 0:
            return -1, np.inf
        covering_radii = radii[covering]
        tied = np.flatnonzero(covering_radii == covering_radii.min())
        best = covering[tied[-1]]
        return int(best), float(radii[best])

    def predict_score(self, x):
        """Score a single point against every estimator.

        For each estimator: 1.0 when no ball covers ``x``; otherwise
        ``1 - r_nn / r_c`` where ``r_c`` is the radius of the smallest
        covering ball and ``r_nn`` the radius of the ball centred on that
        ball's nearest neighbour.

        Args:
            x: the query point (1-D).

        Returns:
            A list with one float score per estimator.
        """
        scores = []
        for nn in self.estimators:
            index, smallest_region = self.smallest_region(nn, x)
            if index == -1:
                scores.append(1.)
                continue
            if smallest_region == 0:
                # x coincides with duplicated training points; the ratio
                # would be 0/0, so score it as fully covered rather than NaN.
                scores.append(0.)
                continue
            nearest_index = np.ravel(nn.nearest_indices_)[index]
            radius = np.ravel(nn.nearest_distances_)[nearest_index]
            scores.append(float(1. - radius / smallest_region))
        return scores
# Unit test
# X = np.random.rand(100, 2)
# X_test = np.random.rand(10, 2)
# X_outliers = [[1.2, 1.2], [2, 2]]
# single_outliers = [1.5, 1.5]
#
# ienn = IENN(n_estimator=20, sample_size=50)
# ienn.fit(X)
# print('test: ', ienn.predict(X_test))
# print('multi outliers: ', ienn.predict(X_outliers))
# print('single outliers: ', ienn.predict(single_outliers))
# # Generate train data
# rng = np.random.RandomState(42)
# X = 0.3 * rng.randn(100, 2)
# X_train = np.r_[X + 2, X - 2]
# # Generate some regular novel observations
# X = 0.3 * rng.randn(20, 2)
# X_test = np.r_[X + 2, X - 2]
# # Generate some abnormal novel observations
# X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))
# Demo: fit the ensemble on two noisy concentric circles, score held-out
# points, and draw the score surface over the [-1, 1] x [-1, 1] square.
from sklearn import datasets

rng = np.random.RandomState(42)

# Training set and a small set of regular novel observations, both drawn
# from the same two-circle distribution.
circle_kwargs = dict(factor=0.5, noise=0.05)
X_train = datasets.make_circles(n_samples=1000, **circle_kwargs)[0]
X_test = datasets.make_circles(n_samples=20, **circle_kwargs)[0]

# Abnormal novel observations: uniform samples from a small square
# around the origin.
X_outliers = rng.uniform(low=-0.15, high=0.15, size=(20, 2))

clf = IENN(n_estimator=10, sample_size=100)
clf.fit(X_train)

y_pred_train = clf.predict(X_train)
y_pred_test = clf.predict(X_test)
print('test: ', y_pred_test)
y_pred_outliers = clf.predict(X_outliers)
print('outliers: ', y_pred_outliers)

# Evaluate the score on a regular 50 x 50 grid to draw filled contours.
grid_axis = np.linspace(-1, 1, 50)
xx, yy = np.meshgrid(grid_axis, grid_axis)
grid_points = np.column_stack((xx.ravel(), yy.ravel()))
Z = clf.predict(grid_points)
Z = Z.reshape(xx.shape)

plt.title("Isolation Nearest Neighbor Ensemble")
plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)

# Overlay the three point sets; the handles are kept for the legend.
b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white')
b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='green')
c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='red')

plt.axis('tight')
plt.xlim((-1, 1))
plt.ylim((-1, 1))

legend_handles = [b1, b2, c]
legend_labels = [
    "training observations",
    "new regular observations",
    "new abnormal observations",
]
plt.legend(legend_handles, legend_labels, loc="upper left")
plt.show()