Skip to content

Instantly share code, notes, and snippets.

Created March 14, 2016 23:08
Show Gist options
  • Save anonymous/38e0bd9d41f95b5c3b43 to your computer and use it in GitHub Desktop.
Save anonymous/38e0bd9d41f95b5c3b43 to your computer and use it in GitHub Desktop.
# cython: boundscheck=False
# cython: cdivision=True
# cython: nonecheck=False
# cython: wraparound=False
from __future__ import division
cdef extern from "float.h":
double DBL_MAX
cdef extern from "math.h":
double sqrt(double)
import numpy as np
cdef int type1 = 1
cdef int type2 = 2
cdef int type3 = 3
cdef inline double euclidean(double x, double y):
return sqrt((x - y) * (x - y))
cdef inline double min3(double[3] v):
cdef int i, m = 0
for i in range(1, 3):
if v[i] < v[m]:
m = i
return v[m]
def dtw(double[:] x, double[:] y):
"""dtw(x, y)
Dynamic time warping (DTW) is an algorithm for measuring similarity
between two sequences which may vary in time or speed
For instance, similarities in walking patterns would be detected,
even if in one video the person was walking slowly and if in another he or
she were walking more quickly, or even if there were accelerations
and decelerations during the course of one observation
Parameters
----------
x : (M,) ndarray, float64
First input sequence.
y : (N,) ndarray, float64
Second input sequence.
Returns
-------
path : list
Coordinates of least cost path found in the distance matrix between x
and y.
costs : (M, N) ndarray
Cumulative cost matrix.
min_cost : float
Total cost along least cost path.
References
----------
.. [1] http://mirlab.org/jang/books/dcpr/dpDtw.asp?title=8-4%20Dynamic%20Time%20Warping
.. [2] http://en.wikipedia.org/wiki/Dynamic_time_warping
"""
cdef:
int m = len(x)
int n = len(y)
double[:, ::1] distance
Py_ssize_t i, j, min_i, min_j
double[3] costs
if len(x) > 2 * len(y) or len(y) > 2 * len(x):
raise ValueError("Sequence lengths cannot differ by more than 50%.")
distance = np.zeros((m + 1, n + 1)) + DBL_MAX
distance[0, 0] = 0
# Step forward
for i in range(1, m + 1):
for j in range(1, n + 1):
costs[0] = distance[i - 1, j - 1]
costs[1] = distance[i - 1, j]
costs[2] = distance[i, j - 1]
distance[i, j] = euclidean(x[i - 1], y[j - 1]) + min3(costs)
# Trace back
cdef list path = []
i = m
j = n
while not ((i == 1) and (j == 1)):
path.append((i - 1, j - 1))
min_i, min_j = i - 1, j - 1
if distance[i, j - 1] < distance[min_i, min_j]:
min_i, min_j = i, j - 1
if distance[i - 1, j] < distance[min_i, min_j]:
min_i, min_j = i - 1, j
i, j = min_i, min_j
path.append((0, 0))
r0, c0 = path[0]
return path[::-1], np.asarray(distance[1:, 1:]), distance[r0, c0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment