"""
Kernel regression using Theano.
"""
from pylearn.old_dataset.learner import OfflineLearningAlgorithm
# ApplyFunctionDataSet and CachedDataSet are used by KernelPredictor.__call__
# below; the exact module path is assumed here:
from pylearn.old_dataset.dataset import ApplyFunctionDataSet, CachedDataSet
from theano import tensor as T
from common.autoname import AutoName
import theano
import numpy
# map a N-vector to a 1xN matrix
row_vector = theano.tensor.DimShuffle((False,),['x',0])
# map a N-vector to a Nx1 matrix
col_vector = theano.tensor.DimShuffle((False,),[0,'x'])
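# (row_vector and col_vector let a squared-norm vector broadcast against a
# 2-D dot product in the kernel expressions below)
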
class KernelRegression(OfflineLearningAlgorithm):
"""
Implementation of kernel regression:
* the data are n (x_t,y_t) pairs and we want to estimate E[y|x]
* the predictor computes
f(x) = b + \sum_{t=1}^n \alpha_t K(x,x_t)
with free parameters b and alpha, training inputs x_t,
and kernel function K (gaussian by default).
Clearly, each prediction involves O(n) computations.
* the learner chooses b and alpha to minimize
lambda alpha' G' G alpha + \sum_{t=1}^n (f(x_t)-y_t)^2
where G is the matrix with entries G_ij = K(x_i,x_j).
The first (L2 regularization) term is the squared L2
norm of the primal weights w = \sum_t \alpha_t phi(x_t)
where phi is the function s.t. K(u,v)=phi(u).phi(v).
* this involves solving a linear system with (n+1,n+1)
matrix, which is an O(n^3) computation. In addition,
that linear system matrix requires O(n^2) memory.
So this learning algorithm should be used only for
small datasets.
* the linear system is
(M + lambda I_n) theta = (1, y)'
where theta = (b, alpha), I_n is the (n+1)x(n+1) matrix that is the identity
except with a 0 at (0,0), M is the matrix with G in the sub-matrix starting
at (1,1), 1's in column 0, except for a value of n at (0,0), and sum_i G_{i,j}
in the rest of row 0.
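
    As an illustration, with use_bias=False the system reduces to
    (G + lambda I) alpha = y, i.e. (a hypothetical NumPy one-liner, with
    G the kernel matrix over the n_examples training inputs):

        alpha = numpy.linalg.solve(G + L2_regularizer * numpy.eye(n_examples), y)
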
    Note that this gives an estimate of E[y|x,training_set] that is the
    same as obtained with a Gaussian process regression. The GP regression
    would also provide a Bayesian Var[y|x,training_set]. It corresponds to
    an assumption that f is a random variable with Gaussian (process) prior
    distribution with covariance function K. Because we assume Gaussian
    noise, we obtain a Gaussian posterior for f (whose mean is computed
    here).
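    (Concretely, with noise variance lambda the GP posterior mean at x is
    k(x)' (G + lambda I)^{-1} y with k(x)_t = K(x,x_t), which equals
    f(x) = \sum_t \alpha_t K(x,x_t) for \alpha = (G + lambda I)^{-1} y,
    the no-bias solution computed here.)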

    Usage:

        kernel_regressor = KernelRegression(L2_regularizer=0.1, gamma=0.5)
            # (eventually: kernel=GaussianKernel(gamma=0.5))
        kernel_predictor = kernel_regressor(training_set)
        # creates a dataset with "output" and "squared_error" fields:
        all_results_dataset = kernel_predictor(test_set)
        # inputs and outputs are numpy arrays:
        outputs = kernel_predictor.compute_outputs(inputs)
        outputs, errors = kernel_predictor.compute_outputs_and_errors(inputs, targets)
        errors = kernel_predictor.compute_errors(inputs, targets)
        mse = kernel_predictor.compute_mse(inputs, targets)

    The training_set must have fields "input" and "target".
    The test_set must have field "input", and needs "target" if
    we want to compute the squared errors.

    The predictor parameters are obtained analytically from the training
    set. Training is only done on a whole training set rather than on
    minibatches (there is no online implementation).

    The dataset fields expected and produced by the learning algorithm
    and the trained model are the following:

    - Input and output dataset fields (example-wise quantities):
      - 'input' (always expected as an input_dataset field)
      - 'target' (always expected by the learning algorithm,
        optional for the learned model)
      - 'output' (always produced by the learned model)
      - 'squared_error' (example-wise squared error, optionally produced
        by the learned model if 'target' is provided)
    """
    def __init__(self, kernel=None, L2_regularizer=0, gamma=1, use_bias=False):
        # THE VERSION WITH BIAS DOES NOT SEEM RIGHT
        self.kernel = kernel
        self.L2_regularizer = L2_regularizer
        self.use_bias = use_bias
        self.gamma = gamma  # until we fix things, the kernel type is fixed: Gaussian
        self.equations = KernelRegressionEquations()

    def __call__(self, trainset):
        n_examples = len(trainset)
        first_example = trainset[0]
        n_inputs = first_example['input'].size
        n_outputs = first_example['target'].size
        b1 = 1 if self.use_bias else 0
        M = numpy.zeros((n_examples + b1, n_examples + b1))
        Y = numpy.zeros((n_examples + b1, n_outputs))
        for i in xrange(n_examples):
            M[i + b1, i + b1] = self.L2_regularizer
        data = trainset.fields()
        train_inputs = numpy.array(data['input'])
        if self.use_bias:
            Y[0] = 1
        Y[b1:, :] = numpy.array(data['target'])
        train_inputs_square, sumG, G = self.equations.compute_system_matrix(train_inputs, self.gamma)
        M[b1:, b1:] += G
        if self.use_bias:
            M[0, 1:] = sumG
            M[1:, 0] = 1
            M[0, 0] = M.shape[0]
        self.M = M
        self.Y = Y
        theta = numpy.linalg.solve(M, Y)
        return KernelPredictor(theta, self.gamma, train_inputs, train_inputs_square)
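
# For reference, the Gaussian kernel matrix that the equations classes below
# build symbolically, written directly in NumPy. This helper is purely
# illustrative (nothing in this file uses it, except the demo at the bottom):
def _gaussian_gram_numpy(X, gamma):
    """Return G with G[i,j] = exp(-||X[i]-X[j]||**2 / gamma**2)."""
    sq = numpy.sum(X * X, axis=1)
    return numpy.exp(-(sq[None, :] - 2 * numpy.dot(X, X.T) + sq[:, None]) / (gamma * gamma))
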
class KernelPredictorEquations(AutoName):
    train_inputs = T.matrix()  # n_examples x n_inputs
    train_inputs_square = T.vector()  # n_examples
    inputs = T.matrix()  # minibatchsize x n_inputs
    targets = T.matrix()  # minibatchsize x n_outputs
    theta = T.matrix()  # (n_examples+1) x n_outputs
    b1 = T.shape(train_inputs_square)[0] < T.shape(theta)[0]  # 1 iff theta includes a bias row
    gamma = T.scalar()
    inv_gamma2 = 1. / (gamma * gamma)
    b = b1 * theta[0]
    alpha = theta[b1:, :]
    inputs_square = T.sum(inputs * inputs, axis=1)
    Kx = T.exp(-(row_vector(train_inputs_square) - 2 * T.dot(inputs, train_inputs.T) + col_vector(inputs_square)) * inv_gamma2)
    outputs = T.dot(Kx, alpha) + b  # minibatchsize x n_outputs
    squared_errors = T.sum(T.sqr(targets - outputs), axis=1)

    __compiled = False

    @classmethod
    def compile(cls, linker='c|py'):
        if cls.__compiled:
            return
        def fn(input_vars, output_vars):
            # newer Theano takes the linker via a Mode object rather than
            # a linker= keyword
            return staticmethod(theano.function(input_vars, output_vars,
                                                mode=theano.Mode(linker=linker)))
        # single (non-list) outputs so the compiled functions return arrays,
        # not one-element lists
        cls.compute_outputs = fn([cls.inputs, cls.theta, cls.gamma, cls.train_inputs, cls.train_inputs_square], cls.outputs)
        cls.compute_errors = fn([cls.outputs, cls.targets], cls.squared_errors)
        cls.__compiled = True

    def __init__(self):
        self.compile()

class KernelRegressionEquations(KernelPredictorEquations):
    #M = T.matrix() # (n_examples+1) x (n_examples+1)
    inputs = T.matrix()  # n_examples x n_inputs
    gamma = T.scalar()
    inv_gamma2 = 1. / (gamma * gamma)
    inputs_square = T.sum(inputs * inputs, axis=1)
    #new_G = G+T.dot(inputs,inputs.T)
    #new_G = T.gemm(G,1.,inputs,inputs.T,1.)
    G = T.exp(-(row_vector(inputs_square) - 2 * T.dot(inputs, inputs.T) + col_vector(inputs_square)) * inv_gamma2)
    sumG = T.sum(G, axis=0)

    __compiled = False

    @classmethod
    def compile(cls, linker='c|py'):
        if cls.__compiled:
            return
        def fn(input_vars, output_vars):
            return staticmethod(theano.function(input_vars, output_vars,
                                                mode=theano.Mode(linker=linker)))
        cls.compute_system_matrix = fn([cls.inputs, cls.gamma], [cls.inputs_square, cls.sumG, cls.G])
        cls.__compiled = True

    def __init__(self):
        self.compile()

class KernelPredictor(object):
"""
A kernel predictor has parameters theta (a bias vector and a weight matrix alpha)
it can use to make a non-linear prediction (according to the KernelPredictorEquations).
It can compute its output (bias + alpha * kernel(train_inputs,input) and a squared error (||output - target||^2).
"""
def __init__(self, theta, gamma, train_inputs, train_inputs_square=None):
self.theta=theta
self.gamma=gamma
self.train_inputs=train_inputs
if train_inputs_square==None:
train_inputs_square = numpy.sum(train_inputs*train_inputs,axis=1)
self.train_inputs_square=train_inputs_square
self.equations = KernelPredictorEquations()

    def compute_outputs(self, inputs):
        return self.equations.compute_outputs(inputs, self.theta, self.gamma,
                                              self.train_inputs, self.train_inputs_square)

    def compute_errors(self, inputs, targets):
        return self.equations.compute_errors(self.compute_outputs(inputs), targets)

    def compute_outputs_and_errors(self, inputs, targets):
        outputs = self.compute_outputs(inputs)
        return [outputs, self.equations.compute_errors(outputs, targets)]

    def compute_mse(self, inputs, targets):
        errors = self.compute_errors(inputs, targets)
        return numpy.sum(errors) / errors.size

    def __call__(self, dataset, output_fieldnames=None, cached_output_dataset=False):
        assert dataset.hasFields(["input"])
        if output_fieldnames is None:
            if dataset.hasFields(["target"]):
                output_fieldnames = ["output", "squared_error"]
            else:
                output_fieldnames = ["output"]
        output_fieldnames.sort()
        if output_fieldnames == ["squared_error"]:
            f = self.compute_errors
        elif output_fieldnames == ["output"]:
            f = self.compute_outputs
        elif output_fieldnames == ["output", "squared_error"]:
            f = self.compute_outputs_and_errors
        else:
            raise ValueError("unknown field(s) in output_fieldnames: " + str(output_fieldnames))
        ds = ApplyFunctionDataSet(dataset, f, output_fieldnames)
        if cached_output_dataset:
            return CachedDataSet(ds)
        else:
            return ds

def kernel_predictor(inputs, params, *otherargs):
    p = KernelPredictor(params, *otherargs[0])
    return p.compute_outputs(inputs)
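

if __name__ == "__main__":
    # Minimal smoke test (illustrative only; assumes a working Theano
    # install). Fit a toy 1-d problem by solving (G + lambda I) alpha = y
    # directly with NumPy, then predict through KernelPredictor, bypassing
    # the pylearn dataset classes entirely.
    numpy.random.seed(0)
    X = numpy.random.randn(20, 1)
    y = numpy.sin(X) + 0.1 * numpy.random.randn(20, 1)
    gamma, lambda_ = 1.0, 0.1
    G = _gaussian_gram_numpy(X, gamma)
    alpha = numpy.linalg.solve(G + lambda_ * numpy.eye(len(X)), y)
    predictor = KernelPredictor(alpha, gamma, X)
    print "toy-problem MSE:", predictor.compute_mse(X, y)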