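"""Masked matrix factorization with gradient descent.

Factors a partially observed ratings matrix D into P.dot(Q) three ways:
batch gradient descent in Theano, stochastic gradient descent in Theano,
and a pure-NumPy per-rating update (labeled 'quuxlabs' in the output).
"""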
import numpy as np
import numpy.ma as ma
import theano
from theano import tensor as T
floatX = theano.config.floatX

def getmask(D):
    # Return D's mask when it's a masked array; otherwise an all-False mask.
    return ma.getmaskarray(D) if ma.isMA(D) else np.zeros(D.shape, dtype=bool)

def matrix_factorization_bgd(
        D, P, Q, steps=5000, alpha=0.0002, beta=0.02):
    # Batch gradient descent: every update uses all observed entries at once.
    P = theano.shared(P.astype(floatX))
    Q = theano.shared(Q.astype(floatX))
    X = T.matrix()
    # ~getmask(D) zeros the residuals at missing entries, so only observed
    # ratings contribute to the squared error.
    error = T.sum(T.sqr(~getmask(D) * (P.dot(Q) - X)))
    regularization = (beta / 2.0) * (T.sum(T.sqr(P)) + T.sum(T.sqr(Q)))
    cost = error + regularization
    gp, gq = T.grad(cost=cost, wrt=[P, Q])
    train = theano.function(inputs=[X],
                            outputs=cost,
                            updates=[(P, P - gp * alpha), (Q, Q - gq * alpha)])
    X_value = D.astype(floatX)  # cast once so this also works when floatX is float32
    for _ in xrange(steps):
        train(X_value)
    return P.get_value(), Q.get_value()

def matrix_factorization_sgd(
        D, P, Q, steps=5000, alpha=0.0002, beta=0.02):
    # Stochastic gradient descent: one update per observed rating.
    P = theano.shared(P.astype(floatX))
    Q = theano.shared(Q.astype(floatX))
    P_i = T.vector()
    Q_j = T.vector()
    i = T.iscalar()
    j = T.iscalar()
    x = T.scalar()
    # Squared error for a single rating, plus L2 regularization of the
    # corresponding row of P and column of Q.
    error = T.sqr(P_i.dot(Q_j) - x)
    regularization = (beta / 2.0) * (P_i.dot(P_i) + Q_j.dot(Q_j))
    cost = error + regularization
    gp, gq = T.grad(cost=cost, wrt=[P_i, Q_j])
    train = theano.function(inputs=[i, j, x],
                            givens=[(P_i, P[i, :]), (Q_j, Q[:, j])],
                            updates=[(P, T.inc_subtensor(P[i, :], -gp * alpha)),
                                     (Q, T.inc_subtensor(Q[:, j], -gq * alpha))])
    mask = getmask(D)  # computed once rather than per rating
    for _ in xrange(steps):
        for (row, col), val in np.ndenumerate(D):
            if not mask[row, col]:
                train(row, col, val)
    return P.get_value(), Q.get_value()

def matrix_factorization_quux(
        D, P, Q, steps=5000, alpha=0.0002, beta=0.02):
    # Pure-NumPy stochastic gradient descent, after the quuxlabs matrix
    # factorization tutorial. For each observed rating, each of the K factor
    # weights takes a step down the gradient of
    # eij**2 + (beta/2) * (P[i,k]**2 + Q[k,j]**2).
    K = P.shape[1]
    P = np.copy(P)
    Q = np.copy(Q)
    mask = getmask(D)  # computed once rather than per entry
    for _ in xrange(steps):
        for i in xrange(len(D)):
            for j in xrange(len(D[i])):
                if not mask[i, j]:
                    eij = D[i, j] - np.dot(P[i, :], Q[:, j])
                    for k in xrange(K):
                        P[i, k] += alpha * (2 * eij * Q[k, j] - beta * P[i, k])
                        Q[k, j] += alpha * (2 * eij * P[i, k] - beta * Q[k, j])
    return P, Q

if __name__ == '__main__':
    # -1 is a sentinel for a missing rating; it's masked out below.
    D = np.array([[ 5,  3, -1,  1],
                  [ 4, -1, -1,  1],
                  [ 1,  1, -1,  5],
                  [ 1, -1, -1,  4],
                  [-1,  1,  5,  5]])
    D = ma.masked_array(D, mask=(D == -1))
    m, n = D.shape
    K = 2  # number of latent factors
    P = np.random.rand(m, K)
    Q = np.random.rand(K, n)
    np.set_printoptions(formatter={'all': lambda x: str(x).rjust(2)})
    print 'Ratings Matrix\n', D, '\n'
    np.set_printoptions(precision=2, formatter=None)
    P_theano_bgd, Q_theano_bgd = matrix_factorization_bgd(D, P, Q)
    print 'Theano Batch Gradient Descent\n', \
        np.dot(P_theano_bgd, Q_theano_bgd), '\n'
    P_theano_sgd, Q_theano_sgd = matrix_factorization_sgd(D, P, Q)
    print 'Theano Stochastic Gradient Descent\n', \
        np.dot(P_theano_sgd, Q_theano_sgd), '\n'
    P_quux, Q_quux = matrix_factorization_quux(D, P, Q)
    print 'quuxlabs\n', np.dot(P_quux, Q_quux), '\n'
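
    # Not part of the original gist: a hedged sketch for comparing the three
    # fits numerically. masked_rmse is a hypothetical helper that scores a
    # reconstruction against only the observed (unmasked) entries of D.
    def masked_rmse(D, R):
        observed = ~getmask(D)
        return np.sqrt(np.mean((np.asarray(D)[observed] - R[observed]) ** 2))
    print 'RMSE over observed entries'
    print 'bgd: %.4f' % masked_rmse(D, np.dot(P_theano_bgd, Q_theano_bgd))
    print 'sgd: %.4f' % masked_rmse(D, np.dot(P_theano_sgd, Q_theano_sgd))
    print 'quux: %.4f' % masked_rmse(D, np.dot(P_quux, Q_quux))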