{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Implementation of first order conditional random fields\n",
"With application to POS tagging"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import re\n",
"\n",
"import numpy as np\n",
"\n",
"from collections import defaultdict\n",
"from scipy import optimize"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Define class\n",
"__Assumptions__: In the start layer, all other states have alpha=0 other than start state which has 1. Accordingly for beta for end layer and end state."
]
},
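{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a sketch of the recursions implemented below, with $\\Psi_j(y', y)$ denoting the exponentiated potential for the transition at position $j$:\n",
"\n",
"$$\\alpha_{j+1}(y) = \\sum_{y'} \\alpha_j(y')\\,\\Psi_j(y', y), \\qquad \\beta_j(y') = \\sum_{y} \\Psi_{j+1}(y', y)\\,\\beta_{j+1}(y)$$\n",
"\n",
"Propagating either recursion through to the end (respectively start) state yields the partition value $Z$."
]
},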
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"class CRF(object):\n",
" \"\"\"\n",
" Implementation of first order conditional random fields.\n",
" Using the same notations as in: http://www.eng.utah.edu/~cs6961/papers/klinger-crf-intro.pdf\n",
" \n",
" Parameters:\n",
" ----------\n",
" f: Binary valued indicator function templates for first order CRF.\n",
" labels: List of unique labels.\n",
" sigma: Regularisation constant for expectation step.\n",
" maxfun: Maximum number of function evaluations.\n",
" maxiter: Maximum number of iterations.\n",
" \"\"\"\n",
" def __init__(self, f, labels, sigma=10, maxfun=15000, maxiter=15000):\n",
" self.f = f\n",
" self.labels = [START] + labels + [END]\n",
" self.m = len(f)\n",
" self.num_states = len(self.labels)\n",
" self.sigma = sigma\n",
" self.label_id = {l:i for i,l in enumerate(self.labels)}\n",
" self.start = self.label_id[START]\n",
" self.end = self.label_id[END]\n",
" self.maxfun = maxfun\n",
" self.maxiter = maxiter\n",
" \n",
" def _vectorize_x(self, x):\n",
" \"\"\"\n",
" Internal helper function to vectorize data points.\n",
" \n",
" Parameters:\n",
" ----------\n",
" x: List of words\n",
" \n",
" Returns:\n",
" -------\n",
" matrix: numpy array of shape (num_words + 1, num_states, num_states, num_functions). +1 is there\n",
" to account for start or 0.\n",
" \"\"\"\n",
" matrix = np.zeros((len(x) + 1, self.num_states, self.num_states, self.m)) # To account for start state and end state in alpha and beta respectively\n",
" for i in range(len(x) + 1):\n",
" for j, yp in enumerate(self.labels):\n",
" for k, y in enumerate(self.labels):\n",
" for l, f in enumerate(self.f):\n",
" matrix[i, j, k, l] = f(yp, y, x, i)\n",
" return matrix\n",
" \n",
" def _vectorize_inputs(self, X, Y):\n",
" \"\"\"\n",
" Internal helper function to transform raw input into vectors.\n",
" \n",
" Parameters:\n",
" ----------\n",
" X: Raw X input.\n",
" Y: Raw Y input.\n",
" \n",
" Returns:\n",
" -------\n",
" x_vectorized: array of 4D matrices each with shape (num_words + 1, num_states, num_states, num_functions)\n",
" y_vectorized: array of array of int mappings for labels\n",
" \"\"\"\n",
" x_vectorized = [self._vectorize_x(x) for x in X]\n",
" y_vectorized = []\n",
" for i in range(len(Y)):\n",
" y_map = [self.label_id[l] for l in Y[i]]\n",
" y_map.insert(0, self.label_id[START])\n",
" y_map.append(self.label_id[END])\n",
" y_vectorized.append(y_map)\n",
" return x_vectorized, y_vectorized\n",
" \n",
" def _compute_alphas(self, psi_matrix):\n",
" \"\"\"\n",
" Internal helper function to compute alpha (forward) scores.\n",
" \n",
" Parameters:\n",
" ----------\n",
" psi_matrix: psi matrix. array-like with shape (num_words + 1, num_states, num_states)\n",
" \n",
" Returns:\n",
" -------\n",
" alphas: alpha matrix. array-like with shape (num_states, num_words + 1) to account for start layer.\n",
" alpha_final: Final alpha for end layer and end state.\n",
" \"\"\"\n",
" alphas = np.zeros(shape=(self.num_states, psi_matrix.shape[0])) # To account for start layer\n",
" alphas[self.start, 0] = 1 # initialize start layer\n",
" for j in range(psi_matrix.shape[0] - 1):\n",
" alphas[:, j + 1] = psi_matrix[j].T.dot(alphas[:, j]) # Transpose for aligning y_ps together\n",
" alpha_final = psi_matrix[-1][self.end].T.dot(alphas[:, -1])\n",
" return alphas, alpha_final\n",
" \n",
" def _compute_betas(self, psi_matrix):\n",
" \"\"\"\n",
" Internal helper function to compute beta (backward) scores.\n",
" \n",
" Parameters:\n",
" ----------\n",
" psi_matrix: psi matrix. array-like with shape (word_id, num_states, num_states)\n",
" \n",
" Returns:\n",
" -------\n",
" betas: beta matrix. array-like with shape (num_states, num_words + 1) to account for end layer.\n",
" beta_final: Z value\n",
" \"\"\"\n",
" betas = np.zeros(shape=(self.num_states, psi_matrix.shape[0]))\n",
" betas[self.end, -1] = 1 # initialize end state\n",
" for j in range(psi_matrix.shape[0] - 2, -1, -1):\n",
" betas[:, j] = psi_matrix[j + 1].dot(betas[:, j + 1])\n",
" # compute final beta\n",
" beta_final = psi_matrix[0][:][self.start].dot(betas[:, 0]) # To align y's together\n",
" return betas, beta_final\n",
" \n",
" def _compute_loss_deriv(self, X_vecs, Y_vecs, weights):\n",
" \"\"\"\n",
" Internal helper function to compute loss and derivative.\n",
" \n",
" Parameters:\n",
" ----------\n",
" X_vecs: vectorised X\n",
" Y_vecs: vectorised Y\n",
" \n",
" Returns:\n",
" -------\n",
" log_likelihood_loss: Calculated log likelihood loss.\n",
" derivative: First derivative values for all feature templates.\n",
" \"\"\"\n",
" log_likelihood_loss = 0\n",
" derivative = np.zeros(self.m)\n",
" for x_vec, y_vec in zip(X_vecs, Y_vecs):\n",
" # x_vec shape: [num_words + 1, num_states, num_states, m]\n",
" num_words = x_vec.shape[0] # num_words actual + 1\n",
" psi_matrix = np.exp(x_vec.dot(weights)) # [num_words + 1, num_states, num_states]\n",
" alphas, _ = self._compute_alphas(psi_matrix) # [num_states, num_words + 1]\n",
" betas, z = self._compute_betas(psi_matrix) # [num_states, num_words + 1]\n",
" y_previous = y_vec[:-1] # num_words + 1\n",
" y_next = y_vec[1:] # num_words + 1\n",
" # Compute log loss\n",
" log_likelihood_loss += np.sum(psi_matrix[range(num_words), y_previous, y_next]) - np.log(z)\n",
" # Compute empirical probs\n",
" empirical_probs = np.sum(x_vec[range(num_words), y_previous, y_next], axis=0)\n",
" # Compute model probs\n",
" model_probs = np.zeros(self.m)\n",
" # Loop version below for verification\n",
"# for fi in range(self.m):\n",
"# for j in range(x_vec.shape[0] - 1):\n",
"# for yp in range(self.num_states):\n",
"# for y in range(self.num_states):\n",
"# prod = x_vec[j, yp, y, fi] * alphas[yp, j]\n",
"# prod *= psi_matrix[j, yp, y] * betas[y, j + 1]\n",
"# model_probs[fi] += prod\n",
" func_word_matrix = np.moveaxis(x_vec, 3, 0)\n",
" # 1. multiply with alphas\n",
" states, words = alphas.shape\n",
" alphas = alphas.T.reshape(1, words, states, 1)\n",
" func_word_matrix = func_word_matrix * alphas\n",
" # 2. multiply with psi matrix\n",
" func_word_matrix = func_word_matrix * psi_matrix\n",
" # 3. dot product with betas\n",
" word_func_matrix = np.moveaxis(func_word_matrix, 0, 1)\n",
" # TODO: replace for loop below with np.einsum\n",
" for j in range(num_words):\n",
" res = word_func_matrix[j].dot(betas[:, j])\n",
" model_probs += np.sum(res, axis=1)\n",
" model_probs /= z\n",
" derivative += empirical_probs - model_probs\n",
" loss_regularization = 2 * self.sigma ** 2\n",
" log_likelihood_loss -= np.sum(weights ** 2) / loss_regularization\n",
" derivative -= np.sum(weights) / (self.sigma ** 2)\n",
" return -log_likelihood_loss, -derivative # negative is working better.\n",
" \n",
" def fit(self, X, Y):\n",
" \"\"\"\n",
" Parameters:\n",
" ----------\n",
" X: Word data. List of list of word sequences.\n",
" Y: Labels data. List of list of label sequences.\n",
" \n",
" Returns:\n",
" -------\n",
" self: object. Instance of self.\n",
" \"\"\"\n",
" self.lamb = np.random.randn(self.m)\n",
" X_vecs, Y_vecs = self._vectorize_inputs(X, Y)\n",
" opt_func = lambda weights: self._compute_loss_deriv(X_vecs, Y_vecs, weights)\n",
" self.lamb, _, _ = optimize.fmin_l_bfgs_b(opt_func, self.lamb,\n",
" maxiter=self.maxiter, maxfun=self.maxfun)\n",
" return self\n",
" \n",
" def predict(self, x):\n",
" \"\"\"\n",
" Prediction using viterbi algorithm.\n",
" \n",
" Parameters:\n",
" ----------\n",
" x: Word sequence.\n",
" \n",
" Returns:\n",
" -------\n",
" path: Predicted label sequence.\n",
" \"\"\"\n",
" num_words = len(x)\n",
" x_vec = self._vectorize_x(x)\n",
" psi_matrix = np.exp(x_vec.dot(self.lamb))\n",
" backtrack_matrix = np.empty((self.num_states, num_words), dtype=np.int32)\n",
" # We don't need a del matrix since we don't need backtracking potentials now. Vector will work for us.\n",
" del_vector = psi_matrix[0, self.start] # first step\n",
" for j in range(1, num_words):\n",
" temp = np.empty(self.num_states)\n",
" for s in range(self.num_states):\n",
" res = del_vector * psi_matrix[j, :, s] # all y_p's for certain y\n",
" backtrack_matrix[s, j] = np.argmax(res)\n",
" temp[s] = np.max(res)\n",
" del_vector = temp\n",
" y = del_vector.argmax()\n",
" # backtrack\n",
" path = []\n",
" for j in xrange(num_words - 1, -1, -1):\n",
" path.append(y)\n",
" y = backtrack_matrix[y, j]\n",
" path.reverse()\n",
" path = [self.labels[l] for l in path]\n",
" return path"
]
},
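{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check of the forward and backward passes (an illustrative sketch, not part of the model): on a random toy psi matrix, both passes should yield the same total unnormalised score. The two-label set and the constant feature function are made up for this check only."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"START = '//' # sentinel labels assumed by CRF.__init__; defined for real further below\n",
"END = '\\\\'\n",
"toy = CRF(f=[lambda yp, y, x, i: 1.0], labels=['A', 'B'])\n",
"toy_psi = np.exp(np.random.randn(4, toy.num_states, toy.num_states))\n",
"_, alpha_final = toy._compute_alphas(toy_psi)\n",
"_, beta_final = toy._compute_betas(toy_psi)\n",
"print alpha_final, beta_final # the two values should agree up to float error"
]
},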
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Load file"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"word_data = []\n",
"label_data = []\n",
"all_labels = set()\n",
"word_sets = defaultdict(set)\n",
"obsrvs = set()\n",
"for line in open('sample.txt'):\n",
" words,labels = [],[]\n",
" for token in line.strip().split():\n",
" word,label= token.split('/')\n",
" all_labels.add(label)\n",
" word_sets[label].add(word.lower())\n",
" obsrvs.add(word.lower())\n",
" words.append(word)\n",
" labels.append(label)\n",
"\n",
" word_data.append(words)\n",
" label_data.append(labels)"
]
},
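{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loader above expects one sentence per line, with each token annotated as `word/TAG`. A minimal illustration of the per-token parsing, on a made-up line rather than one from `sample.txt`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Illustrative only: this line is invented, not taken from sample.txt\n",
"demo_line = 'The/DT dog/NN barks/VBZ ./.'\n",
"print [token.split('/') for token in demo_line.strip().split()]"
]
},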
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Make feature templates\n",
"Credits: https://github.com/shawntan/python-crf"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"START = '//'\n",
"END = '\\\\'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"labels = list(all_labels)\n",
"lbls = [START] + labels + [END]\n",
"transition_functions = [\n",
" lambda yp,y,x_v,i,_yp=_yp,_y=_y: 1 if yp==_yp and y==_y else 0\n",
" for _yp in lbls[:-1] for _y in lbls[1:]\n",
" ]\n",
"def set_membership(tag):\n",
" def fun(yp,y,x_v,i):\n",
" if i < len(x_v) and x_v[i].lower() in word_sets[tag]:\n",
" return 1\n",
" else:\n",
" return 0\n",
" return fun\n",
"observation_functions = [set_membership(t) for t in word_sets ]\n",
"misc_functions = [\n",
" lambda yp,y,x_v,i: 1 if i < len(x_v) and re.match('^[^0-9a-zA-Z]+$',x_v[i]) else 0,\n",
" lambda yp,y,x_v,i: 1 if i < len(x_v) and re.match('^[A-Z\\.]+$',x_v[i]) else 0,\n",
" lambda yp,y,x_v,i: 1 if i < len(x_v) and re.match('^[0-9\\.]+$',x_v[i]) else 0 \n",
" ]\n",
"tagval_functions = [\n",
" lambda yp,y,x_v,i,_y=_y,_x=_x: 1 if i < len(x_v) and y==_y and x_v[i].lower() ==_x else 0\n",
" for _y in labels\n",
" for _x in obsrvs]\n",
"class FeatureSet(object):\n",
" @classmethod\n",
" def functions(cls,lbls,*arguments):\n",
" def gen():\n",
" for lbl in lbls:\n",
" for arg in arguments:\n",
" if isinstance(arg,tuple):\n",
" yield cls(lbl,*arg)\n",
" else:\n",
" yield cls(lbl,arg)\n",
" return list(gen())\n",
" def __repr__(self):\n",
" return \"%s(%s)\"%(self.__class__.__name__,self.__dict__)\n",
"\n",
"class Transitions(FeatureSet):\n",
" def __init__(self,curr_lbl,prev_lbl):\n",
" self.prev_label = prev_lbl\n",
" self.label = curr_lbl\n",
"\n",
" def __call__(self,yp,y,x_v,i):\n",
" if yp==self.prev_label and y==self.label:\n",
" return 1\n",
" else:\n",
" return 0"
]
},
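{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick probe of the templates (illustrative; uses an arbitrary transition and a made-up digit token) to confirm they behave as binary indicators:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Each template is a binary indicator f(y_prev, y, x, i)\n",
"t = Transitions(labels[0], START) # fires only on START -> labels[0]\n",
"print t(START, labels[0], word_data[0], 0), t(labels[0], labels[0], word_data[0], 1)\n",
"print misc_functions[2](None, None, ['42'], 0) # the digit pattern should fire -> 1"
]
},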
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"feature_functions = transition_functions + observation_functions + misc_functions + tagval_functions + Transitions.functions(lbls[1:], lbls[:-1])"
]
},
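{
"cell_type": "markdown",
"metadata": {},
"source": [
"The template count grows roughly as the squared label-set size for transitions plus labels times vocabulary for tag/word pairs, so it is worth checking the size before fitting:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"print len(feature_functions), 'feature functions for', len(labels), 'labels and', len(obsrvs), 'word types'"
]
},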
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"crf = CRF(feature_functions, labels)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": false
},
"outputs": [],
"source": [
"crf = crf.fit(word_data, label_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"scrolled": true
},
"outputs": [],
"source": [
"preds = crf.predict(word_data[2])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We will test extent of fit because of lack of testing data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"zip(preds, label_data[2]) # comparison of classifications vs actual"
]
}
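,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A rough per-token training accuracy for the same sentence (again only an extent-of-fit check, not a held-out evaluation):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"matches = sum(1 for p, l in zip(preds, label_data[2]) if p == l)\n",
"print matches / float(len(label_data[2])) # fraction of tokens tagged correctly"
]
}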
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 1
}