from __future__ import print_function
import numpy as np
from spark_config import sc
from pyspark .mllib .linalg import Vectors
from pyspark .mllib .regression import LabeledPoint
from util import RDD_check , LabeledPoint_check
from sklearn .base import BaseEstimator
# Loss functions and their gradients with respect to the prediction y_pred.
def perceptron_loss(y, y_pred):
    """Perceptron loss: 0 when y and y_pred agree in sign, otherwise -(y * y_pred)."""
    margin = y * y_pred
    return max(0., -margin)
def perceptron_grad(y, y_pred):
    """Derivative of perceptron_loss w.r.t. y_pred: -y on a strict misclassification, else 0."""
    if y * y_pred < 0.:
        return -y
    return 0.
def hinge_loss(y, y_pred):
    """Hinge loss max(0, 1 - y * y_pred)."""
    margin = y * y_pred
    return max(0., 1. - margin)
def hinge_grad(y, y_pred):
    """Derivative of hinge_loss w.r.t. y_pred: -y while the margin y * y_pred is below 1, else 0."""
    if y * y_pred < 1.:
        return -y
    return 0.
def logit_loss(y, y_pred):
    """Logistic loss log(1 + exp(-y * y_pred)).

    Evaluated as ``np.logaddexp(0, -y * y_pred)``, which is mathematically
    identical but does not overflow to ``inf`` when the margin ``y * y_pred``
    is a large negative number (the naive ``np.exp`` overflows past ~709).
    """
    return np.logaddexp(0., -y * y_pred)
def logit_grad(y, y_pred):
    """Derivative of logit_loss w.r.t. y_pred: -y * (1 - sigmoid(y * y_pred))."""
    margin = y * y_pred
    prob_correct = sigmoid(margin)
    return -y * (1. - prob_correct)
# Activation helpers (sigmoid, sign), the penalty gradient, and weight initialization.
def sigmoid(x):
    """Logistic function 1 / (1 + exp(-x)), scalar or elementwise on arrays.

    Computed as ``exp(-log(1 + exp(-x)))`` via ``np.logaddexp`` so that very
    negative ``x`` (below ~-709) no longer overflows ``np.exp`` and emits a
    RuntimeWarning; the result simply underflows cleanly to 0.0.
    """
    return np.exp(-np.logaddexp(0., -x))
def sign(x):
    """Map x to +1.0 when x >= 0 (including zero) and to -1.0 otherwise (NaN gives -1.0)."""
    if x >= 0.:
        return 1.
    return -1.
def penalty_grad(penalty, weights):
    """Gradient of the regularization penalty with respect to ``weights``.

    Args:
        penalty: 'l2' returns ``weights`` unchanged (the gradient of
            0.5 * ||w||^2); 'l1' returns the subgradient ``np.sign(weights)``;
            any other value means no penalty and yields zeros.
        weights: array-like weight vector.

    Returns:
        An array with the same shape as ``weights``.
    """
    if penalty == 'l2':
        return weights
    if penalty == 'l1':
        return np.sign(weights)
    # No penalty: return a plain float ndarray of zeros so every branch yields
    # a NumPy array (previously this branch alone returned a PySpark
    # DenseVector). np.shape also accepts lists, not just ndarrays.
    return np.zeros(np.shape(weights))
def init_param(shape):
    """Draw initial weights of the given shape from N(0, sqrt(2 / shape[0])^2).

    The standard deviation is scaled by the first dimension of ``shape``
    (He-style scaling); ``shape`` is passed straight through as the output size.
    """
    fan_in = shape[0]
    std = np.sqrt(2. / fan_in)
    return np.random.normal(loc=0.0, scale=std, size=shape)
def gradient_descent(features, label, weights, loss_func, grad_func, k, learning_rate, penalty, alpha, class_weights):
    """Take ``k`` gradient steps on a single example; return (weights, loss).

    Args:
        features: feature vector supporting ``.dot`` and scalar multiplication.
        label: the example's target; also the key into ``class_weights``.
        weights: current weight vector; its last entry is excluded from the
            penalty (treated as the intercept).
        loss_func, grad_func: callables ``(y, y_pred)`` returning the loss and
            its derivative with respect to ``y_pred``.
        k: number of gradient steps.
        learning_rate: step size.
        penalty, alpha: penalty name for ``penalty_grad`` and its strength.
        class_weights: optional mapping from label to a multiplier applied to
            every step and to the returned loss; ``None`` means weight 1.

    Returns:
        Tuple of the updated weights and the class-weighted loss evaluated at
        those weights (the penalty term is not included in the loss).
    """
    class_weight = 1. if class_weights is None else class_weights[label]
    step = learning_rate * class_weight
    for _ in range(k):
        data_grad = grad_func(label, features.dot(weights)) * features
        # Pad with a 0 so the intercept (last weight) is never regularized.
        reg_grad = alpha * np.append(penalty_grad(penalty, weights[:-1]), 0)
        weights = weights - step * (data_grad + reg_grad)
    weighted_loss = class_weight * loss_func(label, features.dot(weights))
    return (weights, weighted_loss)
class BaseSGDClassifier(BaseEstimator):
    """Linear binary classifier trained with model-averaged SGD on a Spark RDD.

    Each epoch broadcasts the current weights, runs ``k`` local gradient steps
    on every training example independently (``gradient_descent``), then
    averages the resulting weight vectors over all examples. Internally the
    last weight is the intercept; training features are augmented with a
    constant 1. to match.

    Predictions are ``sign(w . x + b)``, i.e. -1. or +1.
    NOTE(review): labels are presumably expected to be -1./+1. -- confirm.

    Parameters
    ----------
    loss : str
        Prefix of the module-level ``<loss>_loss`` / ``<loss>_grad`` pair
        (e.g. "hinge", "perceptron", "logit").
    n_epoch : int
        Number of broadcast / average rounds.
    learning_rate : float
        Step size of each local gradient step.
    penalty : str
        Penalty name passed to ``penalty_grad``; the intercept is not penalized.
    alpha : float
        Penalty strength.
    verbose : int
        When truthy, print per-epoch progress and the average loss.
    k : int
        Local gradient steps per example per epoch.
    class_weights : dict or None
        Optional label -> multiplier for that example's step and loss.
    """

    def __init__(self, loss="hinge", n_epoch=5, learning_rate=1.,
                 penalty='l2', alpha=0.01, verbose=1, k=3, class_weights=None):
        self.loss = loss
        self.n_epoch = n_epoch
        self.learning_rate = learning_rate
        self.penalty = penalty
        self.alpha = alpha
        self.weights = None  # per-feature weights (ndarray) after fit
        self.intercept = None  # scalar bias after fit
        self.verbose = verbose
        self.k = k
        self.class_weights = class_weights

    def train_data_transform(self, train_data):
        """Append a constant 1. feature to every LabeledPoint (for the intercept).

        Raises
        ------
        ValueError
            If ``LabeledPoint_check`` rejects ``train_data``.
        """
        if not LabeledPoint_check(train_data):
            # Raise rather than exit(1): library code must not terminate the
            # caller's interpreter, and the site-provided exit() builtin is
            # not guaranteed to exist.
            raise ValueError('train data must be wrapped by LabeledPoint')
        return train_data.map(lambda p: LabeledPoint(p.label, np.append(p.features, 1.)))

    def fit(self, train_data):
        """Train on an RDD of LabeledPoint, warm-starting from existing weights if set.

        Sets ``self.weights`` and ``self.intercept`` and returns ``self``.

        Raises
        ------
        ValueError
            If the data is not LabeledPoint-wrapped or is empty.
        """
        augmented_data = self.train_data_transform(train_data).cache()
        try:
            # Counting the cached RDD materializes the cache, so epochs reuse
            # it instead of recomputing the input.
            sample_len = augmented_data.count()
            if sample_len == 0:
                raise ValueError('train data is empty')
            feature_len = len(train_data.first().features)
            if self.weights is None or self.intercept is None:
                weights = Vectors.dense(np.append(init_param(shape=(feature_len, 1)), 0.))
            else:
                weights = Vectors.dense(np.append(self.weights, self.intercept))
            grad_func = globals()[self.loss + '_grad']
            loss_func = globals()[self.loss + '_loss']
            # Bind every hyper-parameter to a local so the Spark task closure
            # does not capture (and serialize) the whole estimator via self.
            learning_rate = self.learning_rate
            penalty = self.penalty
            alpha = self.alpha
            k = self.k
            class_weights = self.class_weights
            for epoch in range(self.n_epoch):
                if self.verbose:
                    print('...Epoch: ', epoch)
                bcweights = sc.broadcast(weights)
                new_weights, loss = augmented_data.map(
                    lambda p: gradient_descent(
                        features=p.features, label=p.label,
                        weights=bcweights.value,
                        loss_func=loss_func, grad_func=grad_func,
                        k=k, learning_rate=learning_rate,
                        penalty=penalty, alpha=alpha,
                        class_weights=class_weights)
                ).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
                # The action above has completed; release this epoch's
                # broadcast instead of accumulating one per epoch.
                bcweights.unpersist()
                # Model averaging: mean of the per-example weight vectors.
                weights = new_weights / sample_len
                if self.verbose:
                    print('loss: %f' % (loss / sample_len))
            final = weights.toArray()
            self.weights, self.intercept = final[:-1], final[-1]
        finally:
            augmented_data.unpersist()
        return self

    def predict(self, X):
        """Predict -1./+1. for a feature vector, a LabeledPoint, or an RDD of either.

        An RDD input is mapped lazily and returned as an RDD.
        """
        weights = self.weights
        intercept = self.intercept

        def _predict(x):
            if isinstance(x, LabeledPoint):
                return sign(weights.dot(x.features) + intercept)
            elif RDD_check(x):
                # Defensive: nested RDDs cannot be scored element-wise.
                return None
            else:
                return sign(weights.dot(x) + intercept)

        if RDD_check(X):
            return X.map(_predict)
        else:
            return _predict(X)

    def score(self, X_y):
        """Return the accuracy (fraction of labels equal to the prediction) on an RDD of LabeledPoint.

        Raises
        ------
        ValueError
            If the data is not LabeledPoint-wrapped or is empty.
        """
        if not LabeledPoint_check(X_y):
            raise ValueError('test data must be wrapped by LabeledPoint')
        weights = self.weights
        intercept = self.intercept
        n_total = X_y.count()
        if n_total == 0:
            raise ValueError('test data is empty')
        n_correct = X_y.map(
            lambda p: 1. if p.label == sign(weights.dot(p.features) + intercept) else 0.
        ).sum()
        return n_correct / n_total

    def transform(self, X):
        """Replace features with a one-element vector holding the -1./+1. prediction.

        LabeledPoints keep their label; plain vectors become ``Vectors.dense``;
        an RDD input is mapped lazily.
        """
        weights = self.weights
        intercept = self.intercept

        def _transform(x):
            if isinstance(x, LabeledPoint):
                return LabeledPoint(x.label, [sign(weights.dot(x.features) + intercept)])
            elif RDD_check(x):
                # Defensive: nested RDDs cannot be transformed element-wise.
                return None
            else:
                return Vectors.dense([sign(weights.dot(x) + intercept)])

        if RDD_check(X):
            return X.map(_transform)
        else:
            return _transform(X)
class Perceptron(BaseSGDClassifier):
    """Perceptron: BaseSGDClassifier using perceptron_loss / perceptron_grad."""

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01,
                 verbose=1, k=3, class_weights=None):
        super(Perceptron, self).__init__(
            loss='perceptron', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose, k=k,
            class_weights=class_weights,
        )
class SVC(BaseSGDClassifier):
    """Linear support vector classifier: BaseSGDClassifier using hinge_loss / hinge_grad."""

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01,
                 verbose=1, k=3, class_weights=None):
        super(SVC, self).__init__(
            loss='hinge', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose, k=k,
            class_weights=class_weights,
        )
class LogisticRegression(BaseSGDClassifier):
    """Logistic regression: BaseSGDClassifier using logit_loss / logit_grad.

    Adds probability output via ``predict_prob`` and a ``prob`` switch on
    ``transform``.
    """

    def __init__(self, n_epoch=5, learning_rate=1., penalty='l2', alpha=0.01,
                 verbose=1, k=3, class_weights=None):
        super(LogisticRegression, self).__init__(
            loss='logit', n_epoch=n_epoch, learning_rate=learning_rate,
            penalty=penalty, alpha=alpha, verbose=verbose, k=k,
            class_weights=class_weights,
        )

    def predict_prob(self, X):
        """Return sigmoid(w . x + b) for a vector, a LabeledPoint, or an RDD of either.

        An RDD input is mapped lazily and returned as an RDD.
        """
        w, b = self.weights, self.intercept

        def _prob(x):
            if isinstance(x, LabeledPoint):
                return sigmoid(w.dot(x.features) + b)
            if RDD_check(x):
                # Defensive: nested RDDs cannot be scored element-wise.
                return None
            return sigmoid(w.dot(x) + b)

        return X.map(_prob) if RDD_check(X) else _prob(X)

    def transform(self, X, prob=True):
        """Replace features with a one-element vector of the model output.

        With ``prob`` true the output is the sigmoid probability, otherwise the
        -1./+1. sign. LabeledPoints keep their label; plain vectors become
        ``Vectors.dense``; an RDD input is mapped lazily.
        """
        w, b = self.weights, self.intercept
        # Choose the output activation once instead of branching per element.
        activation = sigmoid if prob else sign

        def _transform(x):
            if isinstance(x, LabeledPoint):
                return LabeledPoint(x.label, [activation(w.dot(x.features) + b)])
            if RDD_check(x):
                # Defensive: nested RDDs cannot be transformed element-wise.
                return None
            return Vectors.dense([activation(w.dot(x) + b)])

        return X.map(_transform) if RDD_check(X) else _transform(X)