Created August 17, 2021 16:53
Simple and extremely fast implementation of rescaled Forward Backward algorithm for Hidden Markov Chains with Independent Gaussian Noise in Python calling C using ctypes and creating a shared library. To compile just type 'make all' at the root of the directory
import numpy as np
from scipy.stats import norm
from ctypes import CDLL, c_int, c_double, c_char, c_void_p
def generate_observations(H, means, stds):
X = ((H == 0) * (means[0] + np.random.randn(*H.shape) * stds[0]) +
(H == 1) * (means[1] + np.random.randn(*H.shape) * stds[1]))
return X
def generate_hidden_states(T, A, p0):
H = []
H.append(np.random.choice(2, 1, p=p0)[0])
for t in range(1, T):
u = np.random.rand()
p = A[H[t - 1]]
H.append(np.nonzero(u < np.cumsum(p))[0][0])
H = np.array(H)
return H
def forward_C_wrapper(T, X_pdf, A):
libC_f = CDLL('./')
nb_states = 2
def forward_C(T, X_pdf, A, libC_f, nb_states):
libC_f._hmcin_forward.argtypes = [
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
libC_f._hmcin_forward.restype = c_void_p
alpha = np.zeros((T, nb_states)).astype(np.float64)
libC_f._hmcin_forward(alpha, T, nb_states, X_pdf, A),
alpha = np.ctypeslib.as_array(alpha)
return alpha
return forward_C(T, X_pdf, A, libC_f, nb_states)
def backward_C_wrapper(T, X_pdf, A):
libC_f = CDLL('./')
nb_states = 2
def backward_C(T, X_pdf, A, libC_f, nb_states):
libC_f._hmcin_backward.argtypes = [
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
libC_f._hmcin_backward.restype = c_void_p
beta = np.zeros((T, nb_states)).astype(np.float64)
libC_f._hmcin_backward(beta, T, nb_states, X_pdf, A),
beta = np.ctypeslib.as_array(beta)
return beta
return backward_C(T, X_pdf, A, libC_f, nb_states)
def get_post_marginals_probas(alpha, beta):
post_marginals_probas = alpha * beta
post_marginals_probas /= np.sum(post_marginals_probas, axis=1, keepdims=True)
return post_marginals_probas
if __name__ == "__main__":
p0 = np.array([0.5, 0.5])
A = np.array([[0.9, 0.1], [0.1, 0.9]])
T = 500
means = np.array([0., 1.])
stds = np.array([0.5, 0.5])
H = generate_hidden_states(T, A, p0)
X = generate_observations(H, means, stds)
X_pdf = np.stack([norm.pdf(X, means[0], stds[0]),
norm.pdf(X, means[1], stds[1])], axis=0)
alpha = forward_C_wrapper(T, X_pdf, A)
beta = backward_C_wrapper(T, X_pdf, A)
post_marginals_probas = get_post_marginals_probas(alpha, beta)
# Marginal MAP criterion
mpm_seg = np.argmax(post_marginals_probas, axis=1)
error_rate = np.count_nonzero(mpm_seg != H) / len(H)
print("Error rate", error_rate)
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <float.h>
#include "hmcin_backward.h"
void _hmcin_backward(double* beta, int T, int nb_states, double* X_pdf,
double* A){
beta will store the result
memset(beta, 0, sizeof(double) * T * nb_states);
/* State 0 */
for (int h = 0; h < nb_states; h++) {
beta[(T - 1) * nb_states + h] = 1;
/* Other states */
double sum;
for (int t = T - 2; t >= 0; t--) {
sum = 0;
for (int h_1 = 0; h_1 < nb_states; h_1++) {
for (int h = 0; h < nb_states; h++) {
beta[t * nb_states + h_1] += A[h_1 * nb_states + h] *
beta[(t + 1) * nb_states + h] * X_pdf[h * T + (t + 1)];
sum += beta[t * nb_states + h_1];
for (int h = 0; h < nb_states; h++) {
beta[t * nb_states + h] /= sum;
void _hmcin_backward(double*, int, int, double*, double*);
#endif /* !HMCIN_BACKWARD_H_ */
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <float.h>
#include "hmcin_forward.h"
void _hmcin_forward(double* alpha, int T, int nb_states, double* X_pdf,
double* A){
alpha will store the result
memset(alpha, 0, sizeof(double) * T * nb_states);
/* State 0 */
for (int h = 0; h < nb_states; h++) {
/* Equiprobable initial probability */
alpha[0 * nb_states + h] = 0.5 * X_pdf[h * T + 0];
/* Other states */
double sum;
for (int t = 1; t < T; t++) {
sum = 0;
for (int h = 0; h < nb_states; h++) {
for (int h_1 = 0; h_1 < nb_states; h_1++) {
alpha[t * nb_states + h] += A[h_1 * nb_states + h] *
alpha[(t - 1) * nb_states + h_1];
alpha[t * nb_states + h] *= X_pdf[h * T + t];
sum += alpha[t * nb_states + h];
for (int h = 0; h < nb_states; h++) {
alpha[t * nb_states + h] /= sum;
void _hmcin_forward(double*, int, int, double*, double*);
#endif /* !HMCIN_FORWARD_H_ */
#This makefile is here to deal with the shared librairy in C
#Algorithms made for efficiency
RM = rm -f
SRC = $(SRC_DIR)/hmcin_forward.c \
$(SRC_DIR)/hmcin_backward.c \
OBJ_HMCIN = hmcin_forward.o \
all: lib clean
gcc -fpic -c -std=c99 $(SRC)
gcc -lm -shared -o $(OUTPUT_HMCIN) $(OBJ_HMCIN)
fclean: clean
re: fclean all
