Last active
September 30, 2021 13:27
-
-
Save prerakmody/f80b25ebc57bc8a3c0850016e3ebb13d to your computer and use it in GitHub Desktop.
Bayesian Models (Tensorflow 2.4.0 + Tensorflow Prob 0.12.1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
OG Ref: https://github.com/tensorflow/probability/issues/620

Goals
 - To experiment with DNNs built using Flipout layers in both eager and non-eager mode
 - Eager mode allows for debugging using tf.print()
 - Non-eager mode is supposed to be faster and less memory-consuming since it pre-computes functions in a graph
 - Models will be made using the functional API and the dataset will not use tf.data.Dataset

Notes
 - Initially, the code below only worked in non-eager mode
 - Adding weight initialization before model.fit() solves that issue and allows debugging in eager mode
 - On a Tesla V100 (using nvidia-smi and progress bar)
    - Eager Mode: 1037 MB, 11s
    - Non-Eager Mode: 909 MB, 9s

Tested with TFlow 2.4.0 and TFlow prob 0.12.1
"""
import os | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
import tensorflow as tf | |
import tensorflow_probability as tfp | |
# Enable memory growth on the first GPU, when at least one GPU is visible.
if tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(
        tf.config.list_physical_devices('GPU')[0], True
    )

# Report the library versions this script is running against.
print(' - tflow ver : ', tf.__version__)  # 2.4.0
print(' - tflow-prob ver: ', tfp.__version__)  # 0.12.1

# Run this script in non-eager (graph) mode.
tf.compat.v1.disable_eager_execution()
# Step 1 - Model | |
def get_bayesian_model_functionalapi(input_shape=None, num_classes=10):
    """Build the Flipout network with the Keras functional API.

    The graph is: Input -> Convolution2DFlipout(6 filters, 5x5, SAME, relu)
    -> Flatten -> DenseFlipout(84, relu) -> DenseFlipout(num_classes).
    The last layer is created without an activation.

    Args:
        input_shape: Shape of one sample, forwarded to ``tf.keras.layers.Input``.
        num_classes: Number of units in the output layer.

    Returns:
        A ``tf.keras.Model`` connecting the input tensor to the output layer.
    """
    # Using the functional API: https://www.tensorflow.org/api_docs/python/tf/keras/Model
    inputs = tf.keras.layers.Input(shape=input_shape)

    features = tfp.layers.Convolution2DFlipout(
        6, kernel_size=5, padding="SAME", activation=tf.nn.relu
    )(inputs)
    features = tf.keras.layers.Flatten()(features)
    hidden = tfp.layers.DenseFlipout(84, activation=tf.nn.relu)(features)
    outputs = tfp.layers.DenseFlipout(num_classes)(hidden)

    return tf.keras.Model(inputs=inputs, outputs=outputs)
# Step 2 - Dataloader | |
def get_mnist_data(normalize=True):
    """Load MNIST and shape the images for a 2D convolutional model.

    The channel axis is placed according to
    ``tf.keras.backend.image_data_format()`` and the images are converted to
    ``float32``.

    Args:
        normalize: When true, divide the image arrays by 255.

    Returns:
        A tuple ``(x_train, y_train, x_test, y_test, input_shape)`` where
        ``input_shape`` is the shape of a single image including its channel
        axis.
    """
    height, width = 28, 28
    (train_images, train_labels), (test_images, test_labels) = (
        tf.keras.datasets.mnist.load_data()
    )

    # Decide once where the single channel axis goes, then reuse it for both splits.
    if tf.keras.backend.image_data_format() == 'channels_first':
        input_shape = (1, height, width)
    else:
        input_shape = (height, width, 1)

    train_images = train_images.reshape((train_images.shape[0],) + input_shape)
    test_images = test_images.reshape((test_images.shape[0],) + input_shape)

    train_images = train_images.astype('float32')
    test_images = test_images.astype('float32')

    if normalize:
        train_images /= 255
        test_images /= 255

    return train_images, train_labels, test_images, test_labels, input_shape
def train_with_kl():
    """Train the functional-API Flipout model on MNIST with a cross-entropy + KL loss.

    Loads the data with ``get_mnist_data``, builds the network with
    ``get_bayesian_model_functionalapi``, calls the model once on a dummy
    batch, then compiles and fits it for a fixed number of epochs.
    """
    # Hyper-parameters.
    batch_size = 128
    num_classes = 10
    epochs = 2

    # Get the training data.
    x_train, y_train, x_test, y_test, input_shape = get_mnist_data()

    # Get the model.
    model = get_bayesian_model_functionalapi(input_shape=input_shape, num_classes=num_classes)
    # model.build(input_shape)  # does not solve errors while eagerly executing
    # Calling the model once on a dummy batch solves errors if you wish to eagerly execute.
    print('\n- y_predict: {}\n'.format(model(tf.ones((batch_size,) + input_shape)).shape))

    def variational_free_energy_loss(model):
        """Return a Keras loss: mean sparse cross-entropy plus the summed ``model.losses``."""
        # NOTE(review): Keras `fit` may already add `model.losses` to the
        # optimized objective; if so this KL term is counted twice -- confirm.
        kl = sum(model.losses)

        def loss(y_true, y_pred):
            # The output DenseFlipout layer is built without an activation, so
            # y_pred holds raw logits and must be treated as such here.
            nll = tf.math.reduce_mean(
                tf.keras.losses.sparse_categorical_crossentropy(
                    y_true, y_pred, from_logits=True
                )
            )
            loss_val = nll + kl
            # tf.print(' - loss_val: ', loss_val)  # will print during eager execution
            return loss_val

        return loss

    # Prepare the model for training.
    loss = variational_free_energy_loss(model)
    # The labels are integer class ids, so name the sparse metric explicitly
    # instead of relying on Keras to infer the label format from a custom loss.
    model.compile(optimizer=tf.keras.optimizers.Adam(), loss=loss,
                  metrics=['sparse_categorical_accuracy'])

    # Train the model.
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs, verbose=1)
    # model.evaluate(x_test, y_test, verbose=0)
# Run the training experiment only when executed as a script.
if __name__ == '__main__':
    train_with_kl()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
OG Ref: https://github.com/tensorflow/probability/issues/620

Goals
 - To experiment with DNNs built using Flipout layers in both eager and non-eager mode
 - Eager mode allows for debugging using tf.print()
 - Non-eager mode is supposed to be faster and less memory-consuming since it pre-computes functions efficiently into a graph
 - Models will be made using model subclassing and the dataset will use tf.data.Dataset

Notes
 - Initially, the code below only worked in non-eager mode
 - Adding weight initialization before model.fit() solves that issue and allows debugging in eager mode

Tested with TFlow 2.4.0 and TFlow prob 0.12.1
"""
import os | |
import pdb | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
import numpy as np | |
import tensorflow as tf | |
import tensorflow_probability as tfp | |
# Enable memory growth on the first GPU, when at least one GPU is visible.
if tf.config.list_physical_devices('GPU'):
    tf.config.experimental.set_memory_growth(
        tf.config.list_physical_devices('GPU')[0], True
    )

# Report the library versions this script is running against.
print(' - tflow ver : ', tf.__version__)
print(' - tflow-prob ver: ', tfp.__version__)

# Uncomment to run in non-eager (graph) mode.
# tf.compat.v1.disable_eager_execution()

# Identifiers for the dataset split a loader should serve.
MODE_TRAIN = 'train'
MODE_TEST = 'test'
# Step 1 - Model | |
class BayesModel(tf.keras.Model):
    """Subclassed Keras model wrapping a sequential stack of Flipout layers.

    The stack is: Convolution2DFlipout(6 filters, 5x5, SAME, relu) -> Flatten
    -> DenseFlipout(84, relu) -> DenseFlipout(num_classes). The last layer is
    created without an activation.
    """

    def __init__(self, num_classes=10):
        """Create the layer stack.

        Args:
            num_classes: Number of units in the output layer.
        """
        super().__init__(name='BayesModel')
        layer_stack = [
            tfp.layers.Convolution2DFlipout(
                6, kernel_size=5, padding="SAME", activation=tf.nn.relu, name='Conv2DFlip'
            ),
            tf.keras.layers.Flatten(),
            tfp.layers.DenseFlipout(84, activation=tf.nn.relu, name='DenseFlip1'),
            tfp.layers.DenseFlipout(num_classes, name='DenseFlip2'),
        ]
        self.model = tf.keras.Sequential(layer_stack, name='BayesModel')

    def call(self, x):
        """Forward ``x`` through the wrapped sequential stack and return its output."""
        return self.model(x)
# Step 2 - Dataloader | |
class MNISTDataset:
    """Serve one MNIST split (train or test) as a ``tf.data.Dataset``.

    Attributes set by ``prep_data``: ``x_train``, ``y_train``, ``x_test``,
    ``y_test`` and ``input_shape`` (the shape of one image plus a trailing
    channel axis).
    """

    def __init__(self, mode):
        """Load MNIST and remember which split to serve.

        Args:
            mode: ``MODE_TRAIN`` or ``MODE_TEST``.

        Raises:
            ValueError: If ``mode`` is neither of the two known modes.
        """
        # Fail before loading data rather than later with an unrelated error.
        if mode not in (MODE_TRAIN, MODE_TEST):
            raise ValueError('Unknown mode: {!r}'.format(mode))
        self.mode = mode
        self.prep_data()

    def __len__(self):
        """Return the number of samples in the active split."""
        return len(self._active_split()[0])

    def _active_split(self):
        """Return ``(images, labels)`` for the split selected by ``self.mode``."""
        if self.mode == MODE_TRAIN:
            return self.x_train, self.y_train
        if self.mode == MODE_TEST:
            return self.x_test, self.y_test
        raise ValueError('Unknown mode: {!r}'.format(self.mode))

    def prep_data(self):
        """Load both MNIST splits and record the per-sample input shape."""
        (self.x_train, self.y_train), (self.x_test, self.y_test) = tf.keras.datasets.mnist.load_data()
        self.input_shape = self.x_train[0].shape + (1,)

    def generator(self):
        """Wrap ``_generator`` in a ``tf.data.Dataset`` of (image, label) pairs."""
        dataset = tf.data.Dataset.from_generator(
            self._generator,
            output_signature=(
                tf.TensorSpec(shape=self.input_shape, dtype=tf.float32),
                tf.TensorSpec(shape=(), dtype=tf.float32),
            ),
            args=(),
        )
        return dataset

    def _generator(self):
        """Yield ``(image, label)`` for every sample of the active split.

        Each image is divided by 255, cast to ``float32`` and given a trailing
        channel axis.
        """
        # Iterate the active split itself: looping over the training-set length
        # while serving the test split would index past the end of the test arrays.
        images, labels = self._active_split()
        for image, label in zip(images, labels):
            x = np.array(image / 255).astype('float32')
            yield (np.expand_dims(x, -1), label)
def train_with_kl():
    """Train ``BayesModel`` on the MNIST training split with a cross-entropy + KL loss.

    Streams batches from ``MNISTDataset``, calls the model once on a dummy
    batch, then compiles and fits it for a fixed number of epochs.
    """
    # Step 0 - Hyper-parameters.
    num_classes = 10
    epochs = 2
    batch_size = 128

    # Step 1 - Get the training data as an endlessly repeating, batched stream.
    train_set = MNISTDataset(mode=MODE_TRAIN)
    train_batches = train_set.generator().repeat().batch(batch_size)
    steps = len(train_set) // batch_size

    # Step 2 - Get the model.
    bayes_net = BayesModel(num_classes=num_classes)
    # need to init weights with the exact batch size
    dummy_batch = tf.ones((batch_size,) + train_set.input_shape)
    print('\n- y_predict: {}\n'.format(bayes_net(dummy_batch).shape))

    # for layer in bayes_net.layers:
    #     for loss_id, layer_loss in enumerate(layer.losses):
    #         tf.print(' - ', layer.name, loss_id, layer_loss)  # will print during eager execution
    kl = sum(bayes_net.losses)

    def loss(y_true, y_pred):
        """Mean sparse cross-entropy on logits plus the down-weighted KL sum."""
        bce = tf.math.reduce_mean(
            tf.keras.losses.sparse_categorical_crossentropy(y_true, y_pred, from_logits=True)
        )
        return bce + 0.001*kl

    # Prepare the model for training.
    bayes_net.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=loss,
        metrics=['accuracy'],
    )

    # Train the model.
    bayes_net.fit(train_batches, steps_per_epoch=steps, epochs=epochs, verbose=1)
    # bayes_net.evaluate(x_test, y_test, verbose=0)
# Run the training experiment only when executed as a script.
if __name__ == '__main__':
    train_with_kl()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment