Skip to content

Instantly share code, notes, and snippets.

@AngelicosPhosphoros
Last active March 12, 2019 20:45
Show Gist options
  • Save AngelicosPhosphoros/1de5fbe53e2048a2be9a1f4689dcfcdb to your computer and use it in GitHub Desktop.
Save AngelicosPhosphoros/1de5fbe53e2048a2be9a1f4689dcfcdb to your computer and use it in GitHub Desktop.
Homework about Hidden Markov Models
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Homework Program HMM Timur Khuzin"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Function that read parameters."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"All lines started with `#` are ignored\n",
"\n",
"First line of file: M,K,L tab separated<br>\n",
"Than M lines of M numbers (MxM transition matrix)<br>\n",
"Row index is old state, column index is next state<br>\n",
"Than M lines of K numbers (MxK emisstion matrix)<br>\n",
"Each row of emission matrix is probabilities of K emissions.<br>\n",
"Than M numbers in line (Vector of M — initial distribution)<br>"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"class HMM_Params:\n",
" def __init__(self, state_num, emission_num, to_observe):\n",
" self.state_num = state_num\n",
" self.emission_num = emission_num\n",
" self.observation_length = to_observe\n",
" self.transition_matrix = np.zeros((state_num, state_num), dtype='float64')\n",
" self.emission_matrix = np.zeros((state_num, emission_num), dtype='float64')\n",
" self.beginning_distribution = np.zeros((state_num), dtype='float64')\n",
"\n",
"def read_params(filename):\n",
" def read_non_comment(f):\n",
" s = f.readline()\n",
" while(s[0]=='#'):\n",
" s = f.readline()\n",
" return s\n",
" with open(filename, 'r') as f:\n",
" M, K, L = map(int, read_non_comment(f).strip().split())\n",
" params = HMM_Params(M, K, L)\n",
" \n",
" for i in range(M):\n",
" params.transition_matrix[i,:] = [float(x) for x in read_non_comment(f).strip().split()]\n",
" params.transition_matrix[i,:] /= np.sum(params.transition_matrix[i,:])\n",
" \n",
" for i in range(M):\n",
" params.emission_matrix[i,:] = [float(x) for x in read_non_comment(f).strip().split()]\n",
" params.emission_matrix[i,:]/= np.sum(params.emission_matrix[i,:])\n",
" \n",
" params.beginning_distribution[:] = np.array([float(x) for x in read_non_comment(f).strip().split()])\n",
" params.beginning_distribution[:] /= np.sum(params.beginning_distribution[:])\n",
" \n",
" return params"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### HMM generator"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from numpy.random import choice as npchoice\n",
"\n",
"# params is HMM_Params\n",
"# returns sequence of tuples: (pi, x)\n",
"def generate_random_HMM(params):\n",
" current_pos = npchoice(params.state_num,1, p=params.beginning_distribution)[0]\n",
" current_emission = npchoice(params.emission_num, 1, p=params.emission_matrix[current_pos,:])[0]\n",
" result = []\n",
" while len(result)<params.observation_length:\n",
" result.append((current_pos, current_emission))\n",
" current_pos = npchoice(params.state_num, 1, p=params.transition_matrix[current_pos,:])[0]\n",
" current_emission = npchoice(params.emission_num, 1, p=params.emission_matrix[current_pos,:])[0]\n",
" return result\n",
"\n",
"def beautify_HMM_sequence(hmm_seq):\n",
" def beautify_pair(p):\n",
" return 'p%d'%p[0], 'e%d'%p[1]\n",
" return [beautify_pair(x) for x in hmm_seq]\n",
"\n",
"def print_HMM_to_file(hmm, filename):\n",
" hmm = beautify_HMM_sequence(hmm)\n",
" with open(filename, 'w') as f:\n",
" f.write('\\n'.join(map(lambda p:'%s\\t%s'%p, hmm)))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"params = read_params('test.txt')\n",
"hmm = generate_random_HMM(params)\n",
"print_HMM_to_file(hmm, 'generated_path.txt')\n",
"#beautify_HMM_sequence(hmm)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# input is HMM_Params and observation list (with ints inside)\n",
"# output is hidden states in integer list\n",
"def viterbi_solver(params, observations):\n",
" # matrix LxM where stored previous states for each state in row\n",
" parents = np.zeros((params.observation_length, params.state_num), dtype='float64')\n",
" # current probabilities\n",
" current_iter = np.log(params.beginning_distribution) + np.log(params.emission_matrix[:,observations[0]])\n",
" # probabilities of next step\n",
" next_generation = np.zeros(params.state_num,dtype='float64')\n",
" for i in range(1, params.observation_length):\n",
" for state in range(params.state_num):\n",
" probs = current_iter + np.log(params.transition_matrix[:,state])\n",
" prev = probs.argmax()\n",
" next_generation[state] = probs[prev] + np.log(params.emission_matrix[state, observations[i]])\n",
" parents[i,state] = prev\n",
" # avoid garbage collection\n",
" current_iter, next_generation = next_generation, current_iter\n",
" \n",
" path = np.ones(params.observation_length, dtype='int64')\n",
" path[-1] = current_iter.argmax()\n",
" for i in range(params.observation_length - 2, -1, -1):\n",
" p = parents[i+1, path[i+1]]\n",
" path[i] = p\n",
" return path"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Accuracy: 0.755000\n"
]
}
],
"source": [
"# checking viterbi\n",
"params = read_params('test.txt')\n",
"data = [x.strip() for x in open('generated_path.txt', 'r')]\n",
"observations = [int(x.split()[1][1:]) for x in data]\n",
"actual_states = [int(x.split()[0][1:]) for x in data]\n",
"most_probable = viterbi_solver(params, observations)\n",
"with open('viterbi.txt', 'w') as f:\n",
" f.write(\"RPath\\tObserv\\tViterbi\\n\")\n",
" p = map(lambda x: '%s\\tp%d'%x, zip(data, most_probable))\n",
" p = list(p)\n",
" f.write('\\n'.join(p))\n",
"same_indices = [i for i in range(len(data)) if actual_states[i]==most_probable[i]]\n",
"print(\"Accuracy: %f\"%(len(same_indices)/len(data)))"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from scipy.special import logsumexp\n",
"\n",
"# returns the matrix for posterior\n",
"def forward_algorithm(params, observations):\n",
" observe_len = len(observations)\n",
" prob_matrix = np.zeros((observe_len+1, params.state_num), dtype='float64')\n",
" # initial state\n",
" # {1, 0,...,0} leads to NANs, but \n",
" prob_matrix[-1,:] = np.log(1/params.state_num) \n",
" for i in range(observe_len):\n",
" for nxt in range(params.state_num):\n",
" probs = np.log(params.transition_matrix[:,nxt]) + prob_matrix[i-1,:]\n",
" prob_matrix[i, nxt] = np.log(params.emission_matrix[nxt, observations[i]]) + logsumexp(probs)\n",
" return prob_matrix"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"matr = forward_algorithm(params, observations)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"# returns the matrix for posterior\n",
"def backward_algorithm(params, observations):\n",
" observe_len = len(observations)\n",
" matrix = np.zeros((observe_len, params.state_num), dtype='float64')\n",
" matrix[-1,:] = 1\n",
" for i in range(observe_len-2, -1, -1):\n",
" for prev in range(params.state_num):\n",
" probs = np.log(params.transition_matrix[prev,:])+\\\n",
" np.log(params.emission_matrix[:,observations[i+1]])+\\\n",
" matrix[i+1,:]\n",
" matrix[i, prev] = logsumexp(probs)\n",
" return matrix"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([[-135.82776627, -135.30817911],\n",
" [-135.33918632, -134.81965626],\n",
" [-134.85045917, -134.33115293],\n",
" [-134.36115517, -133.8427262 ],\n",
" [-133.86959477, -133.35459986],\n",
" [-133.36926906, -132.86765287],\n",
" [-132.83578423, -132.3853528 ],\n",
" [-132.18823033, -131.92163045],\n",
" [-131.24450407, -131.53678022],\n",
" [-130.71381007, -130.22118283]])"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"backward_algorithm(params, observations)[:10]"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [],
"source": [
"# returns probabilities for each state in each iteration\n",
"# iteration is row, state is column\n",
"def posterior_decoding(params, observations):\n",
" forward_matr = forward_algorithm(params, observations)\n",
" backward_matr = backward_algorithm(params, observations)\n",
" probabilites = forward_matr[:backward_matr.shape[0],:] + backward_matr\n",
" total_probability = logsumexp(probabilites, axis=1)\n",
" out_probs = np.zeros((len(observations), params.state_num), dtype='float64')\n",
" for i in range(len(observations)):\n",
" out_probs[i, :] = probabilites[i,:] - total_probability[i]\n",
" out_probs = np.exp(out_probs)\n",
" return out_probs"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,\n",
" 1., 1., 1.])"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"posterior_decoding(params, np.array(observations))\n",
"np.sum(posterior_decoding(params, np.array(observations)), axis=1)[:20]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Unfair casino"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [],
"source": [
"from matplotlib import pyplot as plt\n",
"\n",
"casino_params = read_params('casino_in.txt')\n",
"casino_hmm = generate_random_HMM(casino_params)\n",
"print_HMM_to_file(casino_hmm, 'generated_casino_path.txt')\n",
"hidden_states = np.array([x[0] for x in casino_hmm], dtype=int)\n",
"casino_observes = np.array([x[1] for x in casino_hmm], dtype=int)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### viterbi"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Accuracy: 0.790000\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"D:\\Anaconda3\\lib\\site-packages\\ipykernel_launcher.py:7: RuntimeWarning: divide by zero encountered in log\n",
" import sys\n"
]
},
{
"data": {
"text/plain": [
"[<matplotlib.lines.Line2D at 0x178c2356320>]"
]
},
"execution_count": 31,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"\n",
"most_probable = viterbi_solver(casino_params, casino_observes)\n",
"with open('casino_viterbi.txt', 'w') as f:\n",
" f.write(\"Hidden\\tObserved\\tViterbi\\n\")\n",
" p = map(lambda x: 'p%d\\te%d\\tp%d'%x, map(lambda x:(x[0][0], x[0][1], x[1]),zip(casino_hmm, most_probable)))\n",
" p = list(p)\n",
" f.write('\\n'.join(p))\n",
"same_indices = [i for i in range(len(casino_hmm)) if hidden_states[i]==most_probable[i]]\n",
"print(\"Accuracy: %f\"%(len(same_indices)/len(casino_hmm)))\n",
"# plot difference between actual and estimate\n",
"# if upper zero, real is loaded dice, estimate is fair\n",
"# if lower zero, real is fair, estimate is loaded\n",
"# if same, there is correct estimate\n",
"plt.plot(np.arange(len(casino_hmm)), hidden_states-most_probable)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### posterior"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Plot of hidden values is reflected on Ox to better visualization.<br>"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Accuracy: 0.780000\n"
]
},
{
"data": {
"text/plain": [
"[<matplotlib.lines.Line2D at 0x178c22dc080>]"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"probabilities = posterior_decoding(casino_params, np.array(casino_observes))\n",
"# calculation of accuracy\n",
"estimate = [probabilities[i].argmax() for i in range(len(casino_observes))]\n",
"same_indices = [i for i in range(len(casino_hmm)) if hidden_states[i]==estimate[i]]\n",
"print(\"Accuracy: %f\"%(len(same_indices)/len(casino_hmm)))\n",
"\n",
"# calculation of plot\n",
"plt.plot(np.arange(len(casino_hmm)), -hidden_states+1)\n",
"plt.plot(np.arange(len(casino_hmm)), probabilities[:,0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Checking Accuracy of different versions "
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Current stage: 0 of 1000\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"D:\\Anaconda3\\lib\\site-packages\\ipykernel_launcher.py:7: RuntimeWarning: divide by zero encountered in log\n",
" import sys\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Current stage: 100 of 1000\n",
"Current stage: 200 of 1000\n",
"Current stage: 300 of 1000\n",
"Current stage: 400 of 1000\n",
"Current stage: 500 of 1000\n",
"Current stage: 600 of 1000\n",
"Current stage: 700 of 1000\n",
"Current stage: 800 of 1000\n",
"Current stage: 900 of 1000\n",
"Viterbi:\tAccuracy median 0.800000\tAccuracy variance 0.004030\n",
"Posterior:\tAccuracy median 0.820000\tAccuracy variance 0.002717\n"
]
}
],
"source": [
"N = 1000\n",
"accuracy_viterbi = np.zeros(N, dtype=float)\n",
"accuracy_posterior = np.zeros(N, dtype=float)\n",
"for k in range(N):\n",
" if k%100==0:\n",
" print(\"Current stage: %d of %d\"%(k,N))\n",
" hmm_test = generate_random_HMM(casino_params)\n",
" hidden_states = np.array([x[0] for x in hmm_test], dtype=int)\n",
" observes = np.array([x[1] for x in hmm_test], dtype=int)\n",
" # viterbi\n",
" estimate = viterbi_solver(casino_params, observes)\n",
" same_indices = [i for i in range(len(hmm_test)) if hidden_states[i]==estimate[i]]\n",
" accuracy_viterbi[k] = len(same_indices)/len(hmm_test)\n",
" # posterior\n",
" probabilities = posterior_decoding(casino_params, observes)\n",
" estimate = [probabilities[i].argmax() for i in range(len(observes))]\n",
" same_indices = [i for i in range(len(hmm_test)) if hidden_states[i]==estimate[i]]\n",
" accuracy_posterior[k] = len(same_indices)/len(hmm_test)\n",
" \n",
"print(\"Viterbi:\\tAccuracy median %f\\tAccuracy variance %f\"%(np.median(accuracy_viterbi), np.var(accuracy_viterbi)))\n",
"print(\"Posterior:\\tAccuracy median %f\\tAccuracy variance %f\"%(np.median(accuracy_posterior), np.var(accuracy_posterior)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment