Last active
August 29, 2015 14:06
-
-
Save DavideCanton/f55f35912367146dd400 to your computer and use it in GitHub Desktop.
Segmentation of multivariate time series
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = 'davide' | |
import matplotlib.pyplot as plt | |
from collections import defaultdict | |
import numpy as np | |
from datetime import datetime, timedelta | |
import pandas as pd | |
from statsmodels.tsa.vector_ar.var_model import VAR | |
def compute_coeffs(series, p): | |
data = pd.DataFrame(series) | |
index = [datetime.now() + timedelta(days=i) for i in range(data.shape[0])] | |
data.index = pd.DatetimeIndex(index) | |
model = VAR(data) | |
res = model.fit(p) | |
return res.intercept, res.coefs | |
def compute_errors(tsa, p, Nmax, min_len=10): | |
# serie e' una matrice T x k, ogni riga e' un valore | |
tsa = tsa.T | |
k, T = tsa.shape | |
d = defaultdict(lambda: float("inf")) | |
for s in range(T - min_len + 1): | |
for t in range(s + min_len - 1, T): | |
try: | |
Z = tsa[:, s:t + 1] # t incluso | |
intercept, phi = compute_coeffs(Z.T, p) | |
if p: | |
phi = np.hstack(phi) | |
Y = np.zeros((k * p, t - s - p + 1)) | |
ind = s + p - 1 | |
Y[:k, :] = tsa[:, ind:t] | |
for i in range(2, p + 1): | |
ind -= 1 | |
l, h = (i - 1) * k, i * k | |
Y[l:h, :] = np.roll(Y[l - k:h - k, :], 1, axis=1) | |
Y[l:h, 0] = tsa[:, ind] | |
Z2 = intercept[:, np.newaxis] + np.dot(phi, Y) | |
d[s + 1, t + 1] = ((Z[:, p:] - Z2) ** 2).sum() | |
else: | |
Z2 = intercept[:, np.newaxis] | |
d[s + 1, t + 1] = ((Z - Z2) ** 2).sum() | |
except ValueError: | |
# print(s, "-", t) | |
pass | |
return d | |
def dp_seg(d, T, Nmax): | |
# initialization | |
c = {(0, 1): 0} | |
z = {} | |
for t in range(1, T + 1): | |
c[1, t] = d[1, t] | |
z[1, t] = 1 | |
# minimization | |
for N in range(2, Nmax + 1): | |
e = np.ones((T + 1, T + 1)) * np.inf | |
c[N, 0] = 0 | |
for s in range(1, T + 1): | |
for t in range(1, s + 1): | |
e[t - 1, s - 1] = c[N - 1, t] + d[t + 1, s] | |
ind_min = e[:s, s - 1].argmin() | |
z[N, s] = ind_min + 1 | |
c[N, s] = e[ind_min, s - 1] | |
t = {} | |
# backtracking | |
for N in range(1, Nmax + 1): | |
t[N, N] = T | |
for n in range(N, 0, -1): | |
t[N, n - 1] = z[n, t[N, n]] | |
return t, c | |
def gen_series(): | |
mean = np.array([0., 0.]) | |
p0 = np.array([0., 0.]) | |
p1 = np.array([[0.6, 0], [0.3, 0.6]]) | |
s1 = np.array([[1, 0.2], [0.2, 1]]) | |
v1 = np.array([0., 0.]) | |
res = [v1] | |
for _ in range(140): | |
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1) | |
res.append(v2) | |
v1 = v2 | |
p1 = np.array([[-0.6, 0], [-0.3, -0.6]]) | |
s1 = np.array([[1, -0.3], [-0.3, 1]]) | |
for _ in range(61): | |
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1) | |
res.append(v2) | |
v1 = v2 | |
return np.array(res[-100:]) | |
def gen_series2(): | |
mean = np.array([0., 0.]) | |
p0 = np.array([0., 0.]) | |
p1 = np.array([[-0.9, 0], [0.2, -0.9]]) | |
s1 = np.eye(2) | |
v1 = np.array([0., 0.]) | |
res = [v1] | |
for _ in range(160): | |
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1) | |
res.append(v2) | |
v1 = v2 | |
p1 = np.array([[0.6, 0], [0, 0.6]]) | |
for _ in range(81): | |
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1) | |
res.append(v2) | |
v1 = v2 | |
p1 = np.array([[-0.8, 0], [0.2, 0.8]]) | |
for _ in range(61): | |
v2 = p0 + np.dot(p1, v1) + np.random.multivariate_normal(mean, s1) | |
res.append(v2) | |
v1 = v2 | |
return np.array(res[-200:]) | |
def BIC(j_n, T, N, p, k): | |
return (np.log(j_n / (T - p - 1)) + np.log(T - p) / (T - p) * | |
N * k * (k * p + 1)) | |
def format_res(t, c, T, k, p): | |
seg = defaultdict(list) | |
for (N, n), v in t.items(): | |
seg[N].append(v) | |
costs = [(x, v) for (x, y), v in c.items() if y == T] | |
costs.sort() | |
res = {} | |
for (N, j_n) in costs: | |
if N > 1: | |
res[N] = sorted(seg[N]), BIC(j_n, T, N, p, k) | |
return res | |
def plot_res(k, N, p, v, seg): | |
title = "{}-segmentation (p={}), BIC = {}".format(N, p, v) | |
f, ax = plt.subplots(k, sharex=True) | |
plt.suptitle(title) | |
for i in range(k): | |
ax[i].plot(range(1, T + 1), serie[:, i]) | |
for s in seg: | |
for i in range(k): | |
ax[i].plot([s, s], [m[i], M[i]], "r-.", linewidth=2) | |
plt.show() | |
if __name__ == "__main__": | |
serie = gen_series() | |
T, k = serie.shape | |
Nmax = 3 | |
m = serie.min(axis=0) | |
M = serie.max(axis=0) | |
cur_min = float("inf") | |
best = None | |
best_seg = None | |
for p in range(3): | |
print("p =", p) | |
print("Computing errors...") | |
d = compute_errors(serie, p, Nmax) | |
print("Computing segmentation...") | |
t, c = dp_seg(d, T, Nmax) | |
res = format_res(t, c, T, k, p) | |
for N, (seg, v) in res.items(): | |
if v < cur_min: | |
cur_min = v | |
best = (N, p) | |
best_seg = seg | |
print("N =", N, "-", seg, v) | |
print("End!") | |
print("=" * 100) | |
print("Best segmentation, according to BIC, is:", best, "with value =", | |
cur_min) | |
print("Boundaries:", best_seg) | |
plot_res(k, best[0], best[1], cur_min, best_seg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment