from __future__ import print_function
import numpy as np
from spark_config import sc
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from util import RDD_check, LabeledPoint_check
from sklearn.base import BaseEstimator
def perceptron_loss(y, y_pred):
    """Perceptron loss ``max(0, -y * y_pred)``.

    Zero unless ``y * y_pred`` is negative (score on the wrong side of 0);
    otherwise the magnitude of that wrong-side margin.
    """
    neg_margin = -y * y_pred
    return neg_margin if neg_margin > 0. else 0.
def perceptron_grad(y, y_pred):
    """Derivative of ``perceptron_loss`` with respect to *y_pred*.

    Returns ``-y`` for a misclassified sample (``-y * y_pred > 0``) and
    ``0.`` otherwise.
    """
    if -y * y_pred > 0.:
        return -y
    return 0.
def hinge_loss(y, y_pred):
    """Hinge loss ``max(0, 1 - y * y_pred)``; zero once the margin ``y * y_pred`` reaches 1."""
    slack = 1. - y * y_pred
    return slack if slack > 0. else 0.
def hinge_grad(y, y_pred):
    """Subgradient of ``hinge_loss`` w.r.t. *y_pred*.

    Returns ``-y`` while the margin ``y * y_pred`` is below 1, and ``0.``
    once it reaches 1 (the kink itself maps to 0.).
    """
    margin_violated = -y * y_pred > -1.
    return -y if margin_violated else 0.
def logit_loss(y, y_pred):
    """Logistic loss ``log(1 + exp(-y * y_pred))``.

    Computed as ``logaddexp(0, -y * y_pred)``: mathematically identical, but it
    does not overflow to ``inf`` when ``-y * y_pred`` is large (a confidently
    wrong prediction), where ``np.exp`` would overflow first.
    Works elementwise on numpy arrays.
    """
    return np.logaddexp(0., -y * y_pred)
def logit_grad(y, y_pred):
    """Derivative of ``logit_loss`` w.r.t. *y_pred*: ``-y * (1 - sigmoid(y * y_pred))``."""
    prob_of_label = sigmoid(y * y_pred)
    return -y * (1. - prob_of_label)
def sigmoid(x):
    """Logistic function ``1 / (1 + exp(-x))``; applies elementwise to numpy arrays."""
    denominator = 1. + np.exp(-x)
    return 1. / denominator
def sign(x):
    """Map a raw score to a class label: ``1.`` when ``x >= 0`` (zero included), else ``-1.``.

    NaN maps to ``-1.`` because the comparison is false.
    """
    if x >= 0.:
        return 1.
    return -1.
def penalty_grad(penalty, weights):
    """Return the (sub)gradient of the regularisation term with respect to *weights*.

    Args:
        penalty: 'l2' -> ``weights`` (gradient of ``0.5 * ||w||^2``);
            'l1' -> ``np.sign(weights)`` (subgradient of ``||w||_1``, 0 at 0);
            anything else (e.g. None) -> zeros, i.e. no regularisation.
        weights: 1-D array of coefficients (the intercept is excluded by callers).

    Returns:
        A numpy array with one entry per weight.
    """
    if penalty == 'l2':
        return weights
    if penalty == 'l1':
        return np.sign(weights)
    # Plain ndarray, matching the other branches' type, so callers such as
    # np.append treat every branch the same and no pyspark type is needed.
    return np.zeros(np.size(weights))
def init_param(shape):
    """Draw an array of the given *shape* from a zero-mean normal distribution.

    The standard deviation is ``sqrt(2 / shape[0])``, i.e. He-style scaling
    with ``shape[0]`` treated as the fan-in.
    """
    fan_in = shape[0]
    return np.random.normal(loc=0.0, scale=np.sqrt(2. / fan_in), size=shape)
def gradient_descent(features, label, weights, loss_func, grad_func, penalty, alpha):
    """Compute one sample's gradient and loss for a linear model whose last weight is the intercept.

    Args:
        features: feature vector whose last entry is the bias column (the
            constant 1. appended by ``BaseSGDClassifier.train_data_transform``).
        label: target value passed to *loss_func* / *grad_func*.
        weights: coefficient vector; the last entry is the intercept.
        loss_func: ``f(label, y_pred) -> loss``.
        grad_func: ``f(label, y_pred) -> d loss / d y_pred``.
        penalty: penalty name understood by ``penalty_grad``.
        alpha: penalty strength.

    Returns:
        ``(dweights, loss)``: the per-sample gradient w.r.t. *weights*
        (including ``alpha`` times the penalty gradient) and the un-penalised loss.
    """
    # The raw score is needed by both the gradient and the loss; compute it once.
    y_pred = features.dot(weights)
    dweights = grad_func(label, y_pred) * features
    # Penalise only the coefficients: the trailing 0 leaves the intercept unregularised.
    dweights = dweights + alpha * np.append(penalty_grad(penalty, weights[:-1]), 0)
    loss = loss_func(label, y_pred)
    return (dweights, loss)
class BaseSGDClassifier(BaseEstimator):
    """Linear binary classifier ``sign(weights . x + intercept)`` trained on a Spark RDD.

    Despite the name, ``fit`` takes full-batch steps: each epoch sums the
    per-sample gradients over the whole RDD and applies one averaged update.
    The loss/gradient pair is looked up by name as ``<loss>_loss`` /
    ``<loss>_grad`` among this module's functions. Predictions are +1./-1.,
    so labels are presumably encoded as -1./+1. (``score`` compares labels
    directly against ``sign``).
    """

    def __init__(self, loss="hinge", n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1):
        self.loss = loss                    # name prefix of the <loss>_loss / <loss>_grad pair
        self.n_epoch = n_epoch              # number of full passes over the data
        self.learning_rate = learning_rate  # base step size, decayed by 1/sqrt(epoch + 1)
        self.penalty = penalty              # 'l2', 'l1'; anything else disables the penalty
        self.alpha = alpha                  # penalty strength
        self.verbose = verbose              # print per-epoch progress and loss when truthy
        # Fitted parameters; None until fit() runs. If both are already set
        # when fit() is called, training warm-starts from them.
        self.weights = None
        self.intercept = None

    def train_data_transform(self, train_data):
        """Return *train_data* with a constant 1. appended to every feature vector.

        The extra column lets the intercept be learned as the last weight.
        Prints an error and exits with status 1 if ``LabeledPoint_check``
        rejects the data.
        """
        if not LabeledPoint_check(train_data):
            print('train data must be wrapped by LabeledPoint')
            # Same SystemExit(1) as the site-provided exit(), without relying
            # on the site module or closing stdin.
            raise SystemExit(1)
        return train_data.map(lambda p: LabeledPoint(p.label, np.append(p.features, 1.)))

    def fit(self, train_data):
        """Train on an RDD of ``LabeledPoint`` and return ``self``.

        Each epoch broadcasts the current weights, sums per-sample gradients
        and losses over the RDD, and takes one step of
        ``learning_rate / sqrt(epoch + 1)`` along the averaged gradient.
        Starts from ``init_param`` weights and a zero intercept unless both
        ``weights`` and ``intercept`` are already set.

        Raises:
            KeyError: if ``loss`` does not name a ``<loss>_grad``/``<loss>_loss`` pair.
            ValueError: if *train_data* is empty.
        """
        # Resolve the loss pair up front so a bad name fails before any Spark job.
        grad_func = globals()[self.loss + '_grad']
        loss_func = globals()[self.loss + '_loss']
        penalty = self.penalty
        alpha = self.alpha
        # Validate (and augment) before touching the data, so bad input hits the
        # LabeledPoint check instead of an AttributeError on .features.
        augmented_data = self.train_data_transform(train_data).cache()
        try:
            # Counting the cached RDD also materialises the cache for epoch 0.
            sample_len = augmented_data.count()
            if sample_len == 0:
                raise ValueError('train data is empty')
            feature_len = len(train_data.first().features)
            if self.weights is None or self.intercept is None:
                weights = Vectors.dense(np.append(init_param(shape=(feature_len, 1)), 0.))
            else:
                weights = Vectors.dense(np.append(self.weights, self.intercept))
            for i in range(self.n_epoch):
                if self.verbose:
                    print('...Epoch: ', i)
                bcweights = sc.broadcast(weights)
                # The closure captures only plain locals (not `self`), keeping
                # the shipped task small.
                d_weights, loss = augmented_data.map(
                    lambda p: gradient_descent(features=p.features, label=p.label,
                                               weights=bcweights.value, loss_func=loss_func,
                                               grad_func=grad_func, penalty=penalty, alpha=alpha)) \
                    .reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
                # The job has finished; release this epoch's broadcast copy.
                bcweights.unpersist()
                weights = weights - self.learning_rate * (d_weights / sample_len) * np.sqrt(1. / (i + 1))
                if self.verbose:
                    print('loss: %f' % (loss / sample_len))
        finally:
            augmented_data.unpersist()
        final = weights.toArray()
        self.weights, self.intercept = final[:-1], final[-1]
        return self

    def predict(self, X):
        """Predict +1./-1. labels.

        *X* may be a single feature vector, a ``LabeledPoint`` (its features
        are used), or an RDD of either, in which case an RDD of predictions is
        returned lazily. Elements for which ``RDD_check`` is true map to None.
        """
        # Local copies so the Spark closure does not capture `self`.
        weights = self.weights
        intercept = self.intercept

        def _predict(x):
            if isinstance(x, LabeledPoint):
                return sign(weights.dot(x.features) + intercept)
            elif RDD_check(x):
                return None
            else:
                return sign(weights.dot(x) + intercept)

        if RDD_check(X):
            return X.map(_predict)
        else:
            return _predict(X)

    def score(self, X_y):
        """Return the fraction of ``LabeledPoint`` s in *X_y* whose label equals the prediction.

        Prints an error and exits with status 1 if ``LabeledPoint_check``
        rejects the data.
        """
        if not LabeledPoint_check(X_y):
            print('test data must be wrapped by LabeledPoint')
            raise SystemExit(1)
        weights = self.weights
        intercept = self.intercept
        tp = X_y.map(lambda p: 1. if p.label == sign(weights.dot(p.features) + intercept) else 0.)\
            .reduce(lambda x, y: x + y)
        return tp / X_y.count()

    def transform(self, X):
        """Replace each input's features by the single predicted label.

        A ``LabeledPoint`` becomes ``LabeledPoint(label, [prediction])``; a plain
        vector becomes ``Vectors.dense([prediction])``. RDDs are mapped lazily;
        elements for which ``RDD_check`` is true map to None.
        """
        weights = self.weights
        intercept = self.intercept

        def _transform(x):
            if isinstance(x, LabeledPoint):
                return LabeledPoint(x.label, [sign(weights.dot(x.features) + intercept)])
            elif RDD_check(x):
                return None
            else:
                return Vectors.dense([sign(weights.dot(x) + intercept)])

        if RDD_check(X):
            return X.map(_transform)
        else:
            return _transform(X)
class Perceptron(BaseSGDClassifier):
    """Perceptron: ``BaseSGDClassifier`` using the loss ``max(0, -y * y_pred)``."""

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1):
        super(Perceptron, self).__init__(
            loss='perceptron', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose)
class SVC(BaseSGDClassifier):
    """Linear SVM: ``BaseSGDClassifier`` using the hinge loss ``max(0, 1 - y * y_pred)``."""

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1):
        super(SVC, self).__init__(
            loss='hinge', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose)
class LogisticRegression(BaseSGDClassifier):
    """Binary logistic regression: ``BaseSGDClassifier`` with the logit loss.

    Adds probability outputs ``sigmoid(weights . x + intercept)``, which under
    the logit loss is the model's probability of the +1 class.
    """

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1):
        super(LogisticRegression, self).__init__(
            loss='logit', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose)

    def predict_prob(self, X):
        """Return ``sigmoid(weights . x + intercept)``.

        *X* may be a feature vector, a ``LabeledPoint`` (its features are
        used), or an RDD of either, in which case an RDD of probabilities is
        returned lazily. Elements for which ``RDD_check`` is true map to None.
        """
        w, b = self.weights, self.intercept

        def _prob(x):
            if isinstance(x, LabeledPoint):
                return sigmoid(w.dot(x.features) + b)
            if RDD_check(x):
                return None
            return sigmoid(w.dot(x) + b)

        return X.map(_prob) if RDD_check(X) else _prob(X)

    def transform(self, X, prob=True):
        """Replace each input's features by a single model output.

        With *prob* true the output is the probability ``sigmoid(score)``;
        otherwise it is the +1./-1. label ``sign(score)``. A ``LabeledPoint``
        becomes ``LabeledPoint(label, [output])``; a plain vector becomes
        ``Vectors.dense([output])``. RDDs are mapped lazily; elements for which
        ``RDD_check`` is true map to None.
        """
        w, b = self.weights, self.intercept

        def _activate(z):
            return sigmoid(z) if prob else sign(z)

        def _convert(x):
            if isinstance(x, LabeledPoint):
                return LabeledPoint(x.label, [_activate(w.dot(x.features) + b)])
            if RDD_check(x):
                return None
            return Vectors.dense([_activate(w.dot(x) + b)])

        return X.map(_convert) if RDD_check(X) else _convert(X)