Instantly share code, notes, and snippets.

# yamaguchiyuto/ctm.py Created Mar 21, 2017

What would you like to do?
Correspondence Topic Model
 import random import numpy as np from scipy.sparse import lil_matrix class CTM: def __init__(self, K, alpha, beta, gamma, max_iter, verbose=0): self.K=K self.alpha = alpha self.beta = beta self.gamma = gamma self.max_iter = max_iter self.verbose=verbose def fit(self,W,X,Vw,Vx): self._W = W self._X = X self._D = len(W) self._Vw = Vw # number of vocabularies self._Vx = Vx # number of vocabularies self.Z = self._init_Z() self.Y = self._init_Y() self.ndk = self._init_ndk() self.mdk = self._init_mdk() self.nkw = self._init_nkw() # for W self.nkx = self._init_nkx() # for x nkw_sum = self.nkw.sum(axis=1) nkx_sum = self.nkx.sum(axis=1) remained_iter = self.max_iter while True: if self.verbose: print remained_iter for d in np.random.choice(self._D, self._D, replace=False): # Sample Z for i in np.random.choice(len(self._W[d]), len(self._W[d]), replace=False): k = self.Z[d][i] v = self._W[d][i] self.ndk[d][k] -= 1 self.nkw[k][v] -= 1 nkw_sum[k] -= 1 self.Z[d][i] = self._sample_z(d,k,v,nkw_sum) self.ndk[d][self.Z[d][i]] += 1 self.nkw[self.Z[d][i]][v] += 1 nkw_sum[self.Z[d][i]] += 1 # Sample Y for i in np.random.choice(len(self._X[d]), len(self._X[d]), replace=False): k = self.Y[d][i] u = self._X[d][i] self.mdk[d][k] -= 1 self.nkx[k][u] -= 1 nkx_sum[k] -= 1 self.Y[d][i] = self._sample_y(d,u,nkx_sum) self.mdk[d][self.Y[d][i]] += 1 self.nkx[self.Y[d][i]][u] += 1 nkx_sum[self.Y[d][i]] += 1 remained_iter -= 1 if remained_iter <= 0: break return self def _init_Z(self): Z = [] for d in range(len(self._W)): Z.append(np.random.randint(low=0,high=self.K,size=len(self._W[d]))) return Z def _init_Y(self): Y = [] for d in range(len(self._X)): Y.append(np.random.choice(self.Z[d],size=len(self._X[d]))) return Y def _init_ndk(self): ndk = np.zeros((self._D,self.K)) + self.alpha for d in range(self._D): for i in range(len(self._W[d])): k = self.Z[d][i] ndk[d,k]+=1 return ndk def _init_mdk(self): mdk = np.zeros((self._D,self.K)) for d in range(self._D): for i in range(len(self._X[d])): k = self.Y[d][i] mdk[d,k]+=1 return mdk def _init_nkw(self): nkw = np.zeros((self.K,self._Vw)) + self.beta for d in range(self._D): for i in range(len(self._W[d])): k = self.Z[d][i] v = self._W[d][i] nkw[k,v]+=1 return nkw def _init_nkx(self): nkx = np.zeros((self.K,self._Vx)) + self.gamma for d in range(self._D): for i in range(len(self._X[d])): k = self.Y[d][i] u = self._X[d][i] nkx[k,u]+=1 return nkx def _sample_z(self,d,old_k,v,nkw_sum): nkw = self.nkw[:,v] # k-dimensional vector if self.ndk[d,old_k]==0: if self.mdk[d,old_k]>0: return old_k else: prob = self.ndk[d] * (nkw/nkw_sum) * ((self.ndk[d]+1)/self.ndk[d])**self.mdk[d] prob = prob/prob.sum() z = np.random.multinomial(n=1, pvals=prob).argmax() return z def _sample_y(self,d,u,nkx_sum): nkx = self.nkx[:,u] # k-dimensional vector prob = (self.ndk[d]-self.alpha) * (nkx/nkx_sum) prob = prob/prob.sum() y = np.random.multinomial(n=1, pvals=prob).argmax() return y