@kalaidin
Created May 28, 2015 22:11
Logistic regression + Factorization machines + SGD
"""
SGD for logistic loss + factorization machines.

The code follows this paper:
[1] http://www.ics.uci.edu/~smyth/courses/cs277/papers/factorization_machines_with_libFM.pdf
"""
from math import exp, log

import numpy as np
def sigmoid(x):
    """Logistic function, clamped to avoid overflow in exp for large |x|."""
    if x > 500:
        return 1.0
    elif x < -500:
        return 0.0
    return 1 / (1 + exp(-x))
def log_loss(y, p):
    """Logistic loss -log(sigmoid(y * p)) = log(1 + exp(-y * p)), for y in {-1, +1}."""
    z = p * y
    if z > 18:
        return exp(-z)  # log(1 + exp(-z)) ~= exp(-z) for large z
    if z < -18:
        return -z  # log(1 + exp(-z)) ~= -z for very negative z
    return -log(sigmoid(z))
class FM(object):

    def __init__(self, etha=0.01, l=0.01, sigma=0.001, n_iter=1000, k=3):
        # TODO: default parameters
        self.etha = etha      # SGD learning rate
        self.l = l            # L2 regularization strength
        self.sigma = sigma    # scale of the random init for v
        self.n_iter = n_iter  # number of SGD epochs
        self.k = k            # number of latent factors
        self.w0 = 0.0         # global bias
        self.n = 0            # number of training samples
        self.p = 0            # number of features
        self.w = None         # linear weights, shape (p,)
        self.v = None         # latent factors, shape (p, k)
    def fit(self, X, Y, verbose=True):
        self.n, self.p = X.shape
        self.w = np.zeros(self.p)
        self.v = self.sigma * np.random.randn(self.p, self.k)
        for epoch in range(self.n_iter):
            for x, y in zip(X, Y):
                p = self._predict(x)
                # d(loss)/d(yhat) for the logistic loss, y in {-1, +1}
                delta = y * (sigmoid(y * p) - 1.0)
                self.w0 -= self.etha * (delta + 2 * self.l * self.w0)
                for i in range(self.p):
                    self.w[i] -= self.etha * \
                        (delta * x[i] + 2 * self.l * self.w[i])
                    for j in range(self.k):
                        # gradient of the pairwise term w.r.t. v[i, j]
                        h = x[i] * \
                            (np.dot(self.v[:, j], x) - x[i] * self.v[i, j])
                        self.v[i, j] -= self.etha * \
                            (delta * h + 2 * self.l * self.v[i, j])
            if verbose:
                print(self.test(X, Y))
        return self
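
    # For reference, the gradients the SGD loop above implements; by the
    # multilinearity of the FM model (see [1]):
    #   d(yhat)/d(w0)      = 1
    #   d(yhat)/d(w[i])    = x[i]
    #   d(yhat)/d(v[i, j]) = x[i] * np.dot(v[:, j], x) - v[i, j] * x[i] ** 2
    # and the chain rule gives d(loss)/d(theta) = delta * d(yhat)/d(theta).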
    def _predict(self, x):
        res = self.w0 + np.dot(self.w, x)
        f_res = 0.0
        # [1] equation 5: the pairwise term sum_{i<j} dot(v[i], v[j]) * x[i] * x[j],
        # computed in O(k * p) instead of O(k * p^2)
        for f in range(self.k):
            s = 0.0
            s_squared = 0.0
            for j in range(self.p):
                el = self.v[j, f] * x[j]
                s += el
                s_squared += el ** 2
            f_res += 0.5 * (s ** 2 - s_squared)
        res += f_res
        return res
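
    # A vectorized sketch of the loop above (not in the original gist;
    # equivalent by eq. 5 of [1], assuming x is a 1-d array of length p):
    #   f_res = 0.5 * np.sum(np.dot(x, self.v) ** 2 - np.dot(x ** 2, self.v ** 2))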
    def predict(self, X):
        return np.array([self._predict(x) for x in X])

    def scale(self, Y):
        # map raw scores to probabilities in (0, 1)
        return np.array([sigmoid(y) for y in Y])

    def test(self, X, Y):
        # mean logistic loss over the dataset
        return np.mean([log_loss(y, p) for p, y in zip(self.predict(X), Y)])
def main():
    fm = FM()
    X_train = np.array([[2., 1., 1., 0., 1., 1., 0., 0., 0.],
                        [2., 1., 1., 1., 0., 0., 1., 0., 0.],
                        [2., 1., 1., 0., 0., 0., 0., 1., 0.],
                        [2., 1., 0., 0., 0., 0., 0., 0., 1.],
                        [4., 2., 0., 0., 0., 0., 0., 0., 1.],
                        [4., 2., 0., 0., 0., 0., 0., 0., 1.],
                        [4., 2., 0., 0., 0., 0., 0., 0., 1.]])
    Y_train = np.array([1, 1, -1, -1, 1, 1, 1])
    fm.fit(X_train, Y_train)
    print(fm.scale(fm.predict(X_train)))


if __name__ == "__main__":
    main()
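
After training, scale maps raw scores to probabilities. A minimal usage sketch (assuming the fitted fm and X_train from main above): hard {-1, +1} labels come from thresholding the probabilities at 0.5, which is the same as thresholding raw scores at 0.

    probs = fm.scale(fm.predict(X_train))
    labels = np.where(probs > 0.5, 1, -1)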
@mfagerlund
Very nice, thanks - I love minimal implementations that allow me to verify that I'm on the right track when creating a larger implementation. I created a C# version of this code, which you can find here: https://gist.github.com/mfagerlund/30218d5a07b7fa34e1719688698a5014
