Skip to content

Instantly share code, notes, and snippets.

@nkt1546789
Last active March 19, 2017 06:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nkt1546789/b24ecd9546479e13355a0377ffb6b286 to your computer and use it in GitHub Desktop.
Save nkt1546789/b24ecd9546479e13355a0377ffb6b286 to your computer and use it in GitHub Desktop.
A demo of Burst detection on artificial data.
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
def generate_samples(n, n_states, p, lam):
A = generate_transition_matrix(n_states, p)
x = []
y = []
state = 0
for rs in range(1, n+1):
state = stats.multinomial.rvs(1, A[state], size=1, random_state=rs).argmax()
x_i = stats.expon.rvs(0, 1.0 / lam[state], random_state=rs)
x.append(x_i)
y.append(state)
x = np.array(x)
y = np.array(y)
return x, y
def generate_transition_matrix(n_states, p):
A = np.diag([1-p] + [1-2*p for _ in range(n_states-2)] + [1-p])
tmp1 = np.arange(1, n_states)
tmp2 = np.arange(0, n_states-1)
idx1 = np.concatenate([tmp1, tmp2])
idx2 = np.concatenate([tmp2, tmp1])
A[idx1, idx2] = p
return A
def estimate_labels(x, classes=None, transition_proba=0.001, n_states=2, max_iter=100, stopping_criterion=1e-4):
lam = estimate(x, n_states=n_states, max_iter=max_iter, stopping_criterion=stopping_criterion)
print(lam)
if classes is None:
tmp = np.arange(n_states)
classes = tmp[lam.argsort()]
print(classes)
P = np.log(lam) - lam * np.c_[x]
A = np.ones((n_states, n_states)) * transition_proba
A[np.arange(n_states), np.arange(n_states)] = 1.0 - transition_proba * (n_states-1)
A = np.log(A)
print(A)
y = viterbi(P, A)
return classes[y]
def estimate(x, n_states=2, max_iter=100, stopping_criterion=1e-4):
r = np.random.RandomState(1)
lam = np.abs(r.normal(n_states, size=n_states))
r = np.random.RandomState(2)
alpha = np.abs(r.normal(n_states, size=n_states))
for iteration in range(max_iter):
lam_prev = lam.copy()
alpha_prev = alpha.copy()
tmp = alpha * lam * np.exp(- np.c_[x] * lam)
Q = tmp / np.c_[tmp.sum(axis=1)]
lam = Q.sum(axis=0) / np.sum(Q * np.c_[x], axis=0)
alpha = Q.sum(axis=0) / Q.sum()
if np.sum((alpha - alpha_prev)**2) < stopping_criterion and np.sum((lam - lam_prev)**2) < stopping_criterion:
print("iteration:", iteration)
break
return lam
def viterbi(P, A):
"""
P: log probability matrix (n_samples by n_states)
A: log transition probability matrix (n_states by n_states)
"""
n_samples = P.shape[0]
n_states = P.shape[1]
states = np.arange(n_states)
V = np.zeros((n_samples, n_states))
S = np.zeros((n_samples, n_states), dtype=int)
V[0] = P[0]
for t in range(1, n_samples):
values = V[t-1] + A
S[t-1] = np.argmax(values, axis=1)
V[t] = P[t] + np.max(values, axis=1)
y = np.zeros(n_samples, dtype=int)
y[-1] = V[-1].argmax()
for t in range(2, n_samples+1):
y[-t] = S[-t, y[-t+1]]
return y
def plot_data(x, y, window=300.0, title=""):
cumulative = 0
bins_states = []
bins = []
n_states = len(np.unique(y))
state_count = np.zeros(n_states, dtype=int)
count = 0
for s_t, x_t in zip(y, x):
if cumulative >= window:
bins_states.append(state_count.argmax())
state_count = np.zeros(n_states, dtype=int)
bins.append(count)
count = 0
cumulative = 0
count += 1
cumulative += x_t
state_count[s_t] += 1
bins = np.array(bins)
z = bins / bins.max()
bins_states = np.array(bins_states)
z_bins = bins_states / bins_states.max()
print(z_bins)
plt.plot(np.arange(len(z)), z, "b-")
plt.plot(np.arange(len(z_bins)), z_bins, "r-")
plt.title(title)
plt.ylim(0, 1.1)
if __name__ == '__main__':
n = 100000
p = 0.0001
n_states = 3
lam = np.array([5, 50, 100])
x, y = generate_samples(n, n_states, p, lam)
ypred = estimate_labels(x, n_states=2, transition_proba=p)
plt.figure()
plt.subplot(2, 1, 1)
plot_data(x, y, title="true")
plt.subplot(2, 1, 2)
plot_data(x, ypred, title="estimated")
plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment