Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import numpy as np
import tensorflow as tf
# --- Synthetic regression data -------------------------------------------
# Number of simulated observations.
nobs = 100

# True intercept and slope coefficients used to generate the response.
alpha = 1.0
beta = np.array([0.0, 0.5, 0.25])

# Lower-triangular mixing matrix; right-multiplying the standard-normal
# draws by it makes the columns of X linear combinations of each other.
L = np.array(
    [
        [1.0, 0.0, 0.0],
        [0.25, 1.1, 0.0],
        [0.2, 0.2, 1.25],
    ]
)

# Design matrix: one column per entry of ``beta``.
X = np.matmul(np.random.randn(nobs, beta.size), L)

# Noise-free linear response.
y = alpha + np.matmul(X, beta)
class GroupedInterceptLinearCoeffs(tf.keras.layers.Layer):
    """Linear predictor with one scalar intercept: ``a + inputs @ b``.

    The intercept weight ``a`` is always a scalar in this layer; ``ngroup``
    is stored on the instance but not otherwise used here.

    Args:
        ngroup: Number of intercept groups (stored only).
        **kwargs: Standard Keras layer options (e.g. ``name``, ``dtype``),
            forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, ngroup=1, **kwargs):
        # Forward kwargs to the base class so options such as ``name`` are
        # honoured rather than silently discarded.
        super().__init__(**kwargs)
        self.ngroup = ngroup

    def build(self, input_shape):
        """Create the scalar intercept and one slope per trailing-axis feature."""
        self.a = self.add_weight(
            shape=(), dtype="float32",
            initializer="random_normal", trainable=True,
        )
        self.b = self.add_weight(
            shape=(input_shape[-1],), dtype="float32",
            initializer="random_normal", trainable=True,
        )

    @tf.function()
    def call(self, inputs):
        """Return ``a + inputs @ b``, contracting the last axis of ``inputs``."""
        out = self.a + tf.linalg.matvec(inputs, self.b)
        return out

    def get_config(self):
        """Return the layer config including ``ngroup`` so it can be rebuilt."""
        config = super().get_config()
        config.update({"ngroup": self.ngroup})
        return config
class OLS(tf.keras.Model):
    """Keras model that delegates to one ``GroupedInterceptLinearCoeffs`` layer.

    Args:
        ngroups: Passed positionally to the layer as its ``ngroup`` argument.
        name: Model name handed to ``tf.keras.Model``.
        **kwargs: Remaining options forwarded to ``tf.keras.Model``.
    """

    def __init__(self, ngroups=1, name="ols", **kwargs):
        super().__init__(name=name, **kwargs)
        # The only sub-layer: intercept plus linear coefficients.
        self.lm = GroupedInterceptLinearCoeffs(ngroups)

    def call(self, inputs):
        """Return the linear layer's prediction for ``inputs``."""
        return self.lm(inputs)
# --- Fit the scalar-intercept model --------------------------------------
# Cast once to float32, the dtype the layer's weights are declared with.
features32 = X.astype(np.float32)
targets32 = y.astype(np.float32)

olsmodel = OLS(ngroups=1)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-2)
olsmodel.compile(optimizer=optimizer, loss=tf.keras.losses.MeanSquaredError())

# 25 silent epochs over shuffled mini-batches of 25 rows.
olsmodel.fit(
    features32,
    targets32,
    epochs=25,
    batch_size=25,
    shuffle=True,
    verbose=False,
)

# Show converged
olsmodel.fit(
    features32,
    targets32,
    epochs=1,
    batch_size=25,
    shuffle=True,
)
class GroupedInterceptLinearCoeffs_gather(tf.keras.layers.Layer):
    """Linear predictor with a per-group intercept looked up via ``tf.gather``.

    The layer takes a two-element input: ``inputs[0]`` holds the indices used
    to pick an intercept out of ``a`` and ``inputs[1]`` holds the features
    multiplied by ``b``.

    Args:
        ngroup: Length of the intercept vector ``a``.
        **kwargs: Standard Keras layer options (e.g. ``name``, ``dtype``),
            forwarded to ``tf.keras.layers.Layer``.
    """

    def __init__(self, ngroup=1, **kwargs):
        # Forward kwargs to the base class so options such as ``name`` are
        # honoured rather than silently discarded.
        super().__init__(**kwargs)
        self.ngroup = ngroup

    def build(self, input_shape):
        """Create ``ngroup`` intercepts and one slope per feature column.

        ``input_shape`` is indexed as a pair of shapes; the feature count is
        the last dimension of the second one.
        """
        self.a = self.add_weight(
            shape=(self.ngroup,), dtype="float32",
            initializer="random_normal", trainable=True,
        )
        self.b = self.add_weight(
            shape=(input_shape[1][-1],), dtype="float32",
            initializer="random_normal", trainable=True,
        )

    @tf.function()
    def call(self, inputs):
        """Return ``a[inputs[0]] + inputs[1] @ b``."""
        # Select each row's intercept by index, then add the linear term.
        intercepts = tf.gather(self.a, inputs[0], axis=0, batch_dims=0)
        out = intercepts + tf.linalg.matvec(inputs[1], self.b)
        return out

    def get_config(self):
        """Return the layer config including ``ngroup`` so it can be rebuilt."""
        config = super().get_config()
        config.update({"ngroup": self.ngroup})
        return config
class OLS_gather(tf.keras.Model):
    """Keras model that delegates to ``GroupedInterceptLinearCoeffs_gather``.

    Args:
        ngroups: Passed positionally to the layer as its ``ngroup`` argument.
        name: Model name handed to ``tf.keras.Model``.
        **kwargs: Remaining options forwarded to ``tf.keras.Model``.
    """

    def __init__(self, ngroups=1, name="ols", **kwargs):
        super().__init__(name=name, **kwargs)
        # The only sub-layer: gathered intercept plus linear coefficients.
        self.lm = GroupedInterceptLinearCoeffs_gather(ngroups)

    def call(self, inputs):
        """Return the layer's prediction for the two-element ``inputs``."""
        return self.lm(inputs)
# --- Fit the gather-based model ------------------------------------------
# Every observation is assigned to group 0, so with ngroups=1 the gathered
# intercept is the same single weight for all rows.
gather_inputs = [
    np.zeros((X.shape[0],), dtype=np.int32),
    X.astype(np.float32),
]
gather_targets = y.astype(np.float32)

olsmodel_g = OLS_gather(ngroups=1)
optimizer_g = tf.keras.optimizers.Adam(learning_rate=1e-2)
olsmodel_g.compile(optimizer=optimizer_g, loss=tf.keras.losses.MeanSquaredError())

# 25 silent epochs in mini-batches of 25 rows.
olsmodel_g.fit(
    gather_inputs,
    gather_targets,
    epochs=25,
    batch_size=25,
    verbose=False,
)

# One more epoch with default verbosity to display the final loss.
olsmodel_g.fit(
    gather_inputs,
    gather_targets,
    epochs=1,
    batch_size=25,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment