@jiumem — Last active November 5, 2015
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors


class IENN:
    """Isolation Nearest Neighbor Ensemble: scores points by how easily they are
    isolated from small random sub-samples of the training data."""

    def __init__(self, n_estimator, sample_size, n_jobs=1):
        self.n_estimator = n_estimator
        self.sample_size = sample_size
        self.n_jobs = n_jobs  # reserved for parallel fit/predict; not used yet
        self.estimators = [NearestNeighbors(n_neighbors=2, algorithm='ball_tree')
                           for _ in range(n_estimator)]

    def fit(self, X):
        if not isinstance(X, np.ndarray):
            X = np.asarray(X)
        data_len = len(X)
        if data_len <= self.sample_size:
            raise ValueError('sample_size must be smaller than the number of training samples')
        # TODO: parallelize over estimators using n_jobs
        for nn in self.estimators:
            index = np.random.choice(data_len, self.sample_size, replace=False)
            train_data = X[index]
            nn.fit(train_data)
            # n_neighbors=2: the first neighbor of each sub-sample point is the point
            # itself, so keep column 1, i.e. the distance to (and index of) its true
            # nearest neighbor within the sub-sample.
            distances, indices = nn.kneighbors(train_data)
            nn.train_data = train_data
            nn.nearest_distances_ = distances[:, 1]
            nn.nearest_indices_ = indices[:, 1]

    def predict(self, X):
        if not isinstance(X, np.ndarray):
            X = np.asarray(X)
        if X.ndim == 1:
            # a single sample: average its per-estimator scores
            return np.mean(self.predict_score(X))
        # TODO: parallelize over samples using n_jobs
        scores = np.array([self.predict_score(x) for x in X])
        return scores.mean(axis=1)

    def smallest_region(self, nn, x):
        # Find the sub-sample point whose 1-NN ball contains x and has the smallest radius.
        index = -1
        smallest_region = np.inf
        for cnt, p in enumerate(nn.train_data):
            d = np.linalg.norm(p - x)
            if d <= nn.nearest_distances_[cnt] <= smallest_region:
                index = cnt
                smallest_region = nn.nearest_distances_[cnt]
        return index, smallest_region

    def predict_score(self, x):
        # One score per estimator: 1.0 if x falls outside every 1-NN ball (fully isolated),
        # otherwise 1 - (radius of the nearest neighbor's own ball) / (radius of the
        # smallest ball containing x). Higher scores indicate more anomalous points.
        scores = []
        for nn in self.estimators:
            index, smallest_region = self.smallest_region(nn, x)
            if index == -1:
                scores.append(1.)
            else:
                nearest_index = nn.nearest_indices_[index]
                radius = nn.nearest_distances_[nearest_index]
                scores.append(1. - radius / smallest_region)
        return scores
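
How a single estimator scores a query point, as a hand-worked sketch (added for clarity, not part of the original gist): a point that falls outside every sub-sample point's 1-NN ball is fully isolated and gets a score of 1; otherwise the score is 1 minus the ratio between the nearest neighbor's own ball radius and the radius of the smallest ball covering the point. The 1-D sample points and radii below are made up for illustration:

    import numpy as np

    sample = np.array([[0.0], [1.0], [5.0]])   # tiny 1-D sub-sample
    nn_radii = np.array([1.0, 1.0, 4.0])       # each point's distance to its nearest neighbor
    nn_index = np.array([1, 0, 1])             # index of each point's nearest neighbor

    x = np.array([4.5])                        # query point
    dists = np.abs(sample - x).ravel()         # [4.5, 3.5, 0.5]
    covering = dists <= nn_radii               # only the ball around 5.0 contains x
    if not covering.any():
        score = 1.0                            # isolated: maximal anomaly score
    else:
        i = np.where(covering, nn_radii, np.inf).argmin()     # smallest covering ball
        score = 1.0 - nn_radii[nn_index[i]] / nn_radii[i]     # 1 - 1/4 = 0.75
    print(score)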

Unit test

    # X = np.random.rand(100, 2)
    # X_test = np.random.rand(10, 2)
    # X_outliers = [[1.2, 1.2], [2, 2]]
    # single_outliers = [1.5, 1.5]
    #
    # ienn = IENN(n_estimator=20, sample_size=50)
    # ienn.fit(X)
    # print('test: ', ienn.predict(X_test))
    # print('multi outliers: ', ienn.predict(X_outliers))
    # print('single outliers: ', ienn.predict(single_outliers))

    # # Generate train data
    # rng = np.random.RandomState(42)
    # X = 0.3 * rng.randn(100, 2)
    # X_train = np.r_[X + 2, X - 2]
    # # Generate some regular novel observations
    # X = 0.3 * rng.randn(20, 2)
    # X_test = np.r_[X + 2, X - 2]
    # # Generate some abnormal novel observations
    # X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))

    # Generate train data
    from sklearn import datasets
    rng = np.random.RandomState(42)
    X_train = datasets.make_circles(n_samples=1000, factor=0.5, noise=0.05)[0]
    # Generate some regular novel observations
    X_test = datasets.make_circles(n_samples=20, factor=0.5, noise=0.05)[0]
    # Generate some abnormal novel observations
    X_outliers = rng.uniform(low=-0.15, high=0.15, size=(20, 2))

    clf = IENN(n_estimator=10, sample_size=100)
    clf.fit(X_train)
    y_pred_train = clf.predict(X_train)
    y_pred_test = clf.predict(X_test)
    print('test: ', y_pred_test)
    y_pred_outliers = clf.predict(X_outliers)
    print('outliers: ', y_pred_outliers)

    # plot the anomaly-score contours over a grid, together with the train/test/outlier samples
    xx, yy = np.meshgrid(np.linspace(-1, 1, 50), np.linspace(-1, 1, 50))
    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)

    plt.title("Isolation Nearest Neighbor Ensemble")
    plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)

    b1 = plt.scatter(X_train[:, 0], X_train[:, 1], c='white')
    b2 = plt.scatter(X_test[:, 0], X_test[:, 1], c='green')
    c = plt.scatter(X_outliers[:, 0], X_outliers[:, 1], c='red')
    plt.axis('tight')
    plt.xlim((-1, 1))
    plt.ylim((-1, 1))
    plt.legend([b1, b2, c],
           ["training observations",
            "new regular observations", "new abnormal observations"],
           loc="upper left")
    plt.show()
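
A possible follow-up (not in the original gist): the predicted scores are continuous, so flagging outliers requires a cut-off. The 0.5 threshold below is purely illustrative:

    threshold = 0.5                                        # illustrative cut-off, not prescribed by the gist
    flagged = (y_pred_outliers > threshold).astype(int)    # 1 = flagged as outlier
    print('flagged outliers: ', flagged)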