Skip to content

Instantly share code, notes, and snippets.

@dkohlsdorf
Created August 16, 2018 22:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dkohlsdorf/58f7aae7dffbc0764ed2e6eed7243785 to your computer and use it in GitHub Desktop.
Save dkohlsdorf/58f7aae7dffbc0764ed2e6eed7243785 to your computer and use it in GitHub Desktop.
Simple iSAX Implementation
import numpy as np
import scipy.stats as stats
def generate_split(n_char, mu=0.0, std=1.0):
    """Return the breakpoints that cut a normal distribution into equal-probability bins.

    Args:
        n_char: Number of bins (symbols) wanted.
        mu: Mean of the normal distribution.
        std: Standard deviation of the normal distribution.

    Returns:
        Array of ``n_char - 1`` values: the quantiles of ``N(mu, std)`` at
        probabilities ``1/n_char, 2/n_char, ..., (n_char - 1)/n_char``.
    """
    # Interior probabilities only; 0 and 1 would map to -inf / +inf.
    probabilities = np.arange(1, n_char) / n_char
    return stats.norm.ppf(probabilities, loc=mu, scale=std)
def convert(x, splits, cardinality):
    """Map every value of ``x`` to the index of the bin it falls into.

    Args:
        x: Sequence of numeric values, one per dimension.
        splits: Sequence of breakpoint arrays; ``splits[k]`` holds the
            breakpoints used for cardinality level ``k``. Each array is
            expected to be sorted in ascending order.
        cardinality: Sequence giving, for each position ``i`` of ``x``, the
            index into ``splits`` to use for ``x[i]``.

    Returns:
        Integer array of ``len(x)`` bin indices. The index for ``x[i]`` is
        the number of breakpoints strictly smaller than ``x[i]``, so a value
        equal to a breakpoint stays in the lower bin.

    Raises:
        IndexError: If ``cardinality`` is shorter than ``x`` or refers to a
            level that ``splits`` does not contain.
    """
    # ``np.int`` was removed in NumPy 1.24; the builtin ``int`` selects the
    # same default integer dtype.
    chars = np.zeros(len(x), dtype=int)
    for i, value in enumerate(x):
        breakpoints = np.asarray(splits[cardinality[i]])
        # For ascending breakpoints this equals scanning left to right until
        # the first breakpoint that is >= value.
        chars[i] = np.count_nonzero(value > breakpoints)
    return chars
def hashable(x, splits, cardinality, chr_offset=97):
    """Encode ``x`` as a string with one character per dimension.

    Args:
        x: Sequence of numeric values.
        splits: Breakpoint arrays, passed through to ``convert``.
        cardinality: Per-dimension index into ``splits``, passed through to
            ``convert``.
        chr_offset: Code point added to every bin index before it is turned
            into a character (97 is ``'a'``).

    Returns:
        String of ``len(x)`` characters that can serve as a dictionary key.
    """
    code_points = convert(x, splits, cardinality) + chr_offset
    return ''.join(chr(code) for code in code_points)
# Smallest number of symbols per dimension, and the exclusive upper bound on
# the number of symbols for which breakpoints are precomputed.
BASE_CARD = 2
MAX_CARD = 26
# SPLITS[k] holds the breakpoints for BASE_CARD + k symbols.
SPLITS = list(map(generate_split, range(BASE_CARD, MAX_CARD)))
class IndexNode:
    """One node of an iSAX-style index tree.

    A node hashes every inserted vector to a string key (via ``hashable``,
    using this node's per-dimension cardinality levels) and stores, per key,
    either a plain list of vectors (a leaf bucket) or a child ``IndexNode``
    with one dimension refined by one cardinality level.

    Attributes are plain instance fields:
        child: dict mapping key -> list of vectors or child ``IndexNode``.
        cardinality: integer array, one index into ``SPLITS`` per dimension.
        round_robin: dimension that is refined when a bucket is split.
        dim: number of dimensions of the indexed vectors.
        split_size: maximum number of vectors a leaf bucket holds before it
            is split.
    """

    def __init__(self, split_size, dim, round_robin=0, cardinality=None):
        """Create an empty node.

        Args:
            split_size: Maximum bucket length before a split is attempted.
            dim: Number of dimensions of the vectors to index.
            round_robin: Dimension to refine on the next split.
            cardinality: Per-dimension index into ``SPLITS``; defaults to
                all zeros (the coarsest level).
        """
        self.child = {}
        if cardinality is None:
            # ``np.int`` was removed in NumPy 1.24; builtin ``int`` selects
            # the same default integer dtype.
            self.cardinality = np.zeros(dim, dtype=int)
        else:
            self.cardinality = cardinality
        self.round_robin = round_robin
        self.dim = dim
        self.split_size = split_size

    def next(self):
        """Return a new empty node refined by one level in ``round_robin``.

        The new node gets a copy of this node's cardinality with the
        ``round_robin`` dimension incremented, and its own ``round_robin``
        advanced by one (wrapping back to 0 after the last dimension).
        This node itself is not modified.
        """
        cardinality = self.cardinality.copy()
        cardinality[self.round_robin] += 1
        round_robin = (self.round_robin + 1) % self.dim
        return IndexNode(self.split_size, self.dim, round_robin, cardinality)

    def _can_refine(self):
        """Return True if ``SPLITS`` has a finer level for ``round_robin``."""
        return self.cardinality[self.round_robin] + 1 < len(SPLITS)

    def insert(self, x):
        """Insert vector ``x`` into the subtree rooted at this node.

        A bucket that would grow beyond ``split_size`` is replaced by a
        child node into which ``x`` and the bucket's vectors are
        re-inserted. When no finer breakpoints are available the bucket is
        allowed to exceed ``split_size`` instead; otherwise splitting would
        index past the end of ``SPLITS`` (e.g. for many identical vectors).
        """
        key = hashable(x, SPLITS, self.cardinality)
        bucket = self.child.get(key)
        if bucket is None:
            self.child[key] = [x]
        elif not isinstance(bucket, list):
            # Already split: delegate to the child node.
            bucket.insert(x)
        elif len(bucket) + 1 > self.split_size and self._can_refine():
            node = self.next()
            node.insert(x)
            for item in bucket:
                node.insert(item)
            self.child[key] = node
        else:
            bucket.append(x)

    def search(self, x):
        """Return the leaf bucket that ``x`` hashes to.

        Returns:
            The list of stored vectors sharing ``x``'s key at the deepest
            matching node, or an empty list if no such bucket exists.
        """
        key = hashable(x, SPLITS, self.cardinality)
        bucket = self.child.get(key)
        if bucket is None:
            return []
        if isinstance(bucket, list):
            return bucket
        # Descend into the child node (not into ``self`` again).
        return bucket.search(x)

    def size(self):
        """Return the total number of vectors stored below this node."""
        return sum(
            len(entry) if isinstance(entry, list) else entry.size()
            for entry in self.child.values()
        )

    def max_depth(self, depth):
        """Return the depth of the deepest node in this subtree.

        Args:
            depth: Depth assigned to this node; children count as
                ``depth + 1``.
        """
        deepest = depth
        for entry in self.child.values():
            # Children are lists (leaves) or IndexNode instances, never dicts.
            if isinstance(entry, IndexNode):
                deepest = max(deepest, entry.max_depth(depth + 1))
        return deepest

    def print(self, offset):
        """Print the subtree, prefixing every line with ``offset``.

        Child nodes are printed with one extra tab appended to ``offset``.
        """
        for key, entry in self.child.items():
            print("{}{}: ".format(offset, key))
            if isinstance(entry, list):
                for item in entry:
                    print("{}\t{}".format(offset, item))
            else:
                entry.print(offset + "\t")
# Demo: index 10000 random 10-dimensional vectors (buckets of at most 20),
# printing a progress counter, then report the number of stored vectors and
# the tree depth.
idx = IndexNode(split_size=20, dim=10)
for i in range(10000):
    print(i)
    x = np.random.normal(size=10)
    idx.insert(x)
print(idx.size())
print(idx.max_depth(1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment