Find the main kinetic dividing plane using the Variational Approach to Markov Processes (VAMP)
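
# Usage sketch (illustrative; X and Y stand for any arrays of time-lagged
# data pairs, e.g. trajectory frames at times t and t + tau):
#
#     vamp = VAMP42(X, Y).run()
#     normal, intercept = vamp.hnf   # dividing plane in Hesse normal form
#     labels = vamp.assign(X)        # crisp assignment to the two "states"
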
import numpy as np
import scipy
import scipy.optimize
import scipy.special
import warnings

__all__ = ['VAMP42']
__author__ = 'Fabian Paul <fapa@uchicago.edu>'


def sigma(x):
    'Logistic sigmoid 1/(1 + exp(-x)).'
    return scipy.special.expit(x)


class VAMP42(object):
    def __init__(self, X, Y, init='linear'):
        r'''Find the main kinetic dividing plane with the variational approach to Markov processes (VAMP).

        Parameters
        ----------
        X : ndarray((T, d))
            Initial points from all transition pairs.
        Y : ndarray((T, d))
            Final points from all transition pairs.
        init : str or ndarray(d + 1)
            One of 'random' or 'linear', or an explicit initial parameter vector.
            Initialization algorithm: 'random' initializes randomly (respecting
            scale and location of the data). 'linear' solves the linear VAMP
            problem first to find guesses of the initial parameters for the
            nonlinear problem.
        '''
        # augment X and Y with a constant feature
        T = len(X)
        ones = np.ones(shape=(T, 1))
        self.X = np.hstack((ones, X))
        self.Y = np.hstack((ones, Y))
        if isinstance(init, str) and init == 'random':
            # pick a scale and shift that match the data
            scales = 1./(np.std(np.concatenate((X, Y)), axis=0) + 1.E-8)
            self.initial = np.zeros(X.shape[1] + 1)
            self.initial[1:] = (np.random.rand(X.shape[1]) - 0.5) * scales
            # make the initial plane go through the data mean
            self.initial[0] = -np.vdot(self.initial[1:], np.mean(np.concatenate((X, Y)), axis=0))
        elif isinstance(init, str) and init == 'linear':
            # find an initial point by solving the linear problem
            C00 = np.dot(self.X.T, self.X) / T
            C11 = np.dot(self.Y.T, self.Y) / T
            C01 = np.dot(self.X.T, self.Y) / T
            C00_inv = np.linalg.inv(C00)
            C11_inv = np.linalg.inv(C11)
            values, vectors = np.linalg.eig(np.linalg.multi_dot((C00_inv, C01, C11_inv, C01.T)))
            order = np.argsort(values.real)
            principal_vector = vectors[:, order[-2]].real
            self.initial = principal_vector
        else:
            self.initial = init
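
    # Note on the 'linear' initialization: for the constant-augmented data,
    # the eigenvalues of C00^-1 C01 C11^-1 C01^T are the squared VAMP
    # singular values. The top eigenvector corresponds to the trivial
    # constant singular function (singular value 1), so the second-largest
    # eigenvector (order[-2]) gives the dominant non-trivial direction that
    # serves as the initial guess.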

    def selftest(self, delta=1.E-8):
        'Check the analytical gradient against a finite-difference estimate.'
        x0 = self.initial
        direction = (np.random.rand(len(self.initial)) - 0.5) * delta
        f1, grad1 = self.score_function_and_gradient(x0)
        f2 = self.score_function(x0 + direction)
        df = np.dot(grad1, direction)
        # rel_err is the ratio of the second-order to the first-order Taylor
        # term of f2 - f1 and should therefore itself be of order delta
        rel_err = np.abs(f2 - f1 - df) / np.abs(f2 - f1)
        sign_correct = (np.sign(df) == np.sign(f2 - f1))
        if not sign_correct:
            warnings.warn('Analytical gradient has the wrong sign.')
        if rel_err > 0.001:
            warnings.warn('Error %f of analytical gradient seems too large. Expected O(delta).' % rel_err)
        return rel_err, sign_correct

    def run(self, approx=False):
        'Perform optimization of the VAMP score, finding the optimal parameters of the sigmoid.'
        if approx:
            fprime = None
        else:
            self.selftest()
            fprime = self.score_gradient
        xopt, fopt, self._gopt, Bopt, self._func_calls, self._grad_calls, self._warnflag, hist = \
            scipy.optimize.fmin_bfgs(f=self.score_function, x0=self.initial, fprime=fprime, disp=0,
                                     full_output=True, retall=True)
        if self._warnflag != 0:
            if self._warnflag == 1:
                warnings.warn('BFGS returned with error: Maximum number of iterations exceeded.')
            elif self._warnflag == 2:
                warnings.warn('BFGS returned with error: Gradient and/or function calls not changing.')
            else:
                warnings.warn('BFGS returned with error code ' + str(self._warnflag))
        self._b = xopt
        self._score = -fopt
        self.hist = -np.array([self.score_function(x) for x in hist])
        # find the normalization of the singular functions
        self._std_left = np.std(sigma(np.dot(self.X, self._b)))
        self._std_right = np.std(sigma(np.dot(self.Y, self._b)))
        self._var_left = np.var(sigma(np.dot(self.X, self._b)))
        self._var_right = np.var(sigma(np.dot(self.Y, self._b)))
        return self

    @property
    def b(self):
        'Optimized parameter vector (intercept followed by normal direction).'
        return self._b

    @property
    def ext_hnf(self):
        'Hesse normal form (normal, -distance) and steepness parameter of the dividing plane.'
        norm = np.linalg.norm(self._b[1:])
        if self._b[0] / norm > 0:
            norm = -norm
        return (self._b[1:] / norm, self._b[0] / norm, norm)
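
    # With (n, c, s) = ext_hnf, the dividing plane is {x : <n, x> + c = 0}
    # with |n| = 1, so <n, x> + c is the signed distance of a point x from
    # the plane, and the fitted model can be written as expit(s*(<n, x> + c)).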

    @property
    def ext_hnf_initial(self):
        'Hesse normal form (normal, -distance) and steepness parameter of the initial dividing plane.'
        norm = np.linalg.norm(self.initial[1:])
        if self.initial[0] / norm > 0:
            norm = -norm
        return (self.initial[1:] / norm, self.initial[0] / norm, norm)

    @property
    def hnf(self):
        'Hesse normal form (normal, -distance) of the dividing plane.'
        ext_hnf = self.ext_hnf
        return (ext_hnf[0], ext_hnf[1])

    def score_function_and_gradient(self, b, which='both'):
        'Negative VAMP-2 score (negated for minimization) and its gradient with respect to b.'
        T = len(self.X)
        sxp = sigma(np.dot(self.X, b))  # sxp: read sigma of x, plus
        sxm = 1. - sxp
        syp = sigma(np.dot(self.Y, b))
        sym = 1. - syp
        sxpm = sxp*sxm  # sigmoid derivative: sigma' = sigma*(1 - sigma)
        sypm = syp*sym
        # covariance matrices in the two-dimensional basis (sigma, 1 - sigma)
        C00 = np.zeros((2, 2))
        C11 = np.zeros((2, 2))
        C01 = np.zeros((2, 2))
        C00[0, 0] = np.vdot(sxp, sxp) / T
        C00[1, 1] = np.vdot(sxm, sxm) / T
        C00[0, 1] = np.vdot(sxp, sxm) / T
        C00[1, 0] = C00[0, 1]
        C11[0, 0] = np.vdot(syp, syp) / T
        C11[1, 1] = np.vdot(sym, sym) / T
        C11[0, 1] = np.vdot(syp, sym) / T
        C11[1, 0] = C11[0, 1]
        C01[0, 0] = np.vdot(sxp, syp) / T
        C01[1, 1] = np.vdot(sxm, sym) / T
        C01[0, 1] = np.vdot(sxp, sym) / T
        C01[1, 0] = np.vdot(sxm, syp) / T
        C00_inv = np.linalg.inv(C00)
        C11_inv = np.linalg.inv(C11)
        Kf = np.dot(C00_inv, C01)    # forward Koopman model
        Kr = np.dot(C11_inv, C01.T)  # reverse Koopman model
        R = np.einsum('ik,ki', Kf, Kr)  # trace(Kf Kr)
        if which == 'function':
            return -R
        # gradient computation starts here
        d = len(b)
        XtYp = np.zeros((2, d, 2))  # XtYp, read: X transpose times Y prime
        YtXp = np.zeros((2, d, 2))
        XtYp[0, :, 0] = np.dot(self.Y.T, sypm*sxp) / T
        XtYp[0, :, 1] = -np.dot(self.Y.T, sypm*sxp) / T
        XtYp[1, :, 0] = np.dot(self.Y.T, sypm*sxm) / T
        XtYp[1, :, 1] = -np.dot(self.Y.T, sypm*sxm) / T
        YtXp[0, :, 0] = np.dot(self.X.T, sxpm*syp) / T
        YtXp[0, :, 1] = -np.dot(self.X.T, sxpm*syp) / T
        YtXp[1, :, 0] = np.dot(self.X.T, sxpm*sym) / T
        YtXp[1, :, 1] = -np.dot(self.X.T, sxpm*sym) / T
        XtXp = np.zeros((2, d, 2))
        YtYp = np.zeros((2, d, 2))
        XtXp[0, :, 0] = np.dot(self.X.T, sxpm*sxp) / T
        XtXp[1, :, 1] = -np.dot(self.X.T, sxpm*sxm) / T
        XtXp[1, :, 0] = np.dot(self.X.T, sxpm*sxm) / T
        XtXp[0, :, 1] = -np.dot(self.X.T, sxpm*sxp) / T
        YtYp[0, :, 0] = np.dot(self.Y.T, sypm*syp) / T
        YtYp[1, :, 1] = -np.dot(self.Y.T, sypm*sym) / T
        YtYp[1, :, 0] = np.dot(self.Y.T, sypm*sym) / T
        YtYp[0, :, 1] = -np.dot(self.Y.T, sypm*syp) / T
        gradient = 2*np.einsum('ij,jk,kni->n', Kf, C11_inv, YtXp)
        gradient -= 2*np.einsum('ij,jk,kl,lni->n', Kf, C11_inv, Kf.T, XtXp)
        gradient += 2*np.einsum('ij,jk,kni->n', Kr, C00_inv, XtYp)
        gradient -= 2*np.einsum('ij,jk,kl,lni->n', Kr, C00_inv, Kr.T, YtYp)
        assert gradient.shape == (d,)
        if which == 'both':
            return -R, -gradient
        elif which == 'gradient':
            return -gradient
        else:
            raise ValueError('which should be one of "function", "gradient", or "both"')

    def score_function(self, b):
        return self.score_function_and_gradient(b, 'function')

    def score_gradient(self, b):
        return self.score_function_and_gradient(b, 'gradient')
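
    # The quantity maximized above is the VAMP-2 score
    #     R = tr(C00^-1 C01 C11^-1 C01^T),
    # the sum of squared singular values of the estimated half-weighted
    # Koopman matrix, evaluated in the basis (sigma(<b, x>), 1 - sigma(<b, x>)).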

    def f(self, x):
        'Eigenfunction evaluated at point x.'
        return sigma(np.dot(x, self._b[1:]) + self._b[0])

    def assign(self, x):
        'Crisp assignment of data points to one of the two "states".'
        return (np.dot(x, self._b[1:]) + self._b[0] > 0).astype(int)

    @property
    def eigenvalue(self):
        'The squared dominant non-trivial VAMP singular value (score minus the trivial contribution of 1).'
        return self._score - 1.

    def kinetic_distance(self, a, b, normed=False):
        'Kinetic distance between points a and b.'
        # the mean-free property of the singular function is irrelevant here,
        # since we only use the difference of function values
        delta = self.eigenvalue * (sigma(np.dot(a, self._b[1:]) + self._b[0])
                                   - sigma(np.dot(b, self._b[1:]) + self._b[0]))**2
        if normed:
            return delta / self._var_left
        else:
            return delta


if __name__ == '__main__':
    dim = 99
    X = np.vstack((np.random.randn(100, dim) + 8*np.ones(dim),
                   np.random.randn(100, dim)))
    Y = X + np.random.randn(200, dim)
    print('self test:', VAMP42(X, Y).selftest())
    #normal, intercept, steepness = VAMP42(X, Y).run(approx=True).ext_hnf
    #print('normal, intercept, steepness:', normal, intercept, steepness)
    vamp = VAMP42(X, Y).run(approx=False)
    normal, intercept, steepness = vamp.ext_hnf
    print('score:', vamp._score)
    #print('normal, intercept, steepness:', normal, intercept, steepness)
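    # Further usage of the fitted model (illustrative; the two points below
    # are the centers of the synthetic clusters generated above):
    print('dominant squared singular value:', vamp.eigenvalue)
    print('state populations:', np.bincount(vamp.assign(X)))
    print('kinetic distance between the two cluster centers:',
          vamp.kinetic_distance(8.*np.ones(dim), np.zeros(dim)))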