Skip to content

Instantly share code, notes, and snippets.

@yamaguchiyuto
Created March 21, 2017 07:27
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yamaguchiyuto/c5f73775e5f394037a5e0c90151ba920 to your computer and use it in GitHub Desktop.
Save yamaguchiyuto/c5f73775e5f394037a5e0c90151ba920 to your computer and use it in GitHub Desktop.
Correspondence Topic Model
import random
import numpy as np
from scipy.sparse import lil_matrix
class CTM:
    """Correspondence topic model fitted by collapsed Gibbs sampling.

    Every document ``d`` carries a sequence of word ids ``W[d]`` and a
    sequence of auxiliary ids ``X[d]``.  Each word receives a topic
    ``Z[d][i]``; each auxiliary item receives a topic ``Y[d][j]`` that is
    always one of the topics currently used by the words of the same
    document.

    After :meth:`fit` the sampler state is available as attributes:

    * ``Z``, ``Y`` -- lists with one integer array of topics per document.
    * ``ndk`` -- ``(D, K)`` per-document word-topic counts *plus* ``alpha``.
    * ``mdk`` -- ``(D, K)`` per-document auxiliary-topic counts (raw).
    * ``nkw`` -- ``(K, Vw)`` topic-word counts plus ``beta``.
    * ``nkx`` -- ``(K, Vx)`` topic-auxiliary counts plus ``gamma``.
    """

    def __init__(self, K, alpha, beta, gamma, max_iter, verbose=0):
        """Store the hyper-parameters; no work is done until :meth:`fit`.

        K        -- number of topics.
        alpha    -- pseudo-count added to every document-topic count.
        beta     -- pseudo-count added to every topic-word count.
        gamma    -- pseudo-count added to every topic-auxiliary count.
        max_iter -- number of Gibbs sweeps over the corpus.
        verbose  -- if truthy, print the remaining sweep count each sweep.
        """
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.gamma = gamma
        self.max_iter = max_iter
        self.verbose = verbose

    def fit(self, W, X, Vw, Vx):
        """Run the Gibbs sampler and return ``self``.

        W  -- sequence of documents, each a sequence of word ids in
              ``[0, Vw)``.
        X  -- sequence of the same length as ``W``; ``X[d]`` holds the
              auxiliary ids in ``[0, Vx)`` attached to document ``d``.
        Vw -- size of the word vocabulary.
        Vx -- size of the auxiliary vocabulary.

        Raises ``ValueError`` if ``W`` and ``X`` differ in length or if a
        document has auxiliary items but no words (their topics would have
        nothing to correspond to).
        """
        if len(X) != len(W):
            raise ValueError(
                "W and X must describe the same documents (got %d and %d)"
                % (len(W), len(X)))

        self._W = W
        self._X = X
        self._D = len(W)
        self._Vw = Vw  # number of vocabularies
        self._Vx = Vx  # number of vocabularies

        self.Z = self._init_Z()
        self.Y = self._init_Y()
        self.ndk = self._init_ndk()
        self.mdk = self._init_mdk()
        self.nkw = self._init_nkw()  # for W
        self.nkx = self._init_nkx()  # for X

        # Row sums are maintained incrementally so that each sampling step
        # does not have to re-sum a whole (K, V) matrix.
        nkw_sum = self.nkw.sum(axis=1)
        nkx_sum = self.nkx.sum(axis=1)

        # Do-while loop: at least one sweep is performed even when
        # ``max_iter`` is zero or negative.
        remained_iter = self.max_iter
        while True:
            if self.verbose:
                print(remained_iter)
            # Visit documents in a fresh random order every sweep.
            for d in np.random.permutation(self._D):
                self._resample_words(d, nkw_sum)
                self._resample_aux(d, nkx_sum)
            remained_iter -= 1
            if remained_iter <= 0:
                break
        return self

    def _resample_words(self, d, nkw_sum):
        """Resample the topic of every word of document ``d`` in random order."""
        words = self._W[d]
        topics = self.Z[d]
        for i in np.random.permutation(len(words)):
            old_k = topics[i]
            v = words[i]
            # Remove the token from all counts before sampling.
            self.ndk[d, old_k] -= 1
            self.nkw[old_k, v] -= 1
            nkw_sum[old_k] -= 1
            new_k = self._sample_z(d, old_k, v, nkw_sum)
            topics[i] = new_k
            self.ndk[d, new_k] += 1
            self.nkw[new_k, v] += 1
            nkw_sum[new_k] += 1

    def _resample_aux(self, d, nkx_sum):
        """Resample the topic of every auxiliary item of document ``d``."""
        items = self._X[d]
        topics = self.Y[d]
        for i in np.random.permutation(len(items)):
            old_k = topics[i]
            u = items[i]
            self.mdk[d, old_k] -= 1
            self.nkx[old_k, u] -= 1
            nkx_sum[old_k] -= 1
            new_k = self._sample_y(d, u, nkx_sum)
            topics[i] = new_k
            self.mdk[d, new_k] += 1
            self.nkx[new_k, u] += 1
            nkx_sum[new_k] += 1

    def _init_Z(self):
        """Draw a uniformly random topic for every word of every document."""
        return [np.random.randint(low=0, high=self.K, size=len(doc))
                for doc in self._W]

    def _init_Y(self):
        """Draw every auxiliary topic from the word topics of its document."""
        Y = []
        for d, items in enumerate(self._X):
            word_topics = self.Z[d]
            if len(items) == 0:
                Y.append(np.empty(0, dtype=word_topics.dtype))
            elif len(word_topics) == 0:
                raise ValueError(
                    "document %d has auxiliary items but no words" % d)
            else:
                Y.append(np.random.choice(word_topics, size=len(items)))
        return Y

    def _init_ndk(self):
        """Count word topics per document, starting from ``alpha``."""
        ndk = np.zeros((self._D, self.K)) + self.alpha
        for d in range(self._D):
            for k in self.Z[d]:
                ndk[d, k] += 1
        return ndk

    def _init_mdk(self):
        """Count auxiliary topics per document (no smoothing)."""
        mdk = np.zeros((self._D, self.K))
        for d in range(self._D):
            for k in self.Y[d]:
                mdk[d, k] += 1
        return mdk

    def _init_nkw(self):
        """Count words per topic, starting from ``beta``."""
        nkw = np.zeros((self.K, self._Vw)) + self.beta
        for d in range(self._D):
            for k, v in zip(self.Z[d], self._W[d]):
                nkw[k, v] += 1
        return nkw

    def _init_nkx(self):
        """Count auxiliary ids per topic, starting from ``gamma``."""
        nkx = np.zeros((self.K, self._Vx)) + self.gamma
        for d in range(self._D):
            for k, u in zip(self.Y[d], self._X[d]):
                nkx[k, u] += 1
        return nkx

    def _word_counts(self, d):
        """Return the raw (un-smoothed) word-topic counts of document ``d``.

        ``ndk`` stores ``alpha + count`` in floating point, so subtracting
        ``alpha`` can leave a residue of either sign; rounding recovers the
        exact integer count.
        """
        return np.rint(self.ndk[d] - self.alpha)

    def _sample_z(self, d, old_k, v, nkw_sum):
        """Sample a topic for one word of document ``d``.

        The word must already have been removed from ``ndk``, ``nkw`` and
        ``nkw_sum``.  ``old_k`` is the topic it had, ``v`` its word id.
        """
        counts = self._word_counts(d)
        links = self.mdk[d]

        # If this word was the last one supporting ``old_k`` while auxiliary
        # items are still assigned to it, every other choice would leave
        # those items without a corresponding word topic.
        if counts[old_k] == 0 and links[old_k] > 0:
            return old_k

        word_weight = self.nkw[:, v] / nkw_sum

        # Correspondence factor ((N + 1) / N) ** M on raw counts.  Topics
        # with N == 0 have M == 0 here, so their factor is 1; substituting
        # N = 1 for them avoids a division by zero without changing that.
        safe = np.where(counts > 0, counts, 1.0)
        log_ratio = links * np.log1p(1.0 / safe)
        # Rescale by the largest factor (a common constant) to avoid overflow.
        ratio = np.exp(log_ratio - log_ratio.max())

        prob = self.ndk[d] * word_weight * ratio
        total = prob.sum()
        if not total > 0:
            # Only reachable with alpha == 0 and no other word in the
            # document: the document term is then uninformative, which is
            # also the alpha -> 0 limit of the expression above.
            prob = word_weight * ratio
            total = prob.sum()
        prob = prob / total
        return np.random.multinomial(n=1, pvals=prob).argmax()

    def _sample_y(self, d, u, nkx_sum):
        """Sample a topic for one auxiliary item of document ``d``.

        The item must already have been removed from ``mdk``, ``nkx`` and
        ``nkx_sum``.  Only topics used by at least one word of the document
        receive non-zero probability.
        """
        prob = self._word_counts(d) * (self.nkx[:, u] / nkx_sum)
        prob = prob / prob.sum()
        return np.random.multinomial(n=1, pvals=prob).argmax()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment