Skip to content

Instantly share code, notes, and snippets.

@forhappy
Created September 23, 2011 13:42
Show Gist options
  • Save forhappy/1237352 to your computer and use it in GitHub Desktop.
Save forhappy/1237352 to your computer and use it in GitHub Desktop.
ISODATA clustering algorithm (not finished yet)
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import cmath
import math
import os
import os.path
import sys
class ISOData:
    """ISODATA clustering algorithm implementation (splitting/merging TODO).

    @filename: Filename of input data (one sample per line: an integer
        sequence number followed by float attributes, whitespace separated).
    @knums: Expected number of clusters.
    @min_obs: Minimum observations of each cluster; a cluster with fewer
        entries is dissolved and its members reassigned.
    @std_dvt: Standard-deviation threshold of observations within a cluster.
    @min_dis: Minimum distance between two clusters (closer pairs merge).
    @max_pairs: Maximum pairs that can be coalesced in one iteration.
    @iterations: Maximum number of iterations.
    @normalized: If True, attributes are min-max normalized before use.
    """

    def __init__(self, filename, knums, min_obs, std_dvt,
                 min_dis, max_pairs, iterations, normalized=False):
        self._filename = filename
        self._knums = knums
        self._min_obs = min_obs
        self._std_dvt = std_dvt
        self._min_dis = min_dis
        self._max_pairs = max_pairs
        self._iterations = iterations
        self._normalized = normalized
        # self._samples / self._normalized_samples have the form:
        #   [(seqx, x1, ..., xn), (seqy, y1, ..., yn), ...]
        self._samples = []
        self._normalized_samples = []
        # self._clusters := [[core, member, member, ...], ...]; the first
        # element of every cluster is its core (flag, c1, ..., cn), where
        # flag 0 means "reset" and flag 1 means "freshly recomputed".
        self._clusters = []
        # Current iteration counter.
        self._cur_iter = 0
        # Dimensionality of the sample vectors (set by _open()).
        self._dimension = 0
        # Mean member-to-core distance of each cluster.
        self._mean_distances = []
        # Average distance of all patterns to their cluster cores.
        self._average_distance = 0
        # Per-cluster standard-deviation vectors: [[O1j, O2j, ..., Onj], ...].
        self._std_deviation_vector_of_clusters = []
        # Pairwise distances between cluster cores.
        self._distance_between_clusters = []
        self._open(self._filename)

    def _normalize(self):
        """Min-max normalize every attribute into self._normalized_samples.

        A constant attribute (max == min) is mapped to 0.0 — the original
        divided by zero in that case.
        """
        scratch = [list(sample) for sample in self._samples]
        for d in range(1, self._dimension + 1):
            column = [row[d] for row in scratch]
            max_att = max(column)
            min_att = min(column)
            span = max_att - min_att
            for row in scratch:
                row[d] = (row[d] - min_att) / span if span else 0.0
        self._normalized_samples.extend(tuple(row) for row in scratch)

    def _open(self, filename):
        """Load the data file into self._samples and set self._dimension.

        @filename: Filename of input data.
        """
        # `with` guarantees the file handle is closed (original leaked it);
        # split() instead of split(" ") tolerates repeated blanks, and blank
        # lines (e.g. a trailing newline) are skipped instead of crashing.
        with open(filename, "r") as data_file:
            for line in data_file:
                fields = line.split()
                if not fields:
                    continue
                sample = [int(fields[0])]
                sample.extend(float(f) for f in fields[1:])
                self._samples.append(tuple(sample))
        self._dimension = len(self._samples[0]) - 1

    def _reconstruct_cluster_core(self, idx):
        """Drop all members of cluster idx, leaving only a reset core.

        The core keeps its coordinates but its flag becomes 0 ("reset").
        """
        old_core = self._clusters[idx][0]
        self._clusters[idx] = [[0] + list(old_core[1:])]

    def _reset_cluster_core(self):
        """Strip members from every cluster, leaving only reset cores."""
        for idx in range(len(self._clusters)):
            self._reconstruct_cluster_core(idx)

    def _optdis_index(self, ob):
        """Return the index of the cluster whose core is nearest to ob.

        Returns None when there are no clusters (matching the original's
        implicit fall-through).
        """
        if not self._clusters:
            return None
        distances = [self._distance(ob[1:], cluster[0][1:])
                     for cluster in self._clusters]
        return distances.index(min(distances))

    def _assign(self):
        """Recompute cores, clear members, then assign every sample to its
        nearest cluster core.

        Fixed: in normalized mode the original seeded cores from
        self._normalized_samples but assigned raw self._samples, mixing the
        two coordinate systems.
        """
        if not self._clusters:
            return
        self._revise_cluster_core()
        self._reset_cluster_core()
        source = self._normalized_samples if self._normalized else self._samples
        for sample in source:
            self._clusters[self._optdis_index(sample)].append(sample)

    def _check_and_delete(self):
        """Dissolve clusters with fewer than self._min_obs entries.

        Note: len(cluster) counts the core too (original behavior).  Fixed:
        the original removed elements from self._clusters while iterating it
        (which skips clusters) and reassigned once per removal; we filter
        first and reassign once.
        """
        if not self._clusters:
            return
        survivors = [c for c in self._clusters if len(c) >= self._min_obs]
        if len(survivors) != len(self._clusters):
            self._clusters[:] = survivors
            self._assign()

    def _revise_cluster_core(self):
        """Recompute every cluster core as the mean of its members.

        A core is only replaced when the mean actually moved; empty clusters
        keep their old core.
        """
        for k in range(len(self._clusters)):
            new_core = self._means_within_cluster(self._clusters[k][1:])
            if not new_core:
                continue
            if not self._equal(self._clusters[k][0], new_core):
                self._clusters[k][0] = new_core

    def _mean_distances_within_cluster(self):
        """Fill self._mean_distances with each cluster's mean member-to-core
        distance.

        Fixed: the original guard tested self._mean_distances (always empty
        here) instead of self._clusters, so the body never ran; it also used
        cmath.sqrt, producing complex values, and divided by zero for
        core-only clusters.
        """
        self._mean_distances = []
        for cluster in self._clusters:
            core = cluster[0]
            members = cluster[1:]
            if not members:
                self._mean_distances.append(0.0)
                continue
            total = sum(math.sqrt(self._distance(p[1:], core[1:]))
                        for p in members)
            self._mean_distances.append(total / len(members))

    def _mean_distances_all_clusters(self):
        """Set self._average_distance: the average pattern-to-core distance.

        Each cluster's mean distance is weighted by its member count (the
        original weighted by len(cluster), which also counts the core).
        """
        if self._clusters and self._mean_distances:
            total = sum((len(self._clusters[k]) - 1) * self._mean_distances[k]
                        for k in range(len(self._mean_distances)))
            self._average_distance = total / len(self._samples)

    def _std_deviation_within_cluster(self, idx):
        """Return the per-attribute standard-deviation vector of cluster idx.

        Returns [] for an empty cluster list or a core-only cluster (the
        original divided by zero).  Fixed the original's NameError
        (`ret.vec` instead of `ret_vec`) and complex cmath.sqrt results.
        """
        if not self._clusters:
            return []
        core = self._clusters[idx][0]
        members = self._clusters[idx][1:]
        if not members:
            return []
        ret_vec = []
        for i in range(1, self._dimension + 1):
            variance = sum((e[i] - core[i]) ** 2 for e in members) / len(members)
            ret_vec.append(math.sqrt(variance))
        return ret_vec

    def _std_deviation_vector(self):
        """Collect every cluster's standard-deviation vector."""
        for idx in range(len(self._clusters)):
            vec = self._std_deviation_within_cluster(idx)
            self._std_deviation_vector_of_clusters.append(vec)

    def _split(self):
        # TODO: split clusters whose deviation exceeds self._std_dvt.
        pass

    def _merge(self):
        # TODO: merge cluster pairs closer than self._min_dis
        # (at most self._max_pairs per iteration).
        pass

    def _split_and_merge(self):
        # TODO: dispatch between _split() and _merge() per ISODATA rules.
        pass

    def _select(self, knums):
        """Seed the first knums cluster cores from the first knums samples.

        @knums: Number of clusters to seed.
        """
        source = self._normalized_samples if self._normalized else self._samples
        for i in range(knums):
            core = list(source[i])
            core[0] = 0  # flag 0: core not (yet) recomputed
            self._clusters.append([core])

    def _distance(self, va, vb):
        """Return the SQUARED Euclidean distance between va and vb.

        @va: coordinate sequence (x1, x2, ..., xn)
        @vb: coordinate sequence (y1, y2, ..., yn)
        """
        return sum((va[i] - vb[i]) ** 2 for i in range(self._dimension))

    def _means_within_cluster(self, va):
        """Return the mean vector of member tuples va as (1, m1, ..., mn).

        The leading 1 flags that the core has been recomputed.  An empty va
        is returned unchanged.
        @va: list of member tuples [(flagx, x1, ..., xn), ...]
        """
        if not va:
            return va
        means = [1]  # flag 1: freshly recomputed core
        for d in range(1, self._dimension + 1):
            means.append(sum(e[d] for e in va) / len(va))
        return tuple(means)

    def _equal(self, ta, tb):
        """Compare two core/sample tuples, ignoring the leading flag/seq.

        @ta: tuple (flagx, x1, ..., xn)
        @tb: tuple (flagy, y1, ..., ym)
        """
        if len(ta) != len(tb):
            return False
        return all(ta[i] == tb[i] for i in range(1, len(ta)))

    def flush(self, filename):
        """Write the clusters (members only, one group per cluster) to disk.

        @filename: Filename of output data.
        Fixed: iterates the actual clusters instead of range(self._knums),
        which crashed after small clusters were dissolved; the file handle
        is closed via `with`.
        """
        with open(filename, "w") as foutput:
            for c, cluster in enumerate(self._clusters):
                foutput.write("Group %d" % c)
                for e in cluster[1:]:
                    foutput.write("%s" % repr(e))
                foutput.write("\n\n\n")
        print("Done.")

    def process(self):
        """Run the (partial) ISODATA loop: seed, assign, prune, iterate.

        _split()/_merge() are unimplemented, so this currently behaves like
        k-means with small-cluster pruning.  Replaces the original's
        unfinished `while True: pass` infinite loop.
        """
        if self._normalized:
            self._normalize()
        self._select(self._knums)
        self._assign()
        while self._cur_iter < self._iterations:
            self._check_and_delete()
            self._assign()
            self._cur_iter += 1
if __name__ =="__main__":
ikmeans = KMeans("./iris-dat", 3)
ikmeans.process()
ikmeans.flush("./isodata-out.dat")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment