Skip to content

Instantly share code, notes, and snippets.

@giuseppebonaccorso
Created August 28, 2017 09:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save giuseppebonaccorso/8102b09ab4de87d6c540d5fe058ad3cf to your computer and use it in GitHub Desktop.
Save giuseppebonaccorso/8102b09ab4de87d6c540d5fe058ad3cf to your computer and use it in GitHub Desktop.
Fixed-delay smoothing in HMM
import numpy as np
from Queue import Queue
class HMM:
def __init__(self, transition_matrix, observation_matrix, delay, initial_state):
self.TM = transition_matrix
self.OM = observation_matrix
self.delay = delay
self.initial_state = np.array(initial_state)
self.events = Queue()
self.t = 0
self.B = np.ndarray(self.TM.shape)
self.forward = np.dot(self.initial_state, self.TM)
self.backward = np.dot(self.initial_state, self.TM)
def predict_state(self, event):
self.events.put_nowait(event)
# Compute diagonal matrix with current event
Ot = self.compute_O_matrix(event)
# Perform fixed-delay smoothing
if self.t > self.delay:
self.forward = self.compute_forward(Ot)
etp = self.events.get_nowait()
Otp = self.compute_O_matrix(etp)
self.B = reduce(np.dot, [np.linalg.inv(Otp), np.linalg.inv(self.TM), self.B, self.TM, Ot])
else:
self.B = reduce(np.dot, [self.B, self.TM, Ot])
self.t += 1
if self.t > self.delay:
return self.normalize(np.dot(self.forward, self.B))
else:
return None
def reset(self):
self.events = Queue()
self.t = 0
self.B = np.ndarray(self.TM.shape)
self.forward = np.dot(self.initial_state, self.TM)
self.backward = np.dot(self.initial_state, self.TM)
def compute_O_matrix(self, event):
return np.diagflat(self.OM[event])
def compute_forward(self, Ot):
return reduce(np.dot, [Ot, self.TM.T, self.forward.T])
def compute_backward(self, Ot):
return reduce(np.dot, [self.TM, Ot, self.backward.T])
@staticmethod
def normalize(x):
return x / np.sum(x)
def log_example(steps=50):
print('Log example')
# Transition probability matrix
# T(i,j) = P(Xt=j | Xt-1=i)
T = np.array([[0.6, 0.3, 0.1],
[0.4, 0.4, 0.2],
[0.1, 0.4, 0.5]])
T_labels = ('Ok', 'Some issues', 'Out of order')
# Observation probability matrix
# O(i,j) = P(event | Xt=i)
O = np.array([[0.8, 0.15, 0.05],
[0.15, 0.7, 0.1],
[0.05, 0.2, 0.75]])
O_labels = ('Green', 'Yellow', 'Red')
hmm = HMM(transition_matrix=T, observation_matrix=O, delay=1, initial_state=[0.5, 0.5, 0.5])
# Prediction
for i in range(1, steps):
observation = np.random.randint(0, len(O_labels))
prediction = hmm.predict_state(observation)
max_value = int(np.argmax(prediction))
if prediction is not None:
print(
'O%d: %s -> %s (%1.3f%%)' % (i, O_labels[observation], T_labels[max_value], prediction[max_value] * 100))
if __name__ == '__main__':
print('HMM simulation')
#rain_example()
log_example()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment