
# mblondel/seminb.py

Created Oct 28, 2015
Semi-supervised Naive Bayes
```python
# -*- coding: utf-8 -*-
# Copyright (C) 2010 Mathieu Blondel
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Implementation details are described at
# http://www.mblondel.org/journal/2010/06/21/semi-supervised-naive-bayes-in-python/

import numpy as np

"""
Implementation of Naive Bayes trained by EM for semi-supervised text
classification, as described in "Semi-Supervised Text Classification
Using EM", by Nigam et al.

Notation:
    w: word
    d: document
    c: class
    V: vocabulary size
    X: number of documents
    M: number of classes
"""


def softmax(loga, k=-np.inf, out=None):
    r"""
    Compute the softmax function (normalized exponentials) without underflow.

    e^{a_i} / \sum_j e^{a_j}
    """
    if out is None:
        out = np.empty_like(loga).astype(np.double)
    m = np.max(loga)
    logam = loga - m
    sup = logam > k
    inf = np.logical_not(sup)
    out[sup] = np.exp(logam[sup])
    out[inf] = 0.0
    out /= np.sum(out)
    return out


def logsum(loga, k=-np.inf):
    r"""
    Compute a sum of logs without underflow.

    \log \sum_c e^{b_c} = \log [(\sum_c e^{b_c}) e^{-B} e^B]
                        = \log [(\sum_c e^{b_c - B}) e^B]
                        = \log (\sum_c e^{b_c - B}) + B

    where B = \max_c b_c.
    """
    B = np.max(loga)
    aB = loga - B
    sup = aB > k
    inf = np.logical_not(sup)
    aB[sup] = np.exp(aB[sup])
    aB[inf] = 0.0
    return np.log(np.sum(aB)) + B


def loglikelihood(td, delta, tdu, p_w_c_log, p_c_log):
    V, Xl = td.shape
    V_, Xu = tdu.shape
    Xl_, M = delta.shape

    lik = 0.0

    ## labeled

    # log P(x|c)
    p_x_c_log = np.zeros((Xl, M), np.double)
    for w, d in zip(*td.nonzero()):
        p_x_c_log[d, :] += p_w_c_log[w, :] * td[w, d]

    # add log P(c) + log P(x|c) if x has label c
    for d, c in zip(*delta.nonzero()):
        lik += p_c_log[c] + p_x_c_log[d, c]

    ## unlabeled

    # log P(x|c)
    p_x_c_log = np.zeros((Xu, M), np.double)
    for w, d in zip(*tdu.nonzero()):
        p_x_c_log[d, :] += p_w_c_log[w, :] * tdu[w, d]

    # add log P(c)
    p_x_c_log += p_c_log[np.newaxis, :]

    for d in range(Xu):
        lik += logsum(p_x_c_log[d, :], k=-10)

    return lik


def normalize_p_c(p_c):
    M = len(p_c)
    denom = M + np.sum(p_c)
    p_c += 1.0
    p_c /= denom


def normalize_p_w_c(p_w_c):
    V, M = p_w_c.shape
    denoms = V + np.sum(p_w_c, axis=0)
    p_w_c += 1.0
    p_w_c /= denoms[np.newaxis, :]


class SemiNB(object):

    def __init__(self, model=None):
        """
        model: a model, as returned by get_model() or train().
        """
        self.p_w_c = None
        self.p_c = None
        if model is not None:
            self.set_model(model)
        self.debug = False

    def train(self, td, delta, normalize=True, sparse=True):
        """
        td: V x X term-document matrix
        delta: X x M matrix where delta(d,c) = 1 if document d belongs
               to class c
        """
        X_, M = delta.shape
        V, X = td.shape
        assert X_ == X

        # P(c)
        # cast to double so that in-place smoothing/normalization works
        self.p_c = np.sum(delta, axis=0).astype(np.double)

        # P(w|c)
        self.p_w_c = np.zeros((V, M), dtype=np.double)

        if sparse:
            # faster when delta is sparse
            # select indices of documents that have class c
            for d, c in zip(*delta.nonzero()):
                # select indices of terms that are non-zero
                for w in np.flatnonzero(td[:, d]):
                    self.p_w_c[w, c] += td[w, d] * delta[d, c]
        else:
            # faster when delta is non-sparse
            for w, d in zip(*td.nonzero()):
                self.p_w_c[w, :] += td[w, d] * delta[d, :]

        if normalize:
            normalize_p_c(self.p_c)
            normalize_p_w_c(self.p_w_c)

        return self.get_model()

    def train_semi(self, td, delta, tdu, maxiter=50, eps=0.01):
        """
        td: V x X term-document matrix
        delta: X x M label matrix
        tdu: V x Xu term-document matrix (unlabeled)
        maxiter: maximum number of iterations
        eps: stop if progress is smaller than eps
        """
        X_, M = delta.shape
        V, X = td.shape
        assert X_ == X

        # compute counts for the labeled data once and for all
        self.train(td, delta, normalize=False)
        p_c_l = np.array(self.p_c, copy=True)
        p_w_c_l = np.array(self.p_w_c, copy=True)

        # normalize to get the initial classifier
        normalize_p_c(self.p_c)
        normalize_p_w_c(self.p_w_c)

        lik = loglikelihood(td, delta, tdu,
                            np.log(self.p_w_c), np.log(self.p_c))

        for iteration in range(1, maxiter + 1):
            # E-step: find the probabilistic labels for the unlabeled data
            delta_u = self.predict_proba_all(tdu)

            # M-step: train a classifier with the union of
            # labeled and unlabeled data
            self.train(tdu, delta_u, normalize=False, sparse=False)
            self.p_c += p_c_l
            self.p_w_c += p_w_c_l
            normalize_p_c(self.p_c)
            normalize_p_w_c(self.p_w_c)

            lik_new = loglikelihood(td, delta, tdu,
                                    np.log(self.p_w_c), np.log(self.p_c))
            lik_diff = lik_new - lik
            assert lik_diff >= -1e-10
            lik = lik_new

            if lik_diff < eps:
                print("No more progress, stopping EM at iteration", iteration)
                break

            if self.debug:
                print("Iteration", iteration)
                print("L += %f" % lik_diff)

        return self.get_model()

    def p_x_c_log_all(self, td):
        M = len(self.p_c)
        V, X = td.shape
        p_x_c_log = np.zeros((X, M), np.double)
        p_w_c_log = np.log(self.p_w_c)

        # log P(x|c)
        for w, d in zip(*td.nonzero()):
            p_x_c_log[d, :] += p_w_c_log[w, :] * td[w, d]

        return p_x_c_log

    def predict_proba(self, x):
        """
        x: a V array representing a document

        Compute an M array containing P(c|x).
        """
        return self.predict_proba_all(x[:, np.newaxis])[0, :]

    def predict_proba_all(self, td):
        """
        td: V x X term-document matrix

        Compute an X x M matrix of P(c|x) for all x in td.
        """
        V, X = td.shape

        # log P(x|c)
        p_x_c_log = self.p_x_c_log_all(td)

        # add log P(c)
        p_x_c_log += np.log(self.p_c)[np.newaxis, :]

        # softmax(log P(x|c) + log P(c)) = P(c|x)
        for d in range(X):
            softmax(p_x_c_log[d, :], k=-10, out=p_x_c_log[d, :])

        return p_x_c_log

    def predict(self, x):
        """
        x: a V array representing a document

        Compute the predicted class index.
        """
        return self.predict_all(x[:, np.newaxis])[0]

    def predict_all(self, td):
        """
        td: V x X term-document matrix

        Compute an X array containing the predicted class indices.

        Note: the main difference with predict_proba_all is that the
        normalization is not necessary, as we are only interested in
        the most likely class.
        """
        # log P(x|c)
        p_x_c_log = self.p_x_c_log_all(td)

        # add log P(c)
        p_x_c_log += np.log(self.p_c)[np.newaxis, :]

        return p_x_c_log.argmax(axis=1)

    def get_model(self):
        return (self.p_w_c, self.p_c)

    def set_model(self, model):
        self.p_w_c, self.p_c = model
```

### jinyyy666 commented Apr 4, 2017 • edited

 Nice code! In `normalize_p_w_c` (line 113), shouldn't it be `V, M = p_w_c.shape` instead of `V, X = p_w_c.shape`, for the sake of clarity, even though `X` is not used there? :)

### amitsasane007 commented May 10, 2017

 Please add a description to the code.

### YashviRajvir commented Mar 6, 2019

 The implementation details page linked in the code is not available. Please help with it.