Created April 12, 2017
Implementation of Linear Chain Conditional Random Fields with application to POS tagging
"# Implementation of first order conditional random fields\n",
"With application to POS tagging"
"import re\n",
"import numpy as np\n",
"from collections import defaultdict\n",
"from scipy import optimize"
"### Define class\n",
"__Assumptions__: In the start layer, the start state has alpha = 1 and every other state has alpha = 0. Likewise, in the end layer, the end state has beta = 1 and every other state has beta = 0."
class CRF(object):
    """
    Implementation of first order (linear chain) conditional random fields.

    A sentence with n words is modelled as n + 1 transitions: START -> y_0,
    y_0 -> y_1, ..., y_{n-1} -> END. ``psi[i, yp, y]`` denotes the
    exponentiated weighted feature sum of transition ``i`` from state ``yp``
    to state ``y``; alpha / beta are the usual forward / backward scores.

    Assumptions: in the start layer only the start state has alpha = 1, and
    in the end layer only the end state has beta = 1.

    Parameters:
    ----------
    f: Binary valued indicator function templates for first order CRF. Each
       is called as ``f(yp, y, x, i)`` with label names ``yp`` / ``y``, the
       word list ``x`` and the transition index ``i`` (0 .. len(x)).
    labels: List of unique labels (without the START / END sentinels).
    sigma: Regularisation constant; the L2 penalty is
           ``sum(w ** 2) / (2 * sigma ** 2)``.
    maxfun: Maximum number of function evaluations.
    maxiter: Maximum number of iterations.
    """
    def __init__(self, f, labels, sigma=10, maxfun=15000, maxiter=15000):
        self.f = f
        # START and END are the module-level sentinel labels; they are looked
        # up when the constructor runs, not when the class is defined.
        self.labels = [START] + list(labels) + [END]
        self.m = len(f)
        self.num_states = len(self.labels)
        self.sigma = sigma
        self.label_id = {l: i for i, l in enumerate(self.labels)}
        self.start = self.label_id[START]
        self.end = self.label_id[END]
        self.maxfun = maxfun
        self.maxiter = maxiter

    def _vectorize_x(self, x):
        """
        Internal helper function to vectorize data points.

        Parameters:
        ----------
        x: List of words

        Returns:
        -------
        matrix: numpy array of shape (num_words + 1, num_states, num_states,
                num_functions). Entry [i, j, k, l] is feature l evaluated for
                transition i from state j to state k. The +1 accounts for
                the final transition into the end state.
        """
        num_positions = len(x) + 1
        matrix = np.zeros((num_positions, self.num_states, self.num_states, self.m))
        for i in range(num_positions):
            for j, yp in enumerate(self.labels):
                for k, y in enumerate(self.labels):
                    for l, func in enumerate(self.f):
                        matrix[i, j, k, l] = func(yp, y, x, i)
        return matrix

    def _vectorize_inputs(self, X, Y):
        """
        Internal helper function to transform raw input into vectors.

        Parameters:
        ----------
        X: Raw X input (list of word sequences).
        Y: Raw Y input (list of label sequences).

        Returns:
        -------
        x_vectorized: list of 4D matrices each with shape
                      (num_words + 1, num_states, num_states, num_functions)
        y_vectorized: list of lists of state ids, each padded with the start
                      state in front and the end state at the back.
        """
        x_vectorized = [self._vectorize_x(x) for x in X]
        y_vectorized = [
            [self.start] + [self.label_id[l] for l in y] + [self.end]
            for y in Y
        ]
        return x_vectorized, y_vectorized

    def _compute_alphas(self, psi_matrix):
        """
        Internal helper function to compute alpha (forward) scores.

        Parameters:
        ----------
        psi_matrix: psi matrix. array-like with shape
                    (num_words + 1, num_states, num_states)

        Returns:
        -------
        alphas: alpha matrix with shape (num_states, num_words + 1). Column j
                scores the state that transition j starts from, so column 0
                is the start layer.
        alpha_final: Forward score of the end state after the last
                     transition (equals the partition function Z).
        """
        num_positions = psi_matrix.shape[0]
        alphas = np.zeros(shape=(self.num_states, num_positions))
        alphas[self.start, 0] = 1  # initialize start layer
        for j in range(num_positions - 1):
            # Transpose so the sum runs over the previous state y_p.
            alphas[:, j + 1] = psi_matrix[j].T.dot(alphas[:, j])
        # The last transition must land in the end state: take that column.
        alpha_final = psi_matrix[-1][:, self.end].dot(alphas[:, -1])
        return alphas, alpha_final

    def _compute_betas(self, psi_matrix):
        """
        Internal helper function to compute beta (backward) scores.

        Parameters:
        ----------
        psi_matrix: psi matrix. array-like with shape
                    (num_words + 1, num_states, num_states)

        Returns:
        -------
        betas: beta matrix with shape (num_states, num_words + 1). Column j
               scores the state that transition j ends in, so the last
               column is the end layer.
        beta_final: Z value (partition function).
        """
        num_positions = psi_matrix.shape[0]
        betas = np.zeros(shape=(self.num_states, num_positions))
        betas[self.end, -1] = 1  # initialize end state
        for j in range(num_positions - 2, -1, -1):
            betas[:, j] = psi_matrix[j + 1].dot(betas[:, j + 1])
        # The first transition always leaves the start state: take that row.
        beta_final = psi_matrix[0][self.start].dot(betas[:, 0])
        return betas, beta_final

    def _compute_loss_deriv(self, X_vecs, Y_vecs, weights):
        """
        Internal helper function to compute loss and derivative.

        Parameters:
        ----------
        X_vecs: vectorised X
        Y_vecs: vectorised Y
        weights: numpy array with one weight per feature function.

        Returns:
        -------
        loss: Negative L2-regularised conditional log likelihood.
        derivative: Gradient of that loss with respect to ``weights``.
        """
        log_likelihood = 0.0
        derivative = np.zeros(self.m)
        for x_vec, y_vec in zip(X_vecs, Y_vecs):
            # x_vec shape: [num_words + 1, num_states, num_states, m]
            num_positions = x_vec.shape[0]
            scores = np.dot(x_vec, weights)  # log potentials
            psi_matrix = np.exp(scores)  # [num_words + 1, num_states, num_states]
            alphas, _ = self._compute_alphas(psi_matrix)  # [num_states, num_words + 1]
            betas, z = self._compute_betas(psi_matrix)  # [num_states, num_words + 1]
            positions = np.arange(num_positions)
            y_previous = y_vec[:-1]
            y_next = y_vec[1:]
            # Log likelihood of the gold path: its summed log potentials
            # (not the exponentiated ones) minus log Z.
            log_likelihood += np.sum(scores[positions, y_previous, y_next]) - np.log(z)
            # Feature counts observed on the gold path.
            empirical_counts = np.sum(x_vec[positions, y_previous, y_next], axis=0)
            # Unnormalised pairwise marginals:
            # alpha[yp, j] * psi[j, yp, y] * beta[y, j].
            marginals = (alphas.T[:, :, np.newaxis]
                         * psi_matrix
                         * betas.T[:, np.newaxis, :])
            # Expected feature counts under the model.
            model_counts = np.tensordot(marginals, x_vec, axes=3) / z
            derivative += empirical_counts - model_counts
        variance = self.sigma ** 2
        log_likelihood -= np.sum(weights ** 2) / (2 * variance)
        # Gradient of the penalty is taken per weight, not as a scalar sum.
        derivative -= weights / variance
        # Negated because the optimizer minimises.
        return -log_likelihood, -derivative

    def fit(self, X, Y):
        """
        Learn the feature weights with L-BFGS-B from random initial weights.

        Parameters:
        ----------
        X: Word data. List of list of word sequences.
        Y: Labels data. List of list of label sequences.

        Returns:
        -------
        self: object. Instance of self.
        """
        self.lamb = np.random.randn(self.m)
        X_vecs, Y_vecs = self._vectorize_inputs(X, Y)

        def objective(weights):
            return self._compute_loss_deriv(X_vecs, Y_vecs, weights)

        self.lamb, _, _ = optimize.fmin_l_bfgs_b(objective, self.lamb,
                                                 maxiter=self.maxiter,
                                                 maxfun=self.maxfun)
        return self

    def predict(self, x):
        """
        Prediction using viterbi algorithm.

        Scores are accumulated as log potentials, which yields the same
        best path as multiplying the exponentiated potentials.

        Parameters:
        ----------
        x: Word sequence.

        Returns:
        -------
        path: Predicted label sequence (empty list for an empty sequence).
        """
        num_words = len(x)
        if num_words == 0:
            return []
        x_vec = self._vectorize_x(x)
        scores = np.dot(x_vec, self.lamb)  # [num_words + 1, num_states, num_states]
        # backtrack_matrix[s, j]: best predecessor of state s at word j.
        # Column 0 is never consulted for a label (predecessor is start).
        backtrack_matrix = np.zeros((self.num_states, num_words), dtype=np.int32)
        # Only the previous layer's best scores are needed, so keep a vector.
        best = scores[0, self.start]  # first step
        for j in range(1, num_words):
            candidates = best[:, np.newaxis] + scores[j]  # [y_p, y]
            backtrack_matrix[:, j] = np.argmax(candidates, axis=0)
            best = np.max(candidates, axis=0)
        # Account for the closing transition into the end state.
        best = best + scores[num_words, :, self.end]
        y = int(np.argmax(best))
        # backtrack
        path = []
        for j in range(num_words - 1, -1, -1):
            path.append(y)
            y = backtrack_matrix[y, j]
        path.reverse()
        return [self.labels[l] for l in path]
"### Load file"
# Parse the tagged corpus: one sentence per line, whitespace-separated
# tokens of the form word/TAG.
word_data = []              # per sentence: list of words (original casing)
label_data = []             # per sentence: list of tags, aligned with word_data
all_labels = set()          # every tag seen in the corpus
word_sets = defaultdict(set)  # tag -> lower-cased words seen with that tag
obsrvs = set()              # every lower-cased word seen in the corpus
# The context manager closes the file even if a malformed token raises.
with open('sample.txt') as corpus:
    for line in corpus:
        words, labels = [], []
        for token in line.split():
            # Split on the LAST slash only, so a word that itself contains
            # '/' (e.g. "1/2/CD") keeps its text and still yields its tag.
            word, label = token.rsplit('/', 1)
            lowered = word.lower()
            all_labels.add(label)
            word_sets[label].add(lowered)
            obsrvs.add(lowered)
            words.append(word)
            labels.append(label)
        word_data.append(words)
        label_data.append(labels)
"### Make feature templates\n",
# Sentinel labels for the virtual states before the first and after the
# last word of a sentence.
START = '//'
END = '\\'
# Sorted so that state ids and feature indices are identical on every run;
# iterating the raw set depends on string hashing, which may vary between
# interpreter runs.
labels = sorted(all_labels)
lbls = [START] + labels + [END]
# One indicator per (previous label, current label) pair. END is never a
# previous label and START is never a current label. The pair is bound via
# default arguments so each lambda keeps its own values.
transition_functions = [
    lambda yp, y, x_v, i, _yp=_yp, _y=_y: 1 if yp == _yp and y == _y else 0
    for _yp in lbls[:-1]
    for _y in lbls[1:]
]
def set_membership(tag):
    """
    Build a feature that fires when the word at position ``i`` was seen
    with ``tag`` in the corpus.

    The returned function looks ``tag`` up in the module-level ``word_sets``
    mapping each time it is called. It does not inspect ``yp`` or ``y``;
    it returns 0 whenever ``i`` is past the end of ``x_v``.
    """
    def fun(yp, y, x_v, i):
        inside_sentence = i < len(x_v)
        return int(inside_sentence and x_v[i].lower() in word_sets[tag])
    return fun


# One membership feature per tag recorded in word_sets.
observation_functions = [set_membership(t) for t in word_sets]
def _shape_feature(pattern):
    """Return a feature firing when the word at position ``i`` matches ``pattern``."""
    # Compiled once per feature instead of being looked up on every call.
    regex = re.compile(pattern)

    def fun(yp, y, x_v, i):
        # Position len(x_v) is the transition into the end state: no word there.
        return 1 if i < len(x_v) and regex.match(x_v[i]) else 0
    return fun


# Word-shape features. Raw string literals keep the patterns unchanged while
# avoiding the invalid-escape warning that '\.' triggers in a normal string.
misc_functions = [
    _shape_feature(r'^[^0-9a-zA-Z]+$'),  # no letters or digits at all
    _shape_feature(r'^[A-Z\.]+$'),       # upper-case letters and dots only
    _shape_feature(r'^[0-9\.]+$'),       # digits and dots only
]
def _tagval(_y, _x):
    """Return a feature firing when label ``_y`` is assigned to the lower-cased word ``_x``."""
    return lambda yp, y, x_v, i, _y=_y, _x=_x: int(
        i < len(x_v) and y == _y and x_v[i].lower() == _x
    )


# One indicator per (label, observed word) pair, labels in the outer loop.
tagval_functions = [_tagval(tag, word) for tag in labels for word in obsrvs]
class FeatureSet(object):
    """Base class for feature templates that are built once per label."""

    @classmethod
    def functions(cls, lbls, *arguments):
        """
        Instantiate ``cls`` for every combination of label and argument.

        Labels form the outer loop. A tuple argument is unpacked into
        several constructor arguments; anything else is passed as a single
        argument after the label.

        Returns:
        -------
        List of ``cls`` instances.
        """
        built = []
        for lbl in lbls:
            for arg in arguments:
                extra = arg if isinstance(arg, tuple) else (arg,)
                built.append(cls(lbl, *extra))
        return built

    def __repr__(self):
        kind = type(self).__name__
        return "%s(%s)" % (kind, self.__dict__)
class Transitions(FeatureSet):
    """Indicator feature for one specific (previous label, current label) pair."""

    def __init__(self, curr_lbl, prev_lbl):
        # Current label comes first so that FeatureSet.functions can supply
        # it as the leading constructor argument.
        self.prev_label = prev_lbl
        self.label = curr_lbl

    def __call__(self, yp, y, x_v, i):
        """Return 1 when ``yp`` / ``y`` are exactly this transition, else 0."""
        if yp != self.prev_label or y != self.label:
            return 0
        return 1
# Full feature set handed to the CRF.
# NOTE(review): Transitions.functions(lbls[1:], lbls[:-1]) builds an indicator
# for the same (previous, current) label pairs as transition_functions, so
# every transition feature appears twice - confirm this is intended.
feature_functions = (
    transition_functions
    + observation_functions
    + misc_functions
    + tagval_functions
    + Transitions.functions(lbls[1:], lbls[:-1])
)
crf = CRF(feature_functions, labels)
# fit() returns the estimator itself, so rebinding keeps the same object.
crf = crf.fit(word_data, label_data)
# Tag one of the training sentences with the learned weights.
preds = crf.predict(word_data[2])
"Since no separate test data is available, we only check how well the model fits one of the training sentences."
# Comparison of predicted vs. actual tags. Materialised as a list because
# zip() is a lazy iterator on Python 3 and would not show the pairs.
list(zip(preds, label_data[2]))
