some numpy functions for dynamic time warping
import numpy as np
from scipy.spatial.distance import cdist
from .dtw import dtw
from .lb import envelope, lb_keogh_cdist, lb_keogh_from_bounds
from .window import window_ts
def dtw_nearest_neighbours(query, db, bound_reach, step_size, subseq=False):
assert query.ndim == 1
assert db.ndim == 2
assert query.flags.contiguous
query_lb, query_ub = envelope(query, bound_reach)
query_len = query.shape[0]
db_len = db.shape[1]
if query_len > db_len:
windows = window_ts(query, size=db_len, step_size=step_size)
envelope_windows = window_ts(np.c_[query_lb, query_ub],
axis=0).transpose(0, 2, 1)
elif query_len == db_len:
windows = query.reshape(1, -1)
envelope_windows = np.expand_dims(np.c_[query_lb, query_ub].T, 0)
raise NotImplementedError()
lb_scores = lb_keogh_cdist(envelope_windows, db).min(0)
best_lb_idx = np.argsort(lb_scores)
best_dtw = np.inf
nearest_neighbour = None
nn_idx = None
for idx in best_lb_idx:
if lb_scores[idx] < best_dtw:
candidate = db[idx]
dtw_score = dtw(candidate, query, subseq=subseq)
if dtw_score < best_dtw:
best_dtw = dtw_score
nearest_neighbour = candidate
nn_idx = idx
return nearest_neighbour, best_dtw, nn_idx
### DTW Kmeans ++ initialisation:
# adapted from
def kmeans_plus_plus(X, n_clusters, metric='dtw', bound_reach=5):
n_samples, n_features = X.shape
if metric == 'dtw':
def metric(X, Y):
_metric = partial(dtw, subseq=False, backtrack=False, return_matrix=False)
return cdist(X, Y, metric=_metric)
elif metric == 'dtw_subseq':
def metric(X, Y):
_metric = partial(dtw, subseq=True, backtrack=False, return_matrix=False)
return cdist(X, Y, metric=_metric)
# dtw km++ is very slow so a rougher but faster approach is lb_keogh
elif metric == 'lb_keogh':
def metric(X, Y):
n_X = X.shape[0]
n_Y = Y.shape[0]
X_envelope = np.array([envelope(x, bound_reach) for x in X])
# we cannot use scipy cdist here as X_envelope is 3 dimensional (2nd dim is 2: lb & ub)
# use numpy to hopefully get something approaching the speed of cdist
dists = np.empty(shape=(n_X, n_Y))
ii, jj = np.mgrid[:n_X, :n_Y]
for i, j in np.c_[ii.ravel(), jj.ravel()]:
dists[i, j] = lb_keogh_from_bounds(
Y[j], X_envelope[i, 0], X_envelope[i, 1])
return dists
centers = np.empty((n_clusters, n_features), dtype=X.dtype)
n_local_trials = 2 + int(np.log(n_clusters))
# Pick first center randomly
center_id = np.random.randint(n_samples)
centers[0] = X[center_id]
# Initialize list of closest distances and calculate current potential
dists = metric(centers[0, np.newaxis], X)
potential = dists.sum()
# Pick the remaining n_clusters-1 points
for c in range(1, n_clusters):
# Choose center candidates by sampling with probability proportional
# to the squared distance to the closest existing center
rand_vals = np.random.random_sample(n_local_trials) * potential
candidate_ids = np.searchsorted(np.cumsum(dists), rand_vals)
# Compute distances to center candidates
candidate_dists = metric(X[candidate_ids], X)
# Decide which candidate is the best
best_candidate = None
best_potential = None
best_dists = None
for candidate_id in range(n_local_trials):
# Compute potential when including center candidate
new_dists = np.minimum(dists, candidate_dists[candidate_id])
new_potential = new_dists.sum()
# Store result if it is the best local trial so far
if (best_candidate is None) or (new_potential < best_potential):
best_candidate = candidate_ids[candidate_id]
best_potential = new_potential
best_dists = new_dists
# Permanently add best center candidate found in local tries
centers[c] = X[best_candidate]
potential = best_potential
dists = best_dists
return centers
import numpy as np
from scipy.spatial.distance import pdist, squareform
from .dtw import dtw
def medoid(X):
dists = squareform(pdist(X))
dist_sum = dists.sum(0)
m = X[dist_sum.argmin()]
return m
def _update_dba(t, X):
t_update = np.zeros_like(t)
t_count = np.zeros_like(t)
for x in X:
_, path = dtw(t, x, backtrack=True)
for i, j in path:
t_update[i] += x[j]
t_count[i] += 1
t_update /= t_count
return np.asarray(t_update)
def dba(X, it=20, tol=0.05):
t = medoid(X)
for _ in range(it):
t_update = _update_dba(t, X)
if np.allclose(t, t_update, 0.05):
return t_update
t = t_update
return t
import numpy as np
from scipy.spatial.distance import cdist
def accum_cost_and_path_matrix(cost, subseq):
ij = np.array(cost.shape) + 1
accum_cost = np.full(ij, np.inf, dtype=cost.dtype)
accum_cost[0, 0] = 0
if subseq:
accum_cost[0, :] = 0
path_matrix = np.zeros(ij, dtype='i')
ii, jj = np.mgrid[1: ij[0], 1: ij[1]]
for i, j in np.c_[ii.ravel(), jj.ravel()]:
pos_cost = cost[i - 1, j - 1]
steps = [[i - 1, i, i - 1], [j - 1, j - 1, j]]
step_cost = accum_cost[steps]
step_type = np.argmin(step_cost)
path_matrix[i, j] = step_type
accum_cost[i, j] = pos_cost + step_cost[step_type]
return accum_cost[1:, 1:], path_matrix[1:, 1:]
def backtrack_path_matrix(path_matrix, min_idx=None):
idx = np.array(path_matrix.shape) - 1
if min_idx is not None:
idx[1] = min_idx
step_types = np.array([[-1, -1], [0, -1], [-1, 0]])
path = [idx.copy(), ]
while np.all(idx != 0):
idx += step_types[path_matrix[idx[0], idx[1]]]
return np.asarray(path)
def dtw(X, Y,
cost = cdist(X.reshape(-1, 1),
Y.reshape(-1, 1),
accum_cost, path_matrix = accum_cost_and_path_matrix(cost, subseq)
if subseq:
min_idx = np.argmin(accum_cost[-1, :])
min_idx = path_matrix.shape[1] - 1
if not backtrack:
if return_matrix:
return accum_cost
return accum_cost[-1, min_idx]
path = backtrack_path_matrix(path_matrix, min_idx)
if return_matrix:
return accum_cost, path
return accum_cost[-1, min_idx], path
def dtw_dist(X1, X2=None, subseq=False):
def _dtw(x, y):
cost = dtw(x, y,
if X2 is None:
dists = squareform(pdist(X1, metric=_dtw))
dists = cdist(X1, X2, metric=_dtw)
return dists
import numpy as np
from scipy import ndimage as ndi
def envelope(X, bound_reach):
lb = ndi.minimum_filter1d(X, bound_reach, axis=0, mode='reflect')
ub = ndi.maximum_filter1d(X, bound_reach, axis=0, mode='reflect')
return lb, ub
def lb_keogh_from_bounds(Y, lb, ub):
scores =
condlist=[Y < lb, Y > ub],
choicelist=[lb - Y, Y - ub],
return scores.sum()
def lb_keogh(X, Y, bound_reach):
lb, ub = envelope(X, bound_reach)
return lb_keogh_from_bounds(Y, lb, ub)
def lb_improved(X, Y, bound_reach):
lb, ub = envelope(X, bound_reach)
lb_k = lb_keogh_from_bounds(Y, lb, ub)
x_dash = np.clip(Y, lb, ub)
xd_lb, xd_ub = envelope(x_dash, bound_reach)
lb_i = lb_k + lb_keogh_from_bounds(X, xd_lb, xd_ub)
return lb_i
def lb_keogh_cdist(envelope_windows, db):
n_windows = envelope_windows.shape[0]
n_entries = db.shape[0]
results = np.empty(shape=(n_windows, n_entries))
ii, jj = np.mgrid[:n_windows, :n_entries]
for i, j in np.c_[ii.ravel(), jj.ravel()]:
results[i, j] = lb_keogh_from_bounds(
db[j], envelope_windows[i, 0], envelope_windows[i, 1])
return results
import numpy as np
def window_ts(X, size, step_size=1, axis=None):
if axis is None:
axis = X.ndim - 1
n_windows = (X.shape[axis] - size) // step_size + 1
shape = X.shape[:axis] + (n_windows, size, ) + X.shape[axis + 1:]
new_strides = (X.strides[axis] * step_size, X.strides[axis])
strides = (X.strides[:axis] + new_strides + X.strides[axis + 1:])
return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides)
