Created
March 14, 2016 23:08
-
-
Save anonymous/38e0bd9d41f95b5c3b43 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# cython: boundscheck=False | |
# cython: cdivision=True | |
# cython: nonecheck=False | |
# cython: wraparound=False | |
from __future__ import division | |
cdef extern from "float.h": | |
double DBL_MAX | |
cdef extern from "math.h": | |
double sqrt(double) | |
import numpy as np | |
cdef int type1 = 1 | |
cdef int type2 = 2 | |
cdef int type3 = 3 | |
cdef inline double euclidean(double x, double y): | |
return sqrt((x - y) * (x - y)) | |
cdef inline double min3(double[3] v): | |
cdef int i, m = 0 | |
for i in range(1, 3): | |
if v[i] < v[m]: | |
m = i | |
return v[m] | |
def dtw(double[:] x, double[:] y): | |
"""dtw(x, y) | |
Dynamic time warping (DTW) is an algorithm for measuring similarity | |
between two sequences which may vary in time or speed | |
For instance, similarities in walking patterns would be detected, | |
even if in one video the person was walking slowly and if in another he or | |
she were walking more quickly, or even if there were accelerations | |
and decelerations during the course of one observation | |
Parameters | |
---------- | |
x : (M,) ndarray, float64 | |
First input sequence. | |
y : (N,) ndarray, float64 | |
Second input sequence. | |
Returns | |
------- | |
path : list | |
Coordinates of least cost path found in the distance matrix between x | |
and y. | |
costs : (M, N) ndarray | |
Cumulative cost matrix. | |
min_cost : float | |
Total cost along least cost path. | |
References | |
---------- | |
.. [1] http://mirlab.org/jang/books/dcpr/dpDtw.asp?title=8-4%20Dynamic%20Time%20Warping | |
.. [2] http://en.wikipedia.org/wiki/Dynamic_time_warping | |
""" | |
cdef: | |
int m = len(x) | |
int n = len(y) | |
double[:, ::1] distance | |
Py_ssize_t i, j, min_i, min_j | |
double[3] costs | |
if len(x) > 2 * len(y) or len(y) > 2 * len(x): | |
raise ValueError("Sequence lengths cannot differ by more than 50%.") | |
distance = np.zeros((m + 1, n + 1)) + DBL_MAX | |
distance[0, 0] = 0 | |
# Step forward | |
for i in range(1, m + 1): | |
for j in range(1, n + 1): | |
costs[0] = distance[i - 1, j - 1] | |
costs[1] = distance[i - 1, j] | |
costs[2] = distance[i, j - 1] | |
distance[i, j] = euclidean(x[i - 1], y[j - 1]) + min3(costs) | |
# Trace back | |
cdef list path = [] | |
i = m | |
j = n | |
while not ((i == 1) and (j == 1)): | |
path.append((i - 1, j - 1)) | |
min_i, min_j = i - 1, j - 1 | |
if distance[i, j - 1] < distance[min_i, min_j]: | |
min_i, min_j = i, j - 1 | |
if distance[i - 1, j] < distance[min_i, min_j]: | |
min_i, min_j = i - 1, j | |
i, j = min_i, min_j | |
path.append((0, 0)) | |
r0, c0 = path[0] | |
return path[::-1], np.asarray(distance[1:, 1:]), distance[r0, c0] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment