Instantly share code, notes, and snippets.

# yamaguchiyuto/nctm.py Created Mar 22, 2017

What would you like to do?
Noisy Correspondence Topic Model
 import random import numpy as np from scipy.sparse import lil_matrix class NCTM: def __init__(self, K, alpha, beta, gamma, eta, max_iter, verbose=0): self.K=K self.alpha = alpha self.beta = beta self.gamma = gamma self.eta = eta self.max_iter = max_iter self.verbose=verbose def fit(self,W,X,Vw,Vx): self._W = W self._X = X self._D = len(W) self._Vw = Vw # number of vocabularies self._Vx = Vx # number of vocabularies self.Z = self._init_Z() self.Y = self._init_Y() self.R = self._init_R() self.ndk = self._init_ndk() self.mdk = self._init_mdk() self.nkw = self._init_nkw() # for W self.nkx = self._init_nkx() # for x self.m0x = self._init_m0x() # number of noise assignment for x nkw_sum = self.nkw.sum(axis=1) nkx_sum = self.nkx.sum(axis=1) m0x_sum = self.m0x.sum() m1x_sum = nkx_sum.sum() remained_iter = self.max_iter while True: if self.verbose: print remained_iter for d in np.random.choice(self._D, self._D, replace=False): # Sample Z for i in np.random.choice(len(self._W[d]), len(self._W[d]), replace=False): k = self.Z[d][i] v = self._W[d][i] self.ndk[d][k] -= 1 self.nkw[k][v] -= 1 nkw_sum[k] -= 1 self.Z[d][i] = self._sample_z(d,k,v,nkw_sum) self.ndk[d][self.Z[d][i]] += 1 self.nkw[self.Z[d][i]][v] += 1 nkw_sum[self.Z[d][i]] += 1 for d in np.random.choice(self._D, self._D, replace=False): # Sample Y for i in np.random.choice(len(self._X[d]), len(self._X[d]), replace=False): if self.R[d][i]==1: k = self.Y[d][i] u = self._X[d][i] self.mdk[d][k] -= 1 self.nkx[k][u] -= 1 nkx_sum[k] -= 1 self.Y[d][i] = self._sample_y(d,u,nkx_sum) self.mdk[d][self.Y[d][i]] += 1 self.nkx[self.Y[d][i]][u] += 1 nkx_sum[self.Y[d][i]] += 1 for d in np.random.choice(self._D, self._D, replace=False): # Sample R for i in np.random.choice(len(self._X[d]), len(self._X[d]), replace=False): r = self.R[d][i] k = self.Y[d][i] u = self._X[d][i] if r==0: self.m0x[u] -= 1 m0x_sum -= 1 else: self.mdk[d][k] -= 1 self.nkx[k][u] -= 1 nkx_sum[k] -= 1 m1x_sum -= 1 self.R[d][i] = self._sample_r(u,k,nkx_sum,m0x_sum,m1x_sum) if self.R[d][i]==0: self.m0x[u] += 1 m0x_sum += 1 else: self.mdk[d][k] += 1 self.nkx[k][u] += 1 nkx_sum[k] += 1 m1x_sum += 1 remained_iter -= 1 if remained_iter <= 0: break return self def _init_Z(self): Z = [] for d in range(len(self._W)): Z.append(np.random.randint(low=0,high=self.K,size=len(self._W[d]))) return Z def _init_Y(self): Y = [] for d in range(len(self._X)): Y.append(np.random.choice(self.Z[d],size=len(self._X[d]))) return Y def _init_R(self): R = [] for d in range(len(self._X)): R.append(np.random.choice([0,1],size=len(self._X[d]))) return R def _init_ndk(self): ndk = np.zeros((self._D,self.K)) for d in range(self._D): for i in range(len(self._W[d])): k = self.Z[d][i] ndk[d,k]+=1 return ndk def _init_mdk(self): mdk = np.zeros((self._D,self.K)) for d in range(self._D): for i in range(len(self._X[d])): if self.R[d][i]==1: k = self.Y[d][i] mdk[d,k]+=1 return mdk def _init_nkw(self): nkw = np.zeros((self.K,self._Vw)) for d in range(self._D): for i in range(len(self._W[d])): k = self.Z[d][i] v = self._W[d][i] nkw[k,v]+=1 return nkw def _init_nkx(self): nkx = np.zeros((self.K,self._Vx)) for d in range(self._D): for i in range(len(self._X[d])): if self.R[d][i]==1: k = self.Y[d][i] u = self._X[d][i] nkx[k,u]+=1 return nkx def _init_m0x(self): m0x = np.zeros(self._Vx) for d in range(self._D): for i in range(len(self._X[d])): r = self.R[d][i] u = self._X[d][i] if r==0: m0x[u]+=1 return m0x def _sample_z(self,d,old_k,v,nkw_sum): nkw = self.nkw[:,v] # k-dimensional vector if self.ndk[d,old_k]==0 and self.mdk[d,old_k]>0: return old_k else: ndk = self.ndk[d].copy() ndk[ndk==0] += 1 prob = (self.ndk[d]+self.alpha) * ((nkw+self.beta)/(nkw_sum+self.beta*self._Vw)) * ((ndk+1)/ndk)**self.mdk[d] prob = prob/prob.sum() z = np.random.multinomial(n=1, pvals=prob).argmax() return z def _sample_y(self,d,u,nkx_sum): nkx = self.nkx[:,u] # k-dimensional vector prob = self.ndk[d] * ((nkx+self.gamma)/(nkx_sum+self.gamma*self._Vx)) prob = prob/prob.sum() y = np.random.multinomial(n=1, pvals=prob).argmax() return y def _sample_r(self,u,k,nkx_sum,m0x_sum,m1x_sum): nkx = self.nkx[k,u] p0 = (m0x_sum+self.eta) * ((self.m0x[u]+self.gamma)/(m0x_sum+self.gamma*self._Vx)) p1 = (m1x_sum+self.eta) * ((nkx+self.gamma)/(nkx_sum[k]+self.gamma*self._Vx)) p = p1/(p1+p0) r = np.random.multinomial(n=1, pvals=[1-p,p]).argmax() return r