Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ravipudi/7703210 to your computer and use it in GitHub Desktop.
Save ravipudi/7703210 to your computer and use it in GitHub Desktop.
"""
This module contains clustering algorithms and their supporting data structures. A wiki page explaining how to use it is planned.
TODO: Write unit tests and test the module.
TODO: Add the k-NN (k-nearest neighbours) algorithm.
#from clustering import NBasisVector, NBasisVectorForClustering,\
# NBasisVectorForDBSCANClustering, Cluster, ClusteringAlgo, DBSCANClusteringAlgo
#x = (1,2,3)
#x1 = (1,2,3)
#x2 = (4,5,6)
#x3 = (5, 5,6)
#x4 = (10, 7 ,8)
#x5 = (5,5,5000)
#def sample_distance_function(vector1, vector2):
# counter = 0
# sum = 0
# while(counter < len(vector1)):
# sum = sum + vector1[counter] * vector2[counter]
# counter = counter + 1
# return sum
#input_full = (x1,x2,x3,x4,x5)
#basis_vectors = []
#for input in input_full:
# basis_vectors.append(NBasisVector(vector_tuple=input, distance_function=sample_distance_function))
#bs1 = basis_vectors[0]
#bs2 = basis_vectors[1]
#bs1.distance(bs2)
"""
class NBasisVector(object):
    """A vector with N components, stored as a tuple.

    Besides the component values, the vector can carry an optional tuple of
    display labels (one per component) and a distance function used by
    :meth:`distance` to compare this vector with another one.
    """

    def __init__(self, vector_tuple, vector_display_tuple=None, distance_function=None):
        """Create a vector.

        Args:
            vector_tuple: The component values of the vector.
            vector_display_tuple: Optional display labels; when given (and
                non-empty) it must have the same length as ``vector_tuple``.
            distance_function: Optional callable invoked as
                ``distance_function(own_tuple, other_tuple, **default_args)``.

        Raises:
            AssertionError: If the display tuple and the value tuple differ
                in length.
        """
        self._vector_tuple = vector_tuple
        self._vector_display_tuple = None
        self._distance_function_default_args_map = {}
        self._distance_function = distance_function
        if vector_display_tuple:
            assert len(vector_display_tuple) == len(vector_tuple), (
                "Length of display tuple should be same as length of value tuple"
            )
            # Store under the same attribute the getter/setter use, so the
            # value passed to the constructor is actually retrievable.
            self._vector_display_tuple = vector_display_tuple

    def set_distance_function(self, distance_function):
        """Replace the callable used by :meth:`distance`."""
        self._distance_function = distance_function

    def get_distance_function(self):
        """Return the callable used by :meth:`distance` (may be ``None``)."""
        return self._distance_function

    def set_vector_tuple(self, vector_tuple):
        """Replace the component values of the vector."""
        self._vector_tuple = vector_tuple

    def get_vector_tuple(self):
        """Return the component values of the vector."""
        return self._vector_tuple

    def set_vector_display_tuple(self, vector_display_tuple):
        """Replace the display labels (no length check is performed here)."""
        self._vector_display_tuple = vector_display_tuple

    def get_vector_display_tuple(self, vector_display_tuple=None):
        """Return the display labels, or ``None`` when none were set.

        The ``vector_display_tuple`` parameter is ignored; it is kept (now
        optional) only so existing callers that pass it keep working.
        """
        return self._vector_display_tuple

    def set_distance_function_default_args_map(self, args_map):
        """Set the keyword arguments passed to every distance computation."""
        self._distance_function_default_args_map = args_map

    def get_distance_function_default_args_map(self):
        """Return the keyword arguments passed to every distance computation."""
        return self._distance_function_default_args_map

    def distance(self, nbasis_vector, *args, **kwargs):
        """Return the distance between this vector and ``nbasis_vector``.

        Args:
            nbasis_vector: The other vector; must provide ``get_vector_tuple()``.
            *args: Extra positional arguments forwarded to the distance function.
            **kwargs: Extra keyword arguments forwarded to the distance
                function; they override entries of the default-arguments map.

        Returns:
            Whatever the configured distance function returns.

        Raises:
            TypeError: If no distance function has been configured.
        """
        distance_function = self.get_distance_function()
        if distance_function is None:
            raise TypeError("No distance function has been set for this vector")
        # Copy the defaults so per-call overrides never leak into the stored map.
        call_kwargs = dict(self._distance_function_default_args_map)
        call_kwargs.update(kwargs)
        return distance_function(
            self._vector_tuple, nbasis_vector.get_vector_tuple(), *args, **call_kwargs
        )
class NBasisVectorForClustering(NBasisVector):
    """An N-basis vector that also records the cluster it was assigned to."""

    def __init__(self, cluster=None, *args, **kwargs):
        """Create a clustering vector.

        Args:
            cluster: The cluster this vector belongs to, or ``None`` when it
                is unassigned. Note that this is the first positional
                parameter, so the vector data is best passed by keyword.
            *args: Forwarded to ``NBasisVector.__init__``.
            **kwargs: Forwarded to ``NBasisVector.__init__``.
        """
        super(NBasisVectorForClustering, self).__init__(*args, **kwargs)
        self._cluster = cluster

    def set_cluster(self, cluster):
        """Record the cluster this vector belongs to."""
        self._cluster = cluster

    def get_cluster(self):
        """Return the recorded cluster, or ``None`` when unassigned."""
        return self._cluster

    def construct_from_super_class(self, super_class_obj):
        """Build a new clustering vector from a plain ``NBasisVector``.

        ``self`` is not used; the method only reads ``super_class_obj``.

        Args:
            super_class_obj: The vector whose tuple, display tuple, distance
                function and distance default arguments are copied.

        Returns:
            A new, unassigned ``NBasisVectorForClustering``.
        """
        converted = NBasisVectorForClustering(
            vector_tuple=super_class_obj.get_vector_tuple(),
            # The base getter declares an (unused) positional parameter, so
            # pass a placeholder to keep the call valid.
            vector_display_tuple=super_class_obj.get_vector_display_tuple(None),
            distance_function=super_class_obj.get_distance_function(),
        )
        # Carry the default distance arguments over too, otherwise the copy
        # would measure distances differently from the source vector.
        converted.set_distance_function_default_args_map(
            dict(super_class_obj.get_distance_function_default_args_map())
        )
        return converted
class NBasisVectorForDBSCANClustering(NBasisVectorForClustering):
    """A clustering vector carrying the per-point bookkeeping used by DBSCAN.

    On top of the cluster assignment it tracks a ``visited`` flag, a ``noise``
    flag and the number of neighbours found by the last region query.
    """

    def __init__(self, *args, **kwargs):
        """Create an unvisited, non-noise vector.

        All arguments are forwarded to ``NBasisVectorForClustering.__init__``.
        """
        super(NBasisVectorForDBSCANClustering, self).__init__(*args, **kwargs)
        self._visited = False
        self._noise = False
        self._neighbour_count = None

    def set_visited(self):
        """Mark this vector as visited."""
        self._visited = True

    def is_visited(self):
        """Return ``True`` once :meth:`set_visited` has been called."""
        return self._visited

    def set_noise(self):
        """Mark this vector as noise."""
        self._noise = True

    def is_noise(self):
        """Return ``True`` once :meth:`set_noise` has been called."""
        return self._noise

    def set_neighbour_count(self, neighbour_count):
        """Record how many neighbours this vector has."""
        self._neighbour_count = neighbour_count

    def get_neighbour_count(self):
        """Return the recorded neighbour count (``None`` until it is set)."""
        return self._neighbour_count

    def construct_from_super_class(self, super_class_obj):
        """Build a new DBSCAN vector from an ``NBasisVectorForClustering``.

        ``self`` is not used; the method only reads ``super_class_obj``.

        Args:
            super_class_obj: The clustering vector whose tuple, display tuple,
                distance function, distance default arguments and cluster
                are copied.

        Returns:
            A new ``NBasisVectorForDBSCANClustering`` that is unvisited and
            not flagged as noise.
        """
        converted = NBasisVectorForDBSCANClustering(
            vector_tuple=super_class_obj.get_vector_tuple(),
            # The base getter declares an (unused) positional parameter, so
            # pass a placeholder to keep the call valid.
            vector_display_tuple=super_class_obj.get_vector_display_tuple(None),
            distance_function=super_class_obj.get_distance_function(),
            cluster=super_class_obj.get_cluster(),
        )
        # Carry the default distance arguments over too, otherwise the copy
        # would measure distances differently from the source vector.
        converted.set_distance_function_default_args_map(
            dict(super_class_obj.get_distance_function_default_args_map())
        )
        return converted
class Cluster(object):
    """A numbered group of elements produced by a clustering algorithm."""

    def __init__(self, cluster_number, display_name=None):
        """Create an empty cluster.

        Args:
            cluster_number: Identifier of the cluster; it is the value handed
                to each member's ``set_cluster``.
            display_name: Optional label; stored but not otherwise used by
                this class.
        """
        self._cluster_number = cluster_number
        self._display_name = display_name
        self._cluster_elements = set()

    def get_cluster_number(self):
        """Return the identifier of this cluster."""
        return self._cluster_number

    def get_elements(self):
        """Return the set of elements currently in this cluster."""
        return self._cluster_elements

    def add_elements(self, elements):
        """Add ``elements`` to the cluster and tag each with the cluster number.

        Args:
            elements: Iterable of hashable objects providing
                ``set_cluster(cluster_number)``.
        """
        # Materialise once: the iterable is walked twice below, which would
        # silently drop everything if a generator were passed in.
        members = list(elements)
        for element in members:
            # Members of a cluster are by definition not noise, so the noise
            # flag is deliberately left untouched here.
            element.set_cluster(self._cluster_number)
        self._cluster_elements.update(members)
class ClusteringAlgo(object):
    """Common base for clustering algorithms: owns the set of working vectors."""

    def __init__(self):
        """Start with an empty working set."""
        self._data = set()

    def start(self):
        """Run the algorithm; the base implementation does nothing."""
        return None

    def simple_basis_vetor_to_algo_basis(self, nvectors):
        """Convert plain vectors to clustering vectors and add them to the working set.

        Args:
            nvectors: Iterable of vectors accepted by
                ``NBasisVectorForClustering.construct_from_super_class``.
        """
        for plain_vector in nvectors:
            converted = NBasisVectorForClustering.construct_from_super_class(
                self, plain_vector
            )
            self._data.add(converted)
class DBSCANClusteringAlgo(ClusteringAlgo):
    """Density-based clustering (DBSCAN) over a set of N-basis vectors.

    A point whose neighbourhood holds at least ``min_pts`` other points seeds
    a cluster, which then grows through the neighbourhoods of every such
    point it reaches. Points that end up in no cluster are flagged as noise.
    """

    def __init__(self, data_points, eps, min_pts, *args, **kwargs):
        """Prepare the algorithm.

        Args:
            data_points: Iterable of plain vectors to cluster; each one is
                converted into a DBSCAN bookkeeping vector.
            eps: Neighbourhood radius; a point is a neighbour when the
                absolute distance is strictly smaller than this value.
            min_pts: Minimum number of neighbours (the point itself excluded)
                required for a point to seed or extend a cluster.
            *args: Forwarded to ``ClusteringAlgo.__init__``.
            **kwargs: Forwarded to ``ClusteringAlgo.__init__``.
        """
        super(DBSCANClusteringAlgo, self).__init__(*args, **kwargs)
        self.simple_basis_vetor_to_algo_basis(data_points)
        self._eps = eps
        self._min_pts = min_pts
        self._max_cluster_number = 0
        self._all_clusters = set()

    def _region_query(self, element):
        """Return every other point closer than ``eps`` to ``element``.

        Also records the size of that neighbourhood on ``element``.
        """
        neighbourhood = set()
        for point in self._data:
            if point is element:
                continue
            if abs(element.distance(point)) < self._eps:
                neighbourhood.add(point)
        element.set_neighbour_count(len(neighbourhood))
        return neighbourhood

    def start(self):
        """Cluster all points and return the working set of vectors.

        Returns:
            The set of DBSCAN vectors, each carrying its cluster number (or
            ``None``) and its noise flag.
        """
        for element in self._data:
            if element.is_visited():
                continue
            element.set_visited()
            neighbourhood = self._region_query(element)
            if len(neighbourhood) < self._min_pts:
                # Not dense enough to seed a cluster. It may still be pulled
                # into one later as a border point, so the noise decision is
                # taken after all clusters have been built.
                continue
            self._max_cluster_number += 1
            new_cluster = Cluster(cluster_number=self._max_cluster_number)
            self._all_clusters.add(new_cluster)
            self._expand_cluster(new_cluster, element, neighbourhood)
        # The vectors expose no way to clear the noise flag, so flag only the
        # points that are still unassigned once clustering has finished.
        for element in self._data:
            if element.get_cluster() is None:
                element.set_noise()
        return self._data

    def _expand_cluster(self, cluster, element, neighbourhood):
        """Grow ``cluster`` from ``element`` through its dense neighbours.

        Args:
            cluster: The cluster being built.
            element: The seed point.
            neighbourhood: The neighbours of ``element`` found by
                :meth:`_region_query`.
        """
        cluster.add_elements((element,))
        # Explicit work list: neighbours discovered while expanding must be
        # processed too, which a plain ``for`` over the initial set cannot do.
        pending = list(neighbourhood)
        queued = set(neighbourhood)
        while pending:
            neighbour = pending.pop()
            if not neighbour.is_visited():
                neighbour.set_visited()
                neighbours_neighbourhood = self._region_query(neighbour)
                if len(neighbours_neighbourhood) >= self._min_pts:
                    for candidate in neighbours_neighbourhood:
                        if candidate not in queued:
                            queued.add(candidate)
                            pending.append(candidate)
            if neighbour.get_cluster() is None:
                # add_elements() tags the point with the cluster number.
                cluster.add_elements((neighbour,))

    def simple_basis_vetor_to_algo_basis(self, nvectors):
        """Convert plain vectors into DBSCAN vectors held in the working set.

        The parent class first turns them into clustering vectors; every
        vector in the working set is then rebuilt as a DBSCAN vector.
        """
        super(DBSCANClusteringAlgo, self).simple_basis_vetor_to_algo_basis(nvectors)
        clustering_vectors = self._data
        self._data = set()
        for vector in clustering_vectors:
            self._data.add(
                NBasisVectorForDBSCANClustering.construct_from_super_class(self, vector)
            )
# Initial eclipsing binary star intro
# http://www.physics.sfasu.edu/astro/ebstar/ebstar.html
#
class LightCurve(object):
    """Light curve of an astronomical object.

    A light curve is the curve between time and magnitude (visual magnitude
    in this case). Magnitudes are kept in the order they were appended.
    """

    def __init__(self, start_date, time_unit):
        """Create an empty light curve.

        Args:
            start_date: Date of the first point; stored as given.
            time_unit: Time unit of the curve; stored as given.
        """
        self._points = []
        self._start_date = start_date
        self._time_unit = time_unit

    def append_point(self, magnitude):
        """Append one magnitude reading to the end of the curve."""
        self._points.append(magnitude)

    def get_points(self):
        """Return the list of magnitude readings in insertion order."""
        return self._points
class EclipsingBinaryStar(BinaryStar):
    """An eclipsing binary star with a name, a period and a light curve.

    ``period`` and ``light_curve`` read as ``None`` until they are assigned
    (and again after they are deleted).
    """

    def __init__(self, name):
        """Create a star called ``name``.

        NOTE(review): ``BinaryStar.__init__`` is not invoked here and
        ``BinaryStar`` is not defined in this part of the file -- confirm
        whether the base class needs initialising.
        """
        self.__name = name

    @property
    def name(self):
        """Name of the star."""
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    @name.deleter
    def name(self):
        del self.__name

    @property
    def period(self):
        """Period of the star, or ``None`` when it has not been set."""
        try:
            return self.__period
        except AttributeError:
            return None

    @period.setter
    def period(self, value):
        self.__period = value

    @period.deleter
    def period(self):
        del self.__period

    @property
    def light_curve(self):
        """Light curve of the star, or ``None`` when it has not been set."""
        try:
            return self.__light_curve
        except AttributeError:
            return None

    @light_curve.setter
    def light_curve(self, value):
        self.__light_curve = value

    @light_curve.deleter
    def light_curve(self):
        del self.__light_curve

    def calculate_period_from_light_curve(self):
        """Derive the period from the light curve (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment