Last active
August 11, 2017 04:40
-
-
Save itdxer/c75ea4df8a085ae6035d5818a1d7396e to your computer and use it in GitHub Desktop.
Fuzzy C-means in Theano
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import theano | |
import theano.tensor as T | |
import numpy as np | |
def asfloat(value):
    """ Convert variable to float type configured by theano
    floatX variable.

    Parameters
    ----------
    value : matrix, ndarray or scalar
        Value that could be converted to float type.

    Returns
    -------
    matrix or ndarray
        Input value converted to the float type configured by the
        theano floatX variable. ``np.matrix`` and ``np.ndarray`` inputs
        keep their class; any other input is first converted with
        ``np.asarray`` (so a scalar becomes a zero-dimensional array).
    """
    float_x = theano.config.floatX

    if isinstance(value, (np.matrix, np.ndarray)):
        return value.astype(float_x)

    # ``np.cast`` was removed in NumPy 2.0. It was defined as
    # ``array(x, copy=False).astype(dtype)``, so this expression is a
    # drop-in replacement that works on both old and new NumPy releases.
    return np.asarray(value).astype(float_x)
def clone(instance):
    """ Build a fresh object of the same class as ``instance``.

    The new object is constructed by passing the dictionary returned
    from ``instance.get_params()`` as keyword arguments to the class of
    ``instance``. Nothing else is copied from the original object.

    Parameters
    ----------
    instance : object
        Object that implements a ``get_params()`` method.

    Returns
    -------
    object
        New instance created from the same constructor parameters.
    """
    constructor_kwargs = instance.get_params()
    return instance.__class__(**constructor_kwargs)
class FuzzyCMeans(object):
    """
    Fuzzy c-means clustering implemented with Theano.

    Parameters
    ----------
    n_clusters : int
        Number of clusters. Must be at least 2.
    m : float
        Fuzzifier exponent. Must be strictly greater than 1, because
        the membership update raises distance ratios to the power
        ``2 / (m - 1)``. Defaults to ``2``.

    Attributes
    ----------
    centers_ : theano shared variable or None
        Cluster centers. ``None`` until ``fit`` creates them.
    is_initialized : bool
        ``True`` once ``init_methods`` has compiled the Theano
        functions ``predict_proba`` and ``train_iteration``.
    """

    def __init__(self, n_clusters, m=2):
        if n_clusters < 2:
            raise ValueError("Number of clusters should be greater than "
                             "or equal to 2")

        # ``m == 1`` has to be rejected as well: the exponent
        # ``2 / (m - 1)`` used in ``init_methods`` would divide by zero.
        if m <= 1:
            raise ValueError("Parameter `m` should be greater than 1")

        self.n_clusters = n_clusters
        self.m = m
        self.centers_ = None
        self.is_initialized = False

    def init_methods(self):
        """ Compile the Theano functions used for training and prediction.

        Sets ``self.predict_proba`` (membership matrix for the input
        samples) and ``self.train_iteration`` (same output, and
        additionally replaces the shared centers with the re-estimated
        ones).

        Raises
        ------
        AttributeError
            If the methods have already been initialized or the
            cluster centers have not been created yet.
        """
        if self.is_initialized:
            raise AttributeError("Methods have already been initialized.")

        if self.centers_ is None:
            # Without this guard the failure would be an opaque
            # "'NoneType' object has no attribute 'reshape'" error.
            raise AttributeError("Cluster centers have not been created "
                                 "yet. Call `fit` first.")

        x = T.matrix('x')
        centers = self.centers_

        # Pairwise L2 distance between every sample and every center,
        # computed by broadcasting (samples, 1, features) against
        # (1, centers, features) and reducing over the feature axis.
        d = distance_to_centers = (
            x.reshape((x.shape[0], 1, x.shape[1])) -
            centers.reshape((1, centers.shape[0], centers.shape[1]))
        ).norm(L=2, axis=2)

        # For sample i and center j:
        #     weights[i, j] = 1 / sum_k (d[i, j] / d[i, k]) ** (2 / (m - 1))
        # NOTE(review): a sample that coincides exactly with a center
        # gives a zero distance and therefore a 0 / 0 ratio here.
        weights = 1 / (
            (
                distance_to_centers.reshape((d.shape[0], d.shape[1], 1)) /
                distance_to_centers.reshape((d.shape[0], 1, d.shape[1]))
            ) ** asfloat(2. / (self.m - 1))
        ).sum(axis=2)

        # Normalize every row so that memberships of a sample sum to one.
        proba = weights / T.sum(weights, axis=1).reshape((-1, 1))

        # New centers are means of the samples weighted by ``weights ** m``.
        proba_power_m = weights ** self.m
        new_centers = (
            proba_power_m.T.dot(x) /
            T.sum(proba_power_m, axis=0).reshape((-1, 1))
        )

        self.predict_proba = theano.function([x], proba)
        self.train_iteration = theano.function([x], proba, updates=[
            (centers, new_centers),
        ])
        self.is_initialized = True

    def get_params(self, deep=False):
        """ Return constructor parameters as a dictionary.

        The ``deep`` argument is accepted but not used.
        """
        return dict(n_clusters=self.n_clusters, m=self.m)

    @property
    def centers(self):
        """ Current value stored in the shared ``centers_`` variable. """
        return self.centers_.get_value()

    def fit(self, data, maxiter=100, epsilon=1e-5, verbose=False):
        """ Train cluster centers on ``data``.

        Parameters
        ----------
        data : ndarray
            Two-dimensional array; ``data.shape[1]`` is taken as the
            number of features.
        maxiter : int
            Maximum number of training iterations.
        epsilon : float
            Training stops once the norm of the difference between two
            consecutive membership matrices is not greater than this
            value.
        verbose : bool
            Accepted but currently unused.

        Raises
        ------
        ValueError
            If the number of features in ``data`` differs from the
            number of features of the existing centers.
        """
        n_features = data.shape[1]

        if self.centers_ is None:
            # Draw initial centers uniformly inside the bounding box
            # of the data.
            data_min = data.min(axis=0)
            data_max = data.max(axis=0)
            random_centers = np.random.random((self.n_clusters, n_features))
            scaled_centers = (data_max - data_min) * random_centers + data_min

            self.centers_ = theano.shared(
                name='centers',
                value=asfloat(scaled_centers)
            )

        n_expected_features = self.centers.shape[1]
        if n_expected_features != n_features:
            raise ValueError("Input data must contain {} features, "
                             "found {}".format(n_expected_features,
                                               n_features))

        if not self.is_initialized:
            self.init_methods()

        i = 1
        proba_update = np.inf
        prev_proba = None

        while (proba_update > epsilon) and (i <= maxiter):
            proba = self.train_iteration(data)

            # The first iteration has nothing to compare with, so the
            # update stays infinite and the loop continues.
            if prev_proba is not None:
                proba_update = np.linalg.norm(prev_proba - proba)

            prev_proba = proba
            i += 1

    def predict(self, data):
        """ Return, for every sample, the index of the cluster with the
        highest membership value.
        """
        proba = self.predict_proba(data)
        return proba.argmax(axis=1)
def select_best_clustering(algorithm, n_trials, data, **fit_kwargs):
    """ Select best clusters using SSE.

    For every trial a new model is created with ``clone``, fitted on
    ``data`` and scored with the sum of squared differences between
    each sample and the center of the cluster predicted for it.

    Parameters
    ----------
    algorithm : object
        Clustering model. It is used through ``get_params()``
        (via ``clone``), ``fit(data, **fit_kwargs)``, ``predict(data)``
        and the ``centers`` attribute.
    n_trials : int
        Number of independent trainings. Must be at least 1.
    data : matrix
        Training samples.
    **fit_kwargs
        Extra keyword arguments forwarded to ``fit``.

    Raises
    ------
    ValueError
        Exception will raise in case input parameter values
        are invalid.

    Returns
    -------
    object
        Pretrained clustering algorithm that give smallest
        SSE (sum of squared error) score.
    """
    # The type has to be validated before the comparison, otherwise a
    # non-numeric value fails with ``TypeError`` on ``n_trials < 1``
    # instead of the documented ``ValueError``.
    if not isinstance(n_trials, int):
        raise ValueError("Number of trials should be an integer number")

    if n_trials < 1:
        raise ValueError("Number of trials should be greater than "
                         "or equal to 1")

    best_score = None
    best_algorithm = None

    for _ in range(n_trials):
        algorithm = clone(algorithm)
        algorithm.fit(data, **fit_kwargs)

        clusters = algorithm.predict(data)
        # Row i holds the center of the cluster assigned to sample i.
        centers = algorithm.centers[clusters, :]
        sse_score = np.sum((data - centers) ** 2)

        # Strict comparison keeps the earliest trial on equal scores
        # and never compares model instances with each other. Only the
        # best model is kept, so discarded models can be freed.
        if best_algorithm is None or sse_score < best_score:
            best_score = sse_score
            best_algorithm = algorithm

    return best_algorithm
if __name__ == '__main__':
    # The original script used an undefined ``data`` variable. Build a
    # small synthetic dataset (two Gaussian blobs with different means)
    # so that the example can actually run.
    random_state = np.random.RandomState(0)
    data = asfloat(np.concatenate([
        random_state.normal(loc=0, scale=1, size=(100, 2)),
        random_state.normal(loc=5, scale=1, size=(100, 2)),
    ]))

    fcm = FuzzyCMeans(n_clusters=2, m=2)
    fcm.fit(data, maxiter=100)
    print(fcm.centers)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment