import pandas as pd
import numpy as np
import patsy
import tensorflow as tf


class SleepReg(tf.Module):
    def __init__(self, sleepdata_path):
        """Initialize TensorFlow variables and the fixed matrices used in training."""
        # These two TensorFlow variables show up in trainable_variables:
        # the fixed effects beta and the stacked random effects b.
        self.beta = tf.Variable(tf.zeros([2, 1], dtype=tf.float64))  # intercept, Days slope
        self.b = tf.Variable(tf.zeros([36, 1], dtype=tf.float64))   # 18 subjects x 2 effects
        self.set_optimizer()
        self.data = self.get_sleepstudy_data(sleepdata_path)
        self.N_subjects = 18  # the sleepstudy data has 18 subjects
        self.y = self.data.Reaction.values.reshape((-1, 1))  # 180 observations
        self.X = self.get_X_matrix()
        self.Z = self.get_Z_matrix()
        # Initializing Sigma_b and sigmasq_epsilon to lme4's output for demo
        self.Sigma_b = np.array([[24.7404 ** 2, .066 * 5.9221 * 24.7404],
                                 [.066 * 24.7404 * 5.9221, 5.9221 ** 2]])
        self.sigmasq_epsilon = 25.5918 ** 2
        self.V = self.get_V_matrix()
        self.H = self.get_H_matrix()
        self.df = self.get_approximate_df()

    def zero_coefficients(self):
        """Reset beta and b to zeros before a fresh training run."""
        self.beta = tf.Variable(tf.zeros([2, 1], dtype=tf.float64))
        self.b = tf.Variable(tf.zeros([36, 1], dtype=tf.float64))

    def reset_variances(self, Sigma_b, sigmasq_epsilon):
        """Set new variance components and refresh the matrices derived from them."""
        self.Sigma_b = Sigma_b
        self.sigmasq_epsilon = sigmasq_epsilon
        # Update matrices that depend on these variances
        self.V = self.get_V_matrix()
        self.H = self.get_H_matrix()
        self.df = self.get_approximate_df()

    def get_sleepstudy_data(self, sleepdata_path):
        """Load the sleepstudy CSV and add a string-coded subject id column."""
        sleep_df = pd.read_csv(sleepdata_path)
        sleep_df['SubjectId'] = pd.Categorical(sleep_df['Subject']).codes
        sleep_df['SubjectId'] = sleep_df['SubjectId'].astype(str)
        return sleep_df

    def get_X_matrix(self):
        """Fixed-effects design matrix: intercept and Days."""
        return patsy.dmatrix('1 + Days', data=self.data)

    def get_Z_matrix(self):
        """Build the textbook (block-diagonal) random-effects matrix Z."""
        Z_int = patsy.dmatrix('0 + SubjectId', data=self.data)
        Z_slope = patsy.dmatrix('0 + SubjectId:Days', data=self.data)
        # Columns are [all intercepts | all slopes]; permute so that each
        # subject's (intercept, slope) columns sit next to each other.
        Z_version1 = np.concatenate((Z_int, Z_slope), axis=1)
        P = np.zeros((2 * self.N_subjects, 2 * self.N_subjects))
        for i in range(self.N_subjects):
            P[i, 2 * i] = 1.0                        # intercept col i -> 2i
            P[self.N_subjects + i, 2 * i + 1] = 1.0  # slope col i -> 2i + 1
        Z = np.matmul(Z_version1, P)  # Textbook Z matrix
        return Z

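    # A loop-free equivalent (just a sketch, not used above): build the same
    # interleaving as a fancy-index column permutation instead of a matrix
    # product with P:
    #   order = np.arange(2 * self.N_subjects).reshape(2, -1).T.ravel()
    #   Z = Z_version1[:, order]
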
    def get_V_matrix(self):
        """Precision (inverse covariance) matrix of the stacked random effects."""
        # Subjects are independent, and each subject's (intercept, slope)
        # pair has covariance Sigma_b, so V = kron(I_N, Sigma_b^{-1}).
        Sigma_b_inv = np.linalg.inv(self.Sigma_b)
        V = np.kron(np.identity(self.N_subjects), Sigma_b_inv)
        return V

    def get_H_matrix(self):
        """Define the hat matrix H such that y_hat = Hy.

        H = [X Z] D^{-1} [X Z]' / sigmasq_epsilon, where D is the
        coefficient matrix of Henderson's mixed-model equations:
        the blocks [X'X, X'Z; Z'X, Z'Z] / sigmasq_epsilon, with the
        random-effects precision V added to the lower-right block.
        """
        Xt = np.transpose(self.X)
        Zt = np.transpose(self.Z)
        XtX_s = np.matmul(Xt, self.X) / self.sigmasq_epsilon
        XtZ_s = np.matmul(Xt, self.Z) / self.sigmasq_epsilon
        ZtX_s = np.matmul(Zt, self.X) / self.sigmasq_epsilon
        ZtZ_s_plus_V = np.matmul(Zt, self.Z) / self.sigmasq_epsilon + self.V
        X_and_Z = np.hstack((self.X, self.Z))
        Xt_over_Zt_s = np.vstack((Xt, Zt)) / self.sigmasq_epsilon
        D = np.vstack((np.hstack((XtX_s, XtZ_s)),
                       np.hstack((ZtX_s, ZtZ_s_plus_V))))
        D_inv = np.linalg.inv(D)
        H = np.matmul(np.matmul(X_and_Z, D_inv), Xt_over_Zt_s)
        return H

    def get_approximate_df(self):
        """Effective degrees of freedom, approximated by trace(H)."""
        return np.trace(self.H)

    def get_fitted_values(self):
        """Fitted values y_hat = Hy."""
        return np.matmul(self.H, self.y)

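    # Sanity-check sketch (an assumption, not executed here): at convergence
    # the hat-matrix fit should approximately match the coefficient-based fit:
    #   np.allclose(self.get_fitted_values(),
    #               np.matmul(self.X, self.beta.numpy())
    #               + np.matmul(self.Z, self.b.numpy()))
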
    def get_random_effects(self):
        """Per-subject random effects: one (intercept, slope) row per subject."""
        b_hat = np.squeeze(self.b.numpy())
        rand_effs = np.reshape(b_hat, (self.N_subjects, 2))
        return pd.DataFrame(rand_effs, columns=['mu', 'b'])

    def get_rnd_effs_variance(self):
        """Empirical covariance matrix of the fitted random effects."""
        rand_effs = self.get_random_effects()
        return np.cov(rand_effs, rowvar=False)

    def estimate_sigmasq_epsilon(self):
        """Estimate the residual variance as SSE / (n - effective df)."""
        y_hat = self.get_fitted_values()
        df = self.get_approximate_df()
        resid = self.y - y_hat
        est = np.matmul(resid.T, resid) / (resid.shape[0] - df)
        return est.ravel()  # 1-element array

    @tf.function
    def _get_expectation(self, X, Z, beta, b):
        """Linear predictor: E[y] = X beta + Z b."""
        return tf.matmul(X, beta) + tf.matmul(Z, b)

    @tf.function
    def _get_sse(self, y, y_hat):
        """Sum of squared errors."""
        return tf.reduce_sum(tf.square(y - y_hat))

    @tf.function
    def _get_neg_log_prior(self, b, V):
        """Weight penalty b'Vb from the Gaussian prior on the random effects
        (its negative log density, up to scale and additive constants)."""
        bTV = tf.matmul(tf.transpose(b), V)
        bTVb = tf.matmul(bTV, b)
        return tf.squeeze(bTVb)

    def set_optimizer(self, adam=False):
        """Choose optimizer for the model training task."""
        self.optimizer = (tf.optimizers.Adam() if adam else
                          tf.optimizers.SGD(learning_rate=.025, momentum=.98))

    def train(self, epochs=1500, display_beta=True):
        """Trains the model using a TensorFlow training loop."""
        X = tf.constant(self.X)
        Z = tf.constant(self.Z)
        y = tf.constant(self.y)
        V = tf.constant(self.V)
        # The per-component variances are not needed separately here: the
        # penalty uses the full precision matrix V built from Sigma_b.
        for epoch in range(epochs):
            with tf.GradientTape() as gradient_tape:
                y_pred = self._get_expectation(X, Z, self.beta, self.b)
                # Penalized least squares: scaled SSE plus the Gaussian
                # penalty on the random effects.
                loss = (self._get_sse(y, y_pred) / self.sigmasq_epsilon
                        + self._get_neg_log_prior(self.b, V))
            gradient = gradient_tape.gradient(loss, self.trainable_variables)
            self.optimizer.apply_gradients(zip(gradient,
                                               self.trainable_variables))
            if display_beta:
                print(self.beta)
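

if __name__ == '__main__':
    # Minimal usage sketch; 'sleepstudy.csv' is a hypothetical local path to
    # the lme4 sleepstudy data with columns Reaction, Days, and Subject.
    sleep_reg = SleepReg('sleepstudy.csv')
    sleep_reg.train(display_beta=False)
    print(sleep_reg.beta)                  # fixed effects: intercept, Days slope
    print(sleep_reg.get_random_effects())  # per-subject deviations
    # One round of variance re-estimation followed by a refit:
    sleep_reg.reset_variances(sleep_reg.get_rnd_effs_variance(),
                              sleep_reg.estimate_sigmasq_epsilon())
    sleep_reg.zero_coefficients()
    sleep_reg.train(display_beta=False)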