some numpy functions for dynamic time warping
from functools import partial

import numpy as np
from scipy.spatial.distance import cdist

from .dtw import dtw
from .lb import envelope, lb_keogh_cdist, lb_keogh_from_bounds
from .window import window_ts


def dtw_nearest_neighbours(query, db, bound_reach, step_size, subseq=False):
    assert query.ndim == 1
    assert db.ndim == 2
    assert query.flags.contiguous
    query_lb, query_ub = envelope(query, bound_reach)
    query_len = query.shape[0]
    db_len = db.shape[1]
    if query_len > db_len:
        windows = window_ts(query, size=db_len, step_size=step_size)
        envelope_windows = window_ts(np.c_[query_lb, query_ub],
                                     size=db_len,
                                     step_size=step_size,
                                     axis=0).transpose(0, 2, 1)
    elif query_len == db_len:
        windows = query.reshape(1, -1)
        envelope_windows = np.expand_dims(np.c_[query_lb, query_ub].T, 0)
    else:
        raise NotImplementedError()
    lb_scores = lb_keogh_cdist(envelope_windows, db).min(0)
    best_lb_idx = np.argsort(lb_scores)
    best_dtw = np.inf
    nearest_neighbour = None
    nn_idx = None
    # early abandoning: visit candidates in order of increasing LB_Keogh and
    # only compute the full DTW when the lower bound could still beat the
    # best distance found so far
    for idx in best_lb_idx:
        if lb_scores[idx] < best_dtw:
            candidate = db[idx]
            dtw_score = dtw(candidate, query, subseq=subseq)
            if dtw_score < best_dtw:
                best_dtw = dtw_score
                nearest_neighbour = candidate
                nn_idx = idx
    return nearest_neighbour, best_dtw, nn_idx


### DTW k-means++ initialisation:
# adapted from https://github.com/scikit-learn/scikit-learn/blob/da71b827b8b56bd8305b7fe6c13724c7b5355209/sklearn/cluster/k_means_.py#L44
def kmeans_plus_plus(X, n_clusters, metric='dtw', bound_reach=5):
    n_samples, n_features = X.shape
    if metric == 'dtw':
        def metric(X, Y):
            _metric = partial(dtw, subseq=False, backtrack=False, return_matrix=False)
            return cdist(X, Y, metric=_metric)
    elif metric == 'dtw_subseq':
        def metric(X, Y):
            _metric = partial(dtw, subseq=True, backtrack=False, return_matrix=False)
            return cdist(X, Y, metric=_metric)
    # dtw km++ is very slow, so a rougher but faster alternative is lb_keogh
    elif metric == 'lb_keogh':
        def metric(X, Y):
            n_X = X.shape[0]
            n_Y = Y.shape[0]
            X_envelope = np.array([envelope(x, bound_reach) for x in X])
            # we cannot use scipy cdist here as X_envelope is 3 dimensional
            # (2nd dim is 2: lb & ub), so loop with numpy to hopefully get
            # something approaching the speed of cdist
            dists = np.empty(shape=(n_X, n_Y))
            ii, jj = np.mgrid[:n_X, :n_Y]
            for i, j in np.c_[ii.ravel(), jj.ravel()]:
                dists[i, j] = lb_keogh_from_bounds(
                    Y[j], X_envelope[i, 0], X_envelope[i, 1])
            return dists
    centers = np.empty((n_clusters, n_features), dtype=X.dtype)
    n_local_trials = 2 + int(np.log(n_clusters))
    # Pick first center randomly
    center_id = np.random.randint(n_samples)
    centers[0] = X[center_id]
    # Initialize list of closest distances and calculate current potential
    dists = metric(centers[0, np.newaxis], X)
    potential = dists.sum()
    # Pick the remaining n_clusters-1 centers
    for c in range(1, n_clusters):
        # Choose center candidates by sampling with probability proportional
        # to the distance to the closest existing center
        rand_vals = np.random.random_sample(n_local_trials) * potential
        candidate_ids = np.searchsorted(np.cumsum(dists), rand_vals)
        # Compute distances to center candidates
        candidate_dists = metric(X[candidate_ids], X)
        # Decide which candidate is the best
        best_candidate = None
        best_potential = None
        best_dists = None
        for candidate_id in range(n_local_trials):
            # Compute potential when including center candidate
            new_dists = np.minimum(dists, candidate_dists[candidate_id])
            new_potential = new_dists.sum()
            # Store result if it is the best local trial so far
            if (best_candidate is None) or (new_potential < best_potential):
                best_candidate = candidate_ids[candidate_id]
                best_potential = new_potential
                best_dists = new_dists
        # Permanently add best center candidate found in local tries
        centers[c] = X[best_candidate]
        potential = best_potential
        dists = best_dists
    return centers
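

# A minimal usage sketch (added for illustration, not part of the original
# gist): nearest-neighbour search and k-means++ style initialisation on a
# small random database. All names below (db, query, etc.) are made up for
# the example. Because this module uses relative imports, run it as part of
# the package (e.g. python -m <package>.<module>) rather than as a script.
if __name__ == '__main__':
    rng = np.random.RandomState(42)
    # 50 random-walk series of length 64
    db = np.cumsum(rng.randn(50, 64), axis=1)
    # a noisy copy of one database entry as the query
    query = db[7] + rng.randn(64) * 0.1
    nn, dist, idx = dtw_nearest_neighbours(query, db, bound_reach=5, step_size=1)
    print('nearest neighbour index:', idx, 'dtw distance:', dist)
    # lb_keogh is the cheapest of the three supported metrics
    centers = kmeans_plus_plus(db, n_clusters=3, metric='lb_keogh', bound_reach=5)
    print('initial centers shape:', centers.shape)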


# DBA (DTW barycenter averaging)
import numpy as np
from scipy.spatial.distance import pdist, squareform

from .dtw import dtw


def medoid(X):
    dists = squareform(pdist(X))
    dist_sum = dists.sum(0)
    m = X[dist_sum.argmin()]
    return m


def _update_dba(t, X):
    # DBA update: align every series to the current average t and replace
    # each point of t with the mean of the points warped onto it
    t_update = np.zeros_like(t)
    t_count = np.zeros_like(t)
    for x in X:
        _, path = dtw(t, x, backtrack=True)
        for i, j in path:
            t_update[i] += x[j]
            t_count[i] += 1
    t_update /= t_count
    return np.asarray(t_update)


def dba(X, it=20, tol=0.05):
    t = medoid(X)
    for _ in range(it):
        t_update = _update_dba(t, X)
        if np.allclose(t, t_update, tol):
            return t_update
        else:
            t = t_update
    return t
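

# A minimal usage sketch (added for illustration, not part of the original
# gist): average ten noisy copies of a sine wave with DBA. Like the module
# above, this relies on the relative import of dtw, so run it within the
# package rather than as a standalone script.
if __name__ == '__main__':
    rng = np.random.RandomState(0)
    base = np.sin(np.linspace(0, 2 * np.pi, 50))
    X = np.array([base + rng.randn(50) * 0.1 for _ in range(10)])
    avg = dba(X, it=10, tol=0.01)
    print('DBA average shape:', avg.shape)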


# dtw module (imported above as `from .dtw import dtw`)
import numpy as np
from scipy.spatial.distance import cdist, pdist, squareform


def accum_cost_and_path_matrix(cost, subseq):
    # accumulated cost matrix with one extra leading row/column of inf so
    # that the boundary conditions fall out of the same recursion
    ij = np.array(cost.shape) + 1
    accum_cost = np.full(ij, np.inf, dtype=cost.dtype)
    accum_cost[0, 0] = 0
    if subseq:
        # subsequence DTW: the path may start at any position along Y
        accum_cost[0, :] = 0
    path_matrix = np.zeros(ij, dtype='i')
    ii, jj = np.mgrid[1: ij[0], 1: ij[1]]
    for i, j in np.c_[ii.ravel(), jj.ravel()]:
        pos_cost = cost[i - 1, j - 1]
        # predecessors: 0 = diagonal (i-1, j-1), 1 = (i, j-1), 2 = (i-1, j)
        steps = ([i - 1, i, i - 1], [j - 1, j - 1, j])
        step_cost = accum_cost[steps]
        step_type = np.argmin(step_cost)
        path_matrix[i, j] = step_type
        accum_cost[i, j] = pos_cost + step_cost[step_type]
    return accum_cost[1:, 1:], path_matrix[1:, 1:]


def backtrack_path_matrix(path_matrix, min_idx=None):
    idx = np.array(path_matrix.shape) - 1
    if min_idx is not None:
        idx[1] = min_idx
    step_types = np.array([[-1, -1], [0, -1], [-1, 0]])
    path = [idx.copy(), ]
    while np.all(idx != 0):
        idx += step_types[path_matrix[idx[0], idx[1]]]
        path.append(idx.copy())
    return np.asarray(path)


def dtw(X, Y,
        metric='cityblock',
        backtrack=False,
        return_matrix=False,
        subseq=False):
    cost = cdist(X.reshape(-1, 1),
                 Y.reshape(-1, 1),
                 metric=metric)
    accum_cost, path_matrix = accum_cost_and_path_matrix(cost, subseq)
    if subseq:
        min_idx = np.argmin(accum_cost[-1, :])
    else:
        min_idx = path_matrix.shape[1] - 1
    if not backtrack:
        if return_matrix:
            return accum_cost
        else:
            return accum_cost[-1, min_idx]
    else:
        path = backtrack_path_matrix(path_matrix, min_idx)
        if return_matrix:
            return accum_cost, path
        else:
            return accum_cost[-1, min_idx], path


def dtw_dist(X1, X2=None, subseq=False):
    def _dtw(x, y):
        return dtw(x, y,
                   backtrack=False,
                   return_matrix=False,
                   subseq=subseq)
    if X2 is None:
        dists = squareform(pdist(X1, metric=_dtw))
    else:
        dists = cdist(X1, X2, metric=_dtw)
    return dists
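

# A minimal usage sketch (added for illustration, not part of the original
# gist): full and subsequence DTW between two short series, plus a pairwise
# distance matrix. This module has no relative imports, so it can be run
# directly.
if __name__ == '__main__':
    x = np.array([0., 1., 2., 3., 2., 1., 0.])
    y = np.array([0., 0., 1., 2., 3., 2., 1., 0., 0.])
    score, path = dtw(x, y, backtrack=True)
    print('dtw distance:', score)
    # backtracking returns the warping path from end to start
    print('warping path (x index, y index):')
    print(path[::-1])
    # subsequence DTW lets the short pattern match anywhere in y
    print('subsequence dtw distance:', dtw(np.array([1., 2., 3.]), y, subseq=True))
    X = np.stack([x, x + 0.5, x[::-1]])
    print('pairwise dtw distances:')
    print(dtw_dist(X))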


# lb module: DTW lower bounds (imported above as .lb)
import numpy as np
from scipy import ndimage as ndi


def envelope(X, bound_reach):
    # upper and lower envelope of X using a sliding min/max filter of
    # width bound_reach
    lb = ndi.minimum_filter1d(X, bound_reach, axis=0, mode='reflect')
    ub = ndi.maximum_filter1d(X, bound_reach, axis=0, mode='reflect')
    return lb, ub


def lb_keogh_from_bounds(Y, lb, ub):
    # LB_Keogh: sum of how far Y falls outside the envelope [lb, ub]
    scores = np.select(
        condlist=[Y < lb, Y > ub],
        choicelist=[lb - Y, Y - ub],
        default=0
    )
    return scores.sum()


def lb_keogh(X, Y, bound_reach):
    lb, ub = envelope(X, bound_reach)
    return lb_keogh_from_bounds(Y, lb, ub)


def lb_improved(X, Y, bound_reach):
    # LB_Improved (Lemire): LB_Keogh plus the symmetric bound computed
    # against Y projected onto the envelope of X
    lb, ub = envelope(X, bound_reach)
    lb_k = lb_keogh_from_bounds(Y, lb, ub)
    x_dash = np.clip(Y, lb, ub)
    xd_lb, xd_ub = envelope(x_dash, bound_reach)
    lb_i = lb_k + lb_keogh_from_bounds(X, xd_lb, xd_ub)
    return lb_i


def lb_keogh_cdist(envelope_windows, db):
    # pairwise LB_Keogh between a stack of envelopes, shape
    # (n_windows, 2, length), and a database of series, shape (n_entries, length)
    n_windows = envelope_windows.shape[0]
    n_entries = db.shape[0]
    results = np.empty(shape=(n_windows, n_entries))
    ii, jj = np.mgrid[:n_windows, :n_entries]
    for i, j in np.c_[ii.ravel(), jj.ravel()]:
        results[i, j] = lb_keogh_from_bounds(
            db[j], envelope_windows[i, 0], envelope_windows[i, 1])
    return results
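

# A minimal usage sketch (added for illustration, not part of the original
# gist): lower bounds between two random walks. Both are cheap under-estimates
# of the DTW distance; by construction LB_Improved is never smaller than
# LB_Keogh. This module can be run directly.
if __name__ == '__main__':
    rng = np.random.RandomState(1)
    x = np.cumsum(rng.randn(60))
    y = np.cumsum(rng.randn(60))
    print('LB_Keogh(x, y):   ', lb_keogh(x, y, bound_reach=5))
    print('LB_Improved(x, y):', lb_improved(x, y, bound_reach=5))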


# window module: sliding window views (imported above as .window)
import numpy as np


def window_ts(X, size, step_size=1, axis=None):
    # rolling windows of length `size` along `axis`, taken every `step_size`
    # samples, returned as a zero-copy strided view of X
    if axis is None:
        axis = X.ndim - 1
    n_windows = (X.shape[axis] - size) // step_size + 1
    shape = X.shape[:axis] + (n_windows, size, ) + X.shape[axis + 1:]
    new_strides = (X.strides[axis] * step_size, X.strides[axis])
    strides = (X.strides[:axis] + new_strides + X.strides[axis + 1:])
    return np.lib.stride_tricks.as_strided(X, shape=shape, strides=strides)
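

# A minimal usage sketch (added for illustration, not part of the original
# gist): sliding windows of length 4 with a step of 2 over a short series.
if __name__ == '__main__':
    x = np.arange(10)
    print(window_ts(x, size=4, step_size=2))
    # [[0 1 2 3]
    #  [2 3 4 5]
    #  [4 5 6 7]
    #  [6 7 8 9]]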