@yamaguchiyuto
Created March 16, 2017 14:27
Infinite Latent Feature Model
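Gibbs sampler for the infinite latent feature model: each row of Y is modeled as Z[i,:].dot(X) plus Gaussian noise, where Z is a binary feature-assignment matrix with an unbounded number of columns (an Indian-Buffet-Process-style prior) and X holds the latent feature values.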
import numpy as np
import scipy.sparse
class ILFM:
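    # Hyperparameters:
    #   var_x - prior variance of each latent feature vector (rows of X)
    #   var_y - observation noise variance
    #   alpha - concentration parameter; new features per row ~ Poisson(alpha/N)
    #   max_iter - number of Gibbs sweeps run by fit()
    #   max_sampled_new_features - truncation on the new-feature Poisson draw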
    def __init__(self, var_x=1.0, var_y=1.0, alpha=1.0, max_iter=30, max_sampled_new_features=10):
        self.var_x = var_x
        self.var_y = var_y
        self.alpha = alpha
        self.max_iter = max_iter
        self.max_sampled_new_features = max_sampled_new_features
    def _log_Gaussian_pdf(self, x, mu, var):
        return -np.linalg.norm(x - mu)**2 / (2 * var)  # not normalized

    def _log_Poisson_pdf(self, n, lamb):
        # unnormalized: the constant -lamb term is dropped, which is safe
        # because these log-probabilities are only compared across n
        return n * np.log(lamb) - np.log(np.arange(n) + 1).sum()
    def _logprob_to_normalized_prob(self, logprob):
        logprob -= max(logprob)  # shift by the max so exp() cannot overflow
        prob = np.exp(logprob)
        prob /= prob.sum()
        return prob
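
    # Gibbs conditional for an existing feature k on row i (z_ik is zeroed
    # out before this is called, and m[k] excludes row i):
    #   P(z_ik=0 | rest) ∝ (1 - m_k/N) * N(Y[i] | Z[i,:]X,          var_y)
    #   P(z_ik=1 | rest) ∝ (m_k/N)     * N(Y[i] | Z[i,:]X + X[k,:], var_y)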
    def _calc_prob_to_sample_z(self, i, k):  # avoid underflow
        logprob = []
        mu = self.Z[i, :].dot(self.X)[0]  # z_ik was zeroed by the caller, so mu excludes feature k
        logp0 = self._log_Gaussian_pdf(self._Y[i], mu, self.var_y) + np.log(1 - (self.m[k] / float(self._N)))
        logprob.append(logp0)
        mu += self.X[k, :]
        logp1 = self._log_Gaussian_pdf(self._Y[i], mu, self.var_y) + np.log(self.m[k] / float(self._N))
        logprob.append(logp1)
        logprob = np.array(logprob)
        return self._logprob_to_normalized_prob(logprob)
    def _calc_prob_to_sample_n_new_features(self, i, new_mus):  # avoid underflow
        logprob = []
        mu = self.Z[i, :].dot(self.X)[0]
        lamb = self.alpha / float(self._N)
        for n in range(self.max_sampled_new_features + 1):
            if n > 0:
                mu += new_mus[n - 1]
            logp = self._log_Gaussian_pdf(self._Y[i], mu, self.var_y) + self._log_Poisson_pdf(n, lamb)
            logprob.append(logp)
        logprob = np.array(logprob)
        return self._logprob_to_normalized_prob(logprob)
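
    # Activate n new features for row i, using the pre-sampled feature values
    # in new_mus: reuse column indices freed when a feature died out, and only
    # grow Z and X when no free column is left.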
    def _add_new_features(self, i, n, new_mus):
        for j in range(n):
            if len(self.new_features) > 0:
                nf = self.new_features.pop()
                self.X[nf] = new_mus[j]
                self.sampled_features[nf] = True
            else:
                nf = self.Z.shape[1]
                self.X = np.vstack([self.X, new_mus[j]])
                self.Z = scipy.sparse.hstack([self.Z, scipy.sparse.lil_matrix((self._N, 1))], 'lil')
                self.sampled_features = np.hstack([self.sampled_features, True])
            self.Z[i, nf] = 1
            self.m[nf] = 1
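
    # One Gibbs sweep over Z: resample every active z_ik (retiring features
    # whose usage count m[k] drops to zero), then draw how many brand-new
    # features row i spawns, with prior Poisson(alpha/N) truncated at
    # max_sampled_new_features.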
    def _sample_Z(self):
        for i in range(self._N):
            for k in range(self.Z.shape[1]):
                if self.m[k] == 0:
                    continue
                self.m[k] -= self.Z[i, k]
                self.Z[i, k] = 0
                if self.m[k] == 0:
                    self.new_features.append(k)
                    self.sampled_features[k] = False
                else:
                    prob = self._calc_prob_to_sample_z(i, k)
                    zik = np.random.multinomial(1, prob, size=1).argmax()
                    self.Z[i, k] = zik
                    self.m[k] += zik
            new_mus = []  # means for new features
            for _ in range(self.max_sampled_new_features):
                new_mus.append(self._init_x())
            prob = self._calc_prob_to_sample_n_new_features(i, new_mus)
            n_sampled_new_features = np.random.multinomial(1, prob, size=1).argmax()
            self._add_new_features(i, n_sampled_new_features, new_mus)
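
    # Posterior of X given Z and Y; the D columns of X are independent given Z:
    #   Vx = (Z'Z + (var_y/var_x) I)^{-1},  Mu = Vx Z'Y,
    #   X[:,d] | Z, Y ~ N(Mu[:,d], var_y * Vx)
    # Only the currently active features are resampled.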
    def _sample_X(self):
        if self.sampled_features.sum() > 0:
            Z = self.Z[:, self.sampled_features]
            Vx_inv = Z.T.dot(Z).todense() + (self.var_y / self.var_x) * np.identity(Z.shape[1])
            Vx = np.linalg.inv(Vx_inv)
            Vxy = self.var_y * Vx
            Mu = Vx.dot(Z.T.dot(self._Y)).A
            for d in range(self._D):
                xd = np.random.multivariate_normal(Mu[:, d], Vxy)
                self.X[self.sampled_features, d] = xd

    def _init_x(self):
        return np.random.normal(loc=0, scale=np.sqrt(self.var_x), size=self._D)
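
    # Run max_iter Gibbs sweeps, alternating the Z and X updates. The state
    # starts from a single all-zero feature column that the sampler can recycle.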
    def fit(self, Y):
        self._Y = Y
        self._N = Y.shape[0]
        self._D = Y.shape[1]
        self.new_features = [0]
        self.sampled_features = np.array([False])  # True if there is at least one 1 for each feature
        self.m = {self.new_features[0]: 0}  # number of 1s for each feature
        self.Z = scipy.sparse.lil_matrix((self._N, 1))
        self.X = np.zeros(shape=(1, self._D))
        for _ in range(self.max_iter):
            self._sample_Z()
            self._sample_X()
        return self
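
# A minimal usage sketch (not part of the class above): fit the sampler to
# synthetic data generated from two ground-truth features. Shapes, seeds, and
# hyperparameter values here are illustrative assumptions, not recommendations.
if __name__ == '__main__':
    np.random.seed(0)
    N, D = 100, 5
    true_X = np.random.normal(size=(2, D))            # two ground-truth features
    true_Z = np.random.binomial(1, 0.5, size=(N, 2))  # random binary assignments
    Y = true_Z.dot(true_X) + np.random.normal(scale=0.1, size=(N, D))

    model = ILFM(var_x=1.0, var_y=0.1, alpha=1.0, max_iter=30).fit(Y)

    active = model.sampled_features  # boolean mask of features still in use
    print('features discovered:', active.sum())
    print('Z (first 5 rows):')
    print(model.Z[:, active].toarray()[:5])
    print('X (active features):')
    print(model.X[active])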