Instantly share code, notes, and snippets.

# yamaguchiyuto/atm.py Created Mar 23, 2017

What would you like to do?
Author Topic Model
 # Author Topic Model (ATM): resamples a (topic, author) pair for every token.
import random
import copy

import numpy as np
from scipy.sparse import lil_matrix


class ATM:
    """Author Topic Model fitted by repeated per-token resampling.

    Every token of every document carries two latent assignments: a topic
    ``z`` and an author ``y`` (an index into that document's author list).
    ``fit`` repeatedly removes one token's contribution from the count
    tables, draws a new ``(z, y)`` pair from the smoothed count ratios, and
    adds the token back.

    Parameters
    ----------
    K : int
        Number of topics.
    alpha : float
        Additive smoothing applied to the author-topic counts.
    beta : float
        Additive smoothing applied to the topic-word counts.
    max_iter : int
        Number of sweeps over the corpus. At least one sweep is always run,
        even when ``max_iter`` is zero or negative.
    verbose : int, optional
        When truthy, the number of remaining sweeps is printed before each
        sweep.

    Attributes set by ``fit``
    -------------------------
    Z, Y : list of 1-D integer arrays
        Current topic / author-index assignment of every token.
    nak : ndarray, shape (S, K)
        Number of tokens assigned to author ``a`` and topic ``k``.
    nkv : ndarray, shape (K, V)
        Number of times word ``v`` is assigned to topic ``k``.
    max_Z, max_Y : list of 1-D integer arrays
        Independent snapshots of ``Z`` / ``Y`` taken at the best-scoring sweep.
    max_score : float
        Score (see ``score``) of that best sweep.
    """

    def __init__(self, K, alpha, beta, max_iter, verbose=0):
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.max_iter = max_iter
        self.verbose = verbose

    def fit(self, W, A, V, S):
        """Run the sampler and remember the best-scoring assignment.

        Parameters
        ----------
        W : sequence of sequences of int
            ``W[d][i]`` is the word index of token ``i`` in document ``d``.
        A : sequence of sequences of int
            ``A[d]`` lists the author ids of document ``d``.
        V : int
            Vocabulary size (number of columns of ``nkv``).
        S : int
            Number of distinct authors (number of rows of ``nak``).

        Returns
        -------
        ATM
            ``self``, to allow call chaining.
        """
        self._W = W
        self._A = A
        self._D = len(W)  # number of documents
        self._V = V       # vocabulary size
        self._S = S       # number of distinct authors

        self.Z = self._init_Z()
        self.Y = self._init_Y()
        self.nak = self._init_nak()
        self.nkv = self._init_nkv()
        # Row totals are maintained incrementally by _sweep (mutated in place).
        nkv_sum = self.nkv.sum(axis=1)
        nak_sum = self.nak.sum(axis=1)

        self._max_score = -1
        self.max_Z = None
        self.max_Y = None

        remained_iter = self.max_iter
        while True:
            if self.verbose:
                print(remained_iter)

            self._sweep(nkv_sum, nak_sum)

            s = self.score(nkv_sum, nak_sum)
            if s > self._max_score:
                # Keep the private tracker and the public attribute in sync;
                # comparing against one while assigning the other would make
                # every sweep look like a new maximum.
                self._max_score = s
                self.max_score = s
                # Deep copies: the per-document arrays in Z / Y are mutated
                # in place by later sweeps, so a shallow copy would alias them.
                self.max_Z = copy.deepcopy(self.Z)
                self.max_Y = copy.deepcopy(self.Y)

            remained_iter -= 1
            if remained_iter <= 0:
                break
        return self

    def _sweep(self, nkv_sum, nak_sum):
        """Resample every token once, visiting documents and tokens in random order."""
        for d in np.random.choice(self._D, self._D, replace=False):
            words = self._W[d]
            authors = self._A[d]
            n_tokens = len(words)
            for i in np.random.choice(n_tokens, n_tokens, replace=False):
                k = self.Z[d][i]           # current topic
                v = words[i]               # word index
                a = authors[self.Y[d][i]]  # current author id

                # Take the token out of all count tables before sampling.
                self.nak[a, k] -= 1
                self.nkv[k, v] -= 1
                nkv_sum[k] -= 1
                nak_sum[a] -= 1

                new_k, new_j = self._sample_z_and_y(d, v, nkv_sum, nak_sum)
                self.Z[d][i] = new_k
                self.Y[d][i] = new_j
                new_a = authors[new_j]

                # Put the token back under its new assignment.
                self.nak[new_a, new_k] += 1
                self.nkv[new_k, v] += 1
                nkv_sum[new_k] += 1
                nak_sum[new_a] += 1

    def _init_Z(self):
        """Draw a uniformly random topic in ``[0, K)`` for every token."""
        return [
            np.random.randint(low=0, high=self.K, size=len(self._W[d]))
            for d in range(len(self._W))
        ]

    def _init_Y(self):
        """Draw a uniformly random author index (within the document) per token."""
        return [
            np.random.randint(low=0, high=len(self._A[d]), size=len(self._W[d]))
            for d in range(len(self._W))
        ]

    def _init_nak(self):
        """Build the (S, K) author-topic count table from ``Z`` and ``Y``."""
        nak = np.zeros((self._S, self.K))
        for d in range(self._D):
            authors = self._A[d]
            for k, j in zip(self.Z[d], self.Y[d]):
                nak[authors[j], k] += 1
        return nak

    def _init_nkv(self):
        """Build the (K, V) topic-word count table from ``Z`` and ``W``."""
        nkv = np.zeros((self.K, self._V))
        for d in range(self._D):
            for k, v in zip(self.Z[d], self._W[d]):
                nkv[k, v] += 1
        return nkv

    def _sample_z_and_y(self, d, v, nkv_sum, nak_sum):
        """Draw a new ``(topic, author index)`` pair for one token.

        Parameters
        ----------
        d : int
            Document index; selects the candidate authors ``A[d]``.
        v : int
            Word index of the token being resampled.
        nkv_sum, nak_sum : ndarray
            Row totals of ``nkv`` (length K) and ``nak`` (length S).

        Returns
        -------
        (int, int)
            ``z`` in ``[0, K)`` and ``y`` in ``[0, len(A[d]))``.
        """
        authors = np.asarray(self._A[d])

        # Smoothed weight of word v under each topic: shape (K,).
        word_given_topic = (
            (self.nkv[:, v] + self.beta) / (nkv_sum + self.beta * self._V)
        )
        # Smoothed weight of each topic under each candidate author: (na, K).
        topic_given_author = (
            (self.nak[authors] + self.alpha)
            / (nak_sum[authors] + self.alpha * self.K)[:, np.newaxis]
        )

        # Row-major flattening: flat index = author_index * K + topic.
        prob = (word_given_topic * topic_given_author).ravel()
        prob = prob / prob.sum()

        zy = np.random.multinomial(n=1, pvals=prob).argmax()
        # Integer divmod keeps the author index an int; a plain `/` would
        # produce a float under true division and break indexing.
        y, z = divmod(int(zy), self.K)
        return z, y

    def score(self, nkv_sum, nak_sum):
        """Score the current assignment.

        The score is the sum, over all tokens, of the smoothed topic-word
        ratio for the token's (topic, word) times the smoothed author-topic
        ratio for its (author, topic). Larger is treated as better by ``fit``.

        Parameters
        ----------
        nkv_sum, nak_sum : ndarray
            Row totals of ``nkv`` (length K) and ``nak`` (length S).

        Returns
        -------
        float
            The accumulated score (``0`` for an empty corpus).
        """
        s = 0
        for d in range(self._D):
            authors = self._A[d]
            for v, k, j in zip(self._W[d], self.Z[d], self.Y[d]):
                a = authors[j]
                word_term = (
                    (self.nkv[k, v] + self.beta)
                    / (nkv_sum[k] + self.beta * self._V)
                )
                author_term = (
                    (self.nak[a, k] + self.alpha)
                    / (nak_sum[a] + self.alpha * self.K)
                )
                s += word_term * author_term
        return s
to join this conversation on GitHub. Already have an account? Sign in to comment