Skip to content

Instantly share code, notes, and snippets.

@HGangloff
Created August 17, 2021 16:53
Show Gist options
  • Save HGangloff/0f6c82c48b460b7430a3dadca491b2dd to your computer and use it in GitHub Desktop.
Save HGangloff/0f6c82c48b460b7430a3dadca491b2dd to your computer and use it in GitHub Desktop.
Simple and extremely fast implementation of rescaled Forward Backward algorithm for Hidden Markov Chains with Independent Gaussian Noise in Python calling C using ctypes and creating a shared library. To compile just type 'make all' at the root of the directory
import numpy as np
from scipy.stats import norm
from ctypes import CDLL, c_int, c_double, c_char, c_void_p
def generate_observations(H, means, stds):
X = ((H == 0) * (means[0] + np.random.randn(*H.shape) * stds[0]) +
(H == 1) * (means[1] + np.random.randn(*H.shape) * stds[1]))
return X
def generate_hidden_states(T, A, p0):
H = []
H.append(np.random.choice(2, 1, p=p0)[0])
for t in range(1, T):
u = np.random.rand()
p = A[H[t - 1]]
H.append(np.nonzero(u < np.cumsum(p))[0][0])
H = np.array(H)
return H
def forward_C_wrapper(T, X_pdf, A):
libC_f = CDLL('./hmcin_functions.so')
nb_states = 2
def forward_C(T, X_pdf, A, libC_f, nb_states):
libC_f._hmcin_forward.argtypes = [
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS'),
c_int,
c_int,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS'),
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS')]
libC_f._hmcin_forward.restype = c_void_p
alpha = np.zeros((T, nb_states)).astype(np.float64)
libC_f._hmcin_forward(alpha, T, nb_states, X_pdf, A),
alpha = np.ctypeslib.as_array(alpha)
return alpha
return forward_C(T, X_pdf, A, libC_f, nb_states)
def backward_C_wrapper(T, X_pdf, A):
libC_f = CDLL('./hmcin_functions.so')
nb_states = 2
def backward_C(T, X_pdf, A, libC_f, nb_states):
libC_f._hmcin_backward.argtypes = [
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS'),
c_int,
c_int,
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS'),
np.ctypeslib.ndpointer(dtype=np.float64, ndim=2,
flags='C_CONTIGUOUS')]
libC_f._hmcin_backward.restype = c_void_p
beta = np.zeros((T, nb_states)).astype(np.float64)
libC_f._hmcin_backward(beta, T, nb_states, X_pdf, A),
beta = np.ctypeslib.as_array(beta)
return beta
return backward_C(T, X_pdf, A, libC_f, nb_states)
def get_post_marginals_probas(alpha, beta):
post_marginals_probas = alpha * beta
post_marginals_probas /= np.sum(post_marginals_probas, axis=1, keepdims=True)
return post_marginals_probas
if __name__ == "__main__":
p0 = np.array([0.5, 0.5])
A = np.array([[0.9, 0.1], [0.1, 0.9]])
T = 500
means = np.array([0., 1.])
stds = np.array([0.5, 0.5])
H = generate_hidden_states(T, A, p0)
X = generate_observations(H, means, stds)
X_pdf = np.stack([norm.pdf(X, means[0], stds[0]),
norm.pdf(X, means[1], stds[1])], axis=0)
alpha = forward_C_wrapper(T, X_pdf, A)
beta = backward_C_wrapper(T, X_pdf, A)
post_marginals_probas = get_post_marginals_probas(alpha, beta)
# Marginal MAP criterion
mpm_seg = np.argmax(post_marginals_probas, axis=1)
error_rate = np.count_nonzero(mpm_seg != H) / len(H)
print("Error rate", error_rate)
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <float.h>
#include "hmcin_backward.h"
void _hmcin_backward(double* beta, int T, int nb_states, double* X_pdf,
double* A){
/*
beta will store the result
*/
memset(beta, 0, sizeof(double) * T * nb_states);
/* State 0 */
for (int h = 0; h < nb_states; h++) {
beta[(T - 1) * nb_states + h] = 1;
}
/* Other states */
double sum;
for (int t = T - 2; t >= 0; t--) {
sum = 0;
for (int h_1 = 0; h_1 < nb_states; h_1++) {
for (int h = 0; h < nb_states; h++) {
beta[t * nb_states + h_1] += A[h_1 * nb_states + h] *
beta[(t + 1) * nb_states + h] * X_pdf[h * T + (t + 1)];
}
sum += beta[t * nb_states + h_1];
}
for (int h = 0; h < nb_states; h++) {
beta[t * nb_states + h] /= sum;
}
}
}
#ifndef HMCIN_BACKWARD_H_
# define HMCIN_BACKWARD_H_
void _hmcin_backward(double*, int, int, double*, double*);
#endif /* !HMCIN_BACKWARD_H_ */
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <float.h>
#include "hmcin_forward.h"
void _hmcin_forward(double* alpha, int T, int nb_states, double* X_pdf,
double* A){
/*
alpha will store the result
*/
memset(alpha, 0, sizeof(double) * T * nb_states);
/* State 0 */
for (int h = 0; h < nb_states; h++) {
/* Equiprobable initial probability */
alpha[0 * nb_states + h] = 0.5 * X_pdf[h * T + 0];
}
/* Other states */
double sum;
for (int t = 1; t < T; t++) {
sum = 0;
for (int h = 0; h < nb_states; h++) {
for (int h_1 = 0; h_1 < nb_states; h_1++) {
alpha[t * nb_states + h] += A[h_1 * nb_states + h] *
alpha[(t - 1) * nb_states + h_1];
}
alpha[t * nb_states + h] *= X_pdf[h * T + t];
sum += alpha[t * nb_states + h];
}
for (int h = 0; h < nb_states; h++) {
alpha[t * nb_states + h] /= sum;
}
}
}
#ifndef HMCIN_FORWARD_H_
# define HMCIN_FORWARD_H_
void _hmcin_forward(double*, int, int, double*, double*);
#endif /* !HMCIN_FORWARD_H_ */
#This makefile is here to deal with the shared librairy in C
#Algorithms made for efficiency
RM = rm -f
DEST_DIR = .
SRC_DIR = .
SRC = $(SRC_DIR)/hmcin_forward.c \
$(SRC_DIR)/hmcin_backward.c \
OBJ_HMCIN = hmcin_forward.o \
hmcin_backward.o
OUTPUT_HMCIN = $(DEST_DIR)/hmcin_functions.so
all: lib clean
lib:
gcc -fpic -c -std=c99 $(SRC)
gcc -lm -shared -o $(OUTPUT_HMCIN) $(OBJ_HMCIN)
clean:
$(RM) $(OBJ_DHMCCN) $(OBJ_DHMCIN2) $(OBJ_DPMCCN1) $(OBJ_DPMCCN2)
fclean: clean
$(RM) $(OUTPUT_DHMCCN) $(OUTPUT_DHMCIN2) $(OUTPUT_DPMCCN1) $(OUTPUT_DPMCCN2)
re: fclean all
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment