Basic imports

from __future__ import print_function
import numpy as np
from spark_config import sc
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from util import RDD_check, LabeledPoint_check
from sklearn.base import BaseEstimator

Loss functions and their gradients

def perceptron_loss(y, y_pred):
    # perceptron criterion: zero for a correctly signed prediction,
    # linear in the margin otherwise
    return max(0., -y*y_pred)


def perceptron_grad(y, y_pred):
    return -y if -y*y_pred > 0. else 0.


def hinge_loss(y, y_pred):
    # hinge loss: penalizes any margin smaller than 1
    return max(0., 1. - y*y_pred)


def hinge_grad(y, y_pred):
    return -y if -y*y_pred > -1. else 0.


def logit_loss(y, y_pred):
    # log-loss; np.logaddexp(0., z) computes log(1 + exp(z)) without overflow
    return np.logaddexp(0., -y*y_pred)


def logit_grad(y, y_pred):
    # derivative of logit_loss with respect to y_pred
    return -y*(1. - sigmoid(y*y_pred))

Penalty gradient and weight initialization

def sigmoid(x):
    return 1. / (1. + np.exp(-x))


def sign(x):
    return 1. if x >= 0. else -1.


def penalty_grad(penalty, weights):
    if penalty == 'l2':
        return weights
    elif penalty == 'l1':
        return np.sign(weights)
    else:
        # no regularization: contribute a zero gradient
        return np.zeros(weights.size)


def init_param(shape):
    # fan-in scaled Gaussian initialization
    s = np.sqrt(2. / shape[0])
    return np.random.normal(loc=0.0, scale=s, size=shape)
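
A quick spot-check of the functions above; the values below are assumed purely for illustration:

y, y_pred = 1., -0.5                                # a misclassified point
print(hinge_loss(y, y_pred))                        # 1.5
print(hinge_grad(y, y_pred))                        # -1.0
print(logit_loss(y, y_pred))                        # ~0.974
print(logit_grad(y, y_pred))                        # ~-0.622
print(penalty_grad('l1', np.array([0.3, -0.2])))    # [ 1. -1.]
print(init_param(shape=(4, 1)).shape)               # (4, 1)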

Gradient descent

def gradient_descent(features, label, weights, loss_func, grad_func, k,
                     learning_rate, penalty, alpha, class_weights):
    # Run k local SGD steps on a single (augmented) sample and return the
    # updated weights together with the sample's class-weighted loss.
    if class_weights is not None:
        class_weight = class_weights[label]
    else:
        class_weight = 1.
    for i in range(k):
        dweights = grad_func(label, features.dot(weights))*features
        # regularize everything except the trailing bias weight
        dweights = dweights + alpha*np.append(penalty_grad(penalty, weights[:-1]), 0)
        weights = weights - learning_rate*class_weight*dweights

    loss = class_weight*loss_func(label, features.dot(weights))
    return (weights, loss)
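
A minimal driver-side sanity check of gradient_descent on one augmented sample; the point and hyper-parameters below are assumed for illustration only:

w0 = np.array([0., 0., 0.])    # two feature weights plus a bias slot
x = np.array([1., 2., 1.])     # last entry is the appended bias feature
w, loss = gradient_descent(features=x, label=1., weights=w0,
                           loss_func=hinge_loss, grad_func=hinge_grad,
                           k=3, learning_rate=0.1, penalty='l2',
                           alpha=0.01, class_weights=None)
print(w, loss)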

Base SGD classifier

class BaseSGDClassifier(BaseEstimator):
    def __init__(self, loss="hinge", n_epoch=5, learning_rate=1., 
        penalty='l2', alpha=0.01, verbose=1, k=3, class_weights=None):
        self.loss = loss
        self.n_epoch = n_epoch
        self.learning_rate = learning_rate
        self.penalty = penalty
        self.alpha = alpha
        self.weights = None
        self.intercept = None
        self.verbose = verbose
        self.k = k
        self.class_weights = class_weights


    def train_data_transform(self, train_data):
        # append a constant 1 to every feature vector so the bias is
        # learned as the last weight
        if not LabeledPoint_check(train_data):
            raise ValueError('train data must be an RDD of LabeledPoint')
        return train_data.map(lambda p: LabeledPoint(p.label, np.append(p.features, 1.)))

    def fit(self, train_data):
        sample_len = train_data.count()
        feature_len = len(train_data.first().features)
        augmented_data = self.train_data_transform(train_data).cache()

        if self.weights is None or self.intercept is None:
            weights = Vectors.dense(np.append(init_param(shape=(feature_len, 1)), 0.))
        else:
            weights = Vectors.dense(np.append(self.weights, self.intercept))

        grad_func = globals()[self.loss+'_grad']
        loss_func = globals()[self.loss+'_loss']
        # copy hyper-parameters into locals so the closure below does not
        # drag the whole estimator object into the Spark serializer
        learning_rate = self.learning_rate
        penalty = self.penalty
        alpha = self.alpha
        k = self.k
        class_weights = self.class_weights
        for i in range(self.n_epoch):
            print('...Epoch:', i)
            bcweights = sc.broadcast(weights)
            # each sample takes k local SGD steps from the broadcast weights;
            # the reduce sums the per-sample weights and losses for averaging
            new_weights, loss = augmented_data.map(lambda p: gradient_descent(features=p.features,
                label=p.label, learning_rate=learning_rate, weights=bcweights.value,
                loss_func=loss_func, grad_func=grad_func,
                penalty=penalty, alpha=alpha, k=k, class_weights=class_weights))\
            .reduce(lambda x, y: (x[0]+y[0], x[1]+y[1]))

            weights = new_weights / sample_len

            if self.verbose:
                print('loss: %f' % (loss/sample_len))
        self.weights, self.intercept = weights.toArray()[:-1], weights.toArray()[-1]


    def predict(self, X):
        weights = self.weights
        intercept = self.intercept

        def _predict(x):
            if isinstance(x, LabeledPoint):
                return sign(weights.dot(x.features) + intercept)
            elif RDD_check(x):
                return None
            else:
                return sign(weights.dot(x) + intercept)
        if RDD_check(X):
            return X.map(_predict)
        else:
            return _predict(X)

    def score(self, X_y):
        if not LabeledPoint_check(X_y):
            raise ValueError('test data must be an RDD of LabeledPoint')
        weights = self.weights
        intercept = self.intercept
        # accuracy: fraction of samples whose predicted sign matches the label
        correct = X_y.map(lambda p: 1. if p.label == sign(weights.dot(p.features) + intercept) else 0.)\
            .reduce(lambda x, y: x+y)
        return correct / X_y.count()

    def transform(self, X):
        weights = self.weights
        intercept = self.intercept

        def _transform(x):
            if isinstance(x, LabeledPoint):
                return LabeledPoint(x.label, [sign(weights.dot(x.features) + intercept)])
            elif RDD_check(x):
                return None
            else:
                return Vectors.dense([sign(weights.dot(x) + intercept)])

        if RDD_check(X):
            return X.map(_transform)
        else:
            return _transform(X)
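
The fit loop above is per-sample SGD with parameter averaging: every sample takes k local steps from the same broadcast weights, and the driver averages the results once per epoch. One epoch is equivalent to the following sketch, assuming points is a plain Python list of (features, label) pairs with the bias feature already appended, and loss_func, grad_func, and the hyper-parameters bound as in fit:

results = [gradient_descent(features=f, label=y, weights=weights,
                            loss_func=loss_func, grad_func=grad_func,
                            k=k, learning_rate=learning_rate,
                            penalty=penalty, alpha=alpha,
                            class_weights=class_weights)
           for f, y in points]
weights = sum(w for w, _ in results) / len(points)      # parameter averaging
epoch_loss = sum(l for _, l in results) / len(points)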

Perceptron classifier

class Perceptron(BaseSGDClassifier):
    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1, k=3, class_weights=None):
        super(Perceptron, self).__init__(loss='perceptron',
                                         n_epoch=n_epoch,
                                         learning_rate=learning_rate,
                                         penalty=penalty,
                                         alpha=alpha,
                                         verbose=verbose,
                                         k=k,
                                         class_weights=class_weights)

Linear SVM

class SVC(BaseSGDClassifier):
    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1, k=3, class_weights=None):
        super(SVC, self).__init__(loss='hinge',
                                  n_epoch=n_epoch,
                                  learning_rate=learning_rate,
                                  penalty=penalty,
                                  alpha=alpha,
                                  verbose=verbose,
                                  k=k,
                                  class_weights=class_weights)

Logistic regression

class LogisticRegression(BaseSGDClassifier):
    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01, verbose=1, k=3, class_weights=None):
        super(LogisticRegression, self).__init__(loss='logit',
                                                 n_epoch=n_epoch,
                                                 learning_rate=learning_rate,
                                                 penalty=penalty,
                                                 alpha=alpha,
                                                 verbose=verbose,
                                                 k=k,
                                                 class_weights=class_weights)

    def predict_prob(self, X):
        weights = self.weights
        intercept = self.intercept

        def _predict(x):
            if isinstance(x, LabeledPoint):
                return sigmoid(weights.dot(x.features) + intercept)
            elif RDD_check(x):
                return None
            else:
                return sigmoid(weights.dot(x) + intercept)

        if RDD_check(X):
            return X.map(_predict)
        else:
            return _predict(X)

    def transform(self, X, prob=True):
        weights = self.weights
        intercept = self.intercept

        def _transform(x):
            if isinstance(x, LabeledPoint):
                if prob:
                    return LabeledPoint(x.label, [sigmoid(weights.dot(x.features) + intercept)])
                else:
                    return LabeledPoint(x.label, [sign(weights.dot(x.features) + intercept)])
            elif RDD_check(x):
                return None
            else:
                if prob:
                    return Vectors.dense([sigmoid(weights.dot(x) + intercept)])
                else:
                    return Vectors.dense([sign(weights.dot(x) + intercept)])

        if RDD_check(X):
            return X.map(_transform)
        else:
            return _transform(X)
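
An end-to-end usage sketch. The toy data below is assumed; sc is the SparkContext imported from spark_config above, and labels must be +1/-1 as the losses expect:

train = sc.parallelize([
    LabeledPoint(1., Vectors.dense([1.0, 2.0])),
    LabeledPoint(-1., Vectors.dense([-1.5, -0.5])),
    LabeledPoint(1., Vectors.dense([2.0, 1.0])),
    LabeledPoint(-1., Vectors.dense([-1.0, -2.0])),
])
clf = LogisticRegression(n_epoch=5, learning_rate=0.5, alpha=0.001)
clf.fit(train)
print(clf.score(train))                        # training accuracy
print(clf.predict_prob(np.array([1.0, 1.0])))  # estimated P(label = +1)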