Skip to content

Instantly share code, notes, and snippets.

@yamaguchiyuto
Last active April 13, 2019 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yamaguchiyuto/d9dbd283c54807faa9ccab532ec0d56e to your computer and use it in GitHub Desktop.
Save yamaguchiyuto/d9dbd283c54807faa9ccab532ec0d56e to your computer and use it in GitHub Desktop.
Author Topic Model
import random
import copy
import numpy as np
from scipy.sparse import lil_matrix
class ATM:
def __init__(self, K, alpha, beta, max_iter, verbose=0):
self.K=K
self.alpha = alpha
self.beta = beta
self.max_iter = max_iter
self.verbose=verbose
def fit(self,W,A,V,S):
self._W = W
self._A = A
self._D = len(W) # number of documents
self._V = V # number of vocabularies
self._S = S # number of distinct authors
self.Z = self._init_Z()
self.Y = self._init_Y()
self.nak = self._init_nak()
self.nkv = self._init_nkv()
nkv_sum = self.nkv.sum(axis=1)
nak_sum = self.nak.sum(axis=1)
self._max_score = -1
self.max_Z = None
self.max_Y = None
remained_iter = self.max_iter
while True:
if self.verbose: print remained_iter
for d in np.random.choice(self._D, self._D, replace=False):
# Sample Z and Y
for i in np.random.choice(len(self._W[d]), len(self._W[d]), replace=False):
k = self.Z[d][i] # topic
v = self._W[d][i] # word index
j = self.Y[d][i] # author_index within document d
a = self._A[d][j] # author
self.nak[a][k] -= 1
self.nkv[k][v] -= 1
nkv_sum[k] -= 1
nak_sum[a] -= 1
self.Z[d][i], self.Y[d][i] = self._sample_z_and_y(d,v,nkv_sum,nak_sum)
new_a = self._A[d][self.Y[d][i]]
new_k = self.Z[d][i]
self.nak[new_a][new_k] += 1
self.nkv[new_k][v] += 1
nkv_sum[new_k] += 1
nak_sum[new_a] += 1
s = self.score(nkv_sum,nak_sum)
if s > self._max_score:
self.max_score = s
self.max_Z = copy.copy(self.Z)
self.max_Y = copy.copy(self.Y)
remained_iter -= 1
if remained_iter <= 0: break
return self
def _init_Z(self):
Z = []
for d in range(len(self._W)):
Z.append(np.random.randint(low=0, high=self.K, size=len(self._W[d])))
return Z
def _init_Y(self):
Y = []
for d in range(len(self._W)):
Y.append(np.random.randint(low=0, high=len(self._A[d]), size=len(self._W[d])))
return Y
def _init_nak(self):
nak = np.zeros((self._S,self.K))
for d in range(self._D):
for i in range(len(self._W[d])):
k = self.Z[d][i]
j = self.Y[d][i]
a = self._A[d][j]
nak[a,k]+=1
return nak
def _init_nkv(self):
nkv = np.zeros((self.K,self._V))
for d in range(self._D):
for i in range(len(self._W[d])):
k = self.Z[d][i]
v = self._W[d][i]
nkv[k,v]+=1
return nkv
def _sample_z_and_y(self,d,v,nkv_sum,nak_sum):
nkv = self.nkv[:,v] # k-dimensional vector
na = len(self._A[d]) # number of authors in document d
prob = []
p1 = ((nkv+self.beta) / (nkv_sum+self.beta*self._V))
for j in range(na):
a = self._A[d][j]
pa = p1 * ((self.nak[a]+self.alpha) / (nak_sum[a]+self.alpha*self.K))
prob.append(pa)
prob = np.array(prob).flatten()
prob = prob/prob.sum()
zy = np.random.multinomial(n=1, pvals=prob).argmax()
z = zy%self.K
y = zy/self.K
return z,y
def score(self,nkv_sum,nak_sum):
s = 0
for d in range(self._D):
for i in range(len(self._W[d])):
v = self._W[d][i]
k = self.Z[d][i]
a = self._A[d][self.Y[d][i]]
s += ((self.nkv[k,v]+self.beta) / (nkv_sum[k]+self.beta*self._V)) * ((self.nak[a,k]+self.alpha) / (nak_sum[a]+self.alpha*self.K))
return s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment