Skip to content

Instantly share code, notes, and snippets.

@DavideCanton
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DavideCanton/f55f35912367146dd400 to your computer and use it in GitHub Desktop.
Save DavideCanton/f55f35912367146dd400 to your computer and use it in GitHub Desktop.
Segmentation of multivariate time series
__author__ = 'davide'
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np
from datetime import datetime, timedelta
import pandas as pd
from statsmodels.tsa.vector_ar.var_model import VAR
def compute_coeffs(series, p):
data = pd.DataFrame(series)
index = [datetime.now() + timedelta(days=i) for i in range(data.shape[0])]
data.index = pd.DatetimeIndex(index)
model = VAR(data)
res = model.fit(p)
return res.intercept, res.coefs
def compute_errors(tsa, p, Nmax, min_len=10):
# serie e' una matrice T x k, ogni riga e' un valore
tsa = tsa.T
k, T = tsa.shape
d = defaultdict(lambda: float("inf"))
for s in range(T - min_len + 1):
for t in range(s + min_len - 1, T):
try:
Z = tsa[:, s:t + 1] # t incluso
intercept, phi = compute_coeffs(Z.T, p)
if p:
phi = np.hstack(phi)
Y = np.zeros((k * p, t - s - p + 1))
ind = s + p - 1
Y[:k, :] = tsa[:, ind:t]
for i in range(2, p + 1):
ind -= 1
l, h = (i - 1) * k, i * k
Y[l:h, :] = np.roll(Y[l - k:h - k, :], 1, axis=1)
Y[l:h, 0] = tsa[:, ind]
Z2 = intercept[:, np.newaxis] + np.dot(phi, Y)
d[s + 1, t + 1] = ((Z[:, p:] - Z2) ** 2).sum()
else:
Z2 = intercept[:, np.newaxis]
d[s + 1, t + 1] = ((Z - Z2) ** 2).sum()
except ValueError:
# print(s, "-", t)
pass
return d
def dp_seg(d, T, Nmax):
# initialization
c = {(0, 1): 0}
z = {}
for t in range(1, T + 1):
c[1, t] = d[1, t]
z[1, t] = 1
# minimization
for N in range(2, Nmax + 1):
e = np.ones((T + 1, T + 1)) * np.inf
c[N, 0] = 0
for s in range(1, T + 1):
for t in range(1, s + 1):
e[t - 1, s - 1] = c[N - 1, t] + d[t + 1, s]
ind_min = e[:s, s - 1].argmin()
z[N, s] = ind_min + 1
c[N, s] = e[ind_min, s - 1]
t = {}
# backtracking
for N in range(1, Nmax + 1):
t[N, N] = T
for n in range(N, 0, -1):
t[N, n - 1] = z[n, t[N, n]]
return t, c
def gen_series():
mean = np.array([0., 0.])
p0 = np.array([0., 0.])
p1 = np.array([[0.6, 0], [0.3, 0.6]])
s1 = np.array([[1, 0.2], [0.2, 1]])
v1 = np.array([0., 0.])
res = [v1]
for _ in range(140):
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1)
res.append(v2)
v1 = v2
p1 = np.array([[-0.6, 0], [-0.3, -0.6]])
s1 = np.array([[1, -0.3], [-0.3, 1]])
for _ in range(61):
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1)
res.append(v2)
v1 = v2
return np.array(res[-100:])
def gen_series2():
mean = np.array([0., 0.])
p0 = np.array([0., 0.])
p1 = np.array([[-0.9, 0], [0.2, -0.9]])
s1 = np.eye(2)
v1 = np.array([0., 0.])
res = [v1]
for _ in range(160):
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1)
res.append(v2)
v1 = v2
p1 = np.array([[0.6, 0], [0, 0.6]])
for _ in range(81):
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1)
res.append(v2)
v1 = v2
p1 = np.array([[-0.8, 0], [0.2, 0.8]])
for _ in range(61):
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1)
res.append(v2)
v1 = v2
return np.array(res[-200:])
def BIC(j_n, T, N, p, k):
return (np.log(j_n / (T - p - 1)) + np.log(T - p) / (T - p) *
N * k * (k * p + 1))
def format_res(t, c, T, k, p):
seg = defaultdict(list)
for (N, n), v in t.items():
seg[N].append(v)
costs = [(x, v) for (x, y), v in c.items() if y == T]
costs.sort()
res = {}
for (N, j_n) in costs:
if N > 1:
res[N] = sorted(seg[N]), BIC(j_n, T, N, p, k)
return res
def plot_res(k, N, p, v, seg):
title = "{}-segmentation (p={}), BIC = {}".format(N, p, v)
f, ax = plt.subplots(k, sharex=True)
plt.suptitle(title)
for i in range(k):
ax[i].plot(range(1, T + 1), serie[:, i])
for s in seg:
for i in range(k):
ax[i].plot([s, s], [m[i], M[i]], "r-.", linewidth=2)
plt.show()
if __name__ == "__main__":
serie = gen_series()
T, k = serie.shape
Nmax = 3
m = serie.min(axis=0)
M = serie.max(axis=0)
cur_min = float("inf")
best = None
best_seg = None
for p in range(3):
print("p =", p)
print("Computing errors...")
d = compute_errors(serie, p, Nmax)
print("Computing segmentation...")
t, c = dp_seg(d, T, Nmax)
res = format_res(t, c, T, k, p)
for N, (seg, v) in res.items():
if v < cur_min:
cur_min = v
best = (N, p)
best_seg = seg
print("N =", N, "-", seg, v)
print("End!")
print("=" * 100)
print("Best segmentation, according to BIC, is:", best, "with value =",
cur_min)
print("Boundaries:", best_seg)
plot_res(k, best[0], best[1], cur_min, best_seg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment