# Netron.app examples to visualize TensorFlow 2.x models
# Gist by @prerakmody (last active November 2, 2022)
# ----------------------------------------------------------
# (gist file 1) FocusNet-style 3D segmentation model, exported
# for visualization in Netron / TensorBoard / ONNX
# ----------------------------------------------------------
"""
pip install tensorflow
pip install tf2onnx keras2onnx onnxmltools
"""
import os
import pdb
import json
import traceback
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import tensorflow as tf # v2.4
if len(tf.config.list_physical_devices('GPU')):
tf.config.experimental.set_memory_growth(tf.config.list_physical_devices('GPU')[0], True)
############################################################
# FOCUSNET #
############################################################
class ConvBlock3D(tf.keras.layers.Layer):
def __init__(self, filters, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same'
, dilation_rate=(1,1,1)
, activation='relu'
, trainable=False
, dropout=None
, pool=False
, name=''):
super(ConvBlock3D, self).__init__(name='{}_ConvBlock3D'.format(name))
self.pool = pool
self.conv_layer = tf.keras.Sequential(name='{}_Sequential'.format(name))
for filter_id, filter_count in enumerate(filters):
self.conv_layer.add(
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rate
, activation=activation
, kernel_regularizer=tf.keras.regularizers.l2(0.1)
, name='Conv_{}'.format(filter_id))
)
self.conv_layer.add(tf.keras.layers.BatchNormalization(trainable=trainable, name='BNorm_{}'.format(filter_id)))
if filter_id == 0 and dropout is not None:
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='DropOut_{}'.format(filter_id)))
if self.pool:
self.pool_layer = tf.keras.layers.MaxPooling3D((2,2,2), strides=(2,2,2), name='{}_Pool'.format(name))
def call(self, x):
x = self.conv_layer(x)
if self.pool:
return x, self.pool_layer(x)
else:
return x
def get_config(self):
if self.pool:
return {'conv_layer': self.conv_layer, 'pool_layer': self.pool_layer}
else:
return {'conv_layer': self.conv_layer}
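# A minimal shape sketch for ConvBlock3D (the input size is an assumption; any 5D
# (batch, H, W, D, channels) tensor works). With pool=True the block returns both the
# full-resolution features and a 2x2x2 max-pooled copy:
#   block = ConvBlock3D(filters=[8, 8], pool=True, name='Demo')
#   feat, pooled = block(tf.ones((1, 32, 32, 16, 1)))  # (1,32,32,16,8) and (1,16,16,8,8)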
class ConvBlock3DSERes(tf.keras.layers.Layer):
"""
For channel-wise attention
"""
def __init__(self, filters, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same'
, dilation_rate=(1,1,1)
, activation='relu'
, trainable=False
, dropout=None
, pool=False
, squeeze_ratio=None
, init=False
, name=''):
super(ConvBlock3DSERes, self).__init__(name='{}_ConvBlock3DSERes'.format(name))
self.init = init
if self.init:
            self.convblock_filterequalizer = tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same'
                                                , activation='relu', name='{}_ConvBlockInit'.format(name))
self.convblock_res = ConvBlock3D(filters=filters, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rate
, activation=activation
, trainable=trainable
, dropout=dropout
, pool=False
, name=name
)
"""
Ref: https://github.com/imkhan2/se-resnet/blob/master/se_resnet.py
"""
self.squeeze_ratio = squeeze_ratio
if self.squeeze_ratio is not None:
self.seblock = tf.keras.Sequential(name='{}_SqueezeExcitation'.format(name))
self.seblock.add(tf.keras.layers.GlobalAveragePooling3D())
self.seblock.add(tf.keras.layers.Reshape(target_shape=(1,1,1,filters[0])))
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0]//squeeze_ratio, kernel_size=(1,1,1), strides=(1,1,1), padding='same'
, activation='relu'))
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same'
, activation='sigmoid'))
self.pool = pool
if self.pool:
self.pool_layer = tf.keras.layers.MaxPooling3D((2,2,2), strides=(2,2,2), name='{}_Pool'.format(name))
def call(self, x):
if self.init:
x = self.convblock_filterequalizer(x)
x_res = self.convblock_res(x)
if self.squeeze_ratio is not None:
x_se = self.seblock(x_res) # squeeze and then get excitation factor
x_res = tf.math.multiply(x_res, x_se) # excited block
y = x + x_res
if self.pool:
return y, self.pool_layer(y)
else:
return y
def get_config(self):
config = {
'convblock_res': self.convblock_res
, 'seblock': self.seblock
}
if self.init:
config['convblock_filterequalizer'] = self.convblock_filterequalizer
if self.pool:
config['pool_layer'] = self.pool_layer
return config
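# Usage sketch for ConvBlock3DSERes (shapes are assumptions). init=True adds a 1x1x1
# conv so the residual addition x + x_res stays channel-compatible when the input does
# not already carry filters[0] channels:
#   se_block = ConvBlock3DSERes(filters=[8, 8], squeeze_ratio=2, init=True, name='Demo')
#   y = se_block(tf.ones((1, 32, 32, 16, 4)))  # -> (1, 32, 32, 16, 8)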
class UpConvBlock3D(tf.keras.layers.Layer):
def __init__(self, filters, kernel_size=(2,2,2), strides=(2, 2, 2), padding='same', trainable=False, name=''):
super(UpConvBlock3D, self).__init__(name='{}_UpConv3D'.format(name))
self.upconv_layer = tf.keras.Sequential(name='{}_Sequential'.format(name))
self.upconv_layer.add(tf.keras.layers.Conv3DTranspose(filters, kernel_size, strides, padding=padding
, activation='relu'
, kernel_regularizer=tf.keras.regularizers.l2(0.1)
, name='UpConv_{}'.format(self.name))
)
# self.upconv_layer.add(tf.keras.layers.BatchNormalization(trainable=trainable))
def call(self, x):
return self.upconv_layer(x)
def get_config(self):
return {'upconv_layer': self.upconv_layer}
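# Usage sketch (input shape is an assumption): with the default kernel and strides of
# (2,2,2), the transposed convolution doubles every spatial dimension:
#   up = UpConvBlock3D(filters=8, name='Demo')
#   y = up(tf.ones((1, 16, 16, 8, 8)))  # -> (1, 32, 32, 16, 8)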
class ModelFocusNetZDil1(tf.keras.Model):
def __init__(self, class_count, activation='softmax', deepsup=False, trainable=False, verbose=False):
super(ModelFocusNetZDil1, self).__init__(name='ModelFocusNetZDil1')
self.verbose = verbose
self.deepsup = deepsup
dropout = [None, 0.25, 0.25, 0.25, 0.25, 0.25, None, None]
filters = [[10,10], [20,20]]
dilation_xy = [1, 2, 3, 6, 12, 18]
dilation_z = [1, 1, 1, 1, 1 , 1]
# Feat Extraction (SE-Res Blocks)
self.convblock1 = ConvBlock3DSERes(filters=filters[0], kernel_size=(3,3,1), dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[0], pool=True , squeeze_ratio=2, name='Block1') # Dim/2 (e.g. 96/2=48, 240/2=120)(rp=(3,5,10),(3,5,10))
self.convblock2 = ConvBlock3DSERes(filters=filters[0], dilation_rate=(dilation_xy[1], dilation_xy[1], dilation_z[1]), trainable=trainable, dropout=dropout[0], pool=False, squeeze_ratio=2, name='Block2') # Dim/2 (e.g. 96/2=48, 240/2=120)(rp=(14,18),(12,14))
# Dense ASPP
self.convblock3 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[2], dilation_xy[2], dilation_z[2]), trainable=trainable, dropout=dropout[1], pool=False, name='Block3_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(24,30),(16,18))
self.convblock4 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[3], dilation_xy[3], dilation_z[3]), trainable=trainable, dropout=dropout[2], pool=False, name='Block4_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(42,54),(20,22))
self.convblock5 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[4], dilation_xy[4], dilation_z[4]), trainable=trainable, dropout=dropout[3], pool=False, name='Block5_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(78,102),(24,26))
self.convblock6 = ConvBlock3D(filters=filters[1][:], dilation_rate=(dilation_xy[5], dilation_xy[5], dilation_z[5]), trainable=trainable, dropout=dropout[4], pool=False, name='Block6_ASPP') # Dim/2 (e.g. 96/2=48, 240/2=120) (rp=(138,176),(28,30))
self.convblock7 = ConvBlock3DSERes(filters=filters[1], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[5], pool=False, squeeze_ratio=2, init=True, name='Block7') # Dim/2 (e.g. 96/2=48) (rp=(176,180),(32,44))
# Upstream
self.convblock8 = ConvBlock3DSERes(filters=filters[1], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[6], pool=False, squeeze_ratio=2, init=True, name='Block8') # Dim/2 (e.g. 96/2=48)
if self.deepsup:
self.convblock8_1 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same'
, dilation_rate=(1,1,1)
, activation=activation
, name='Block8_1')
self.upconvblock9 = UpConvBlock3D(filters=filters[0][0], trainable=trainable, name='Block9_1') # Dim/1 (e.g. 96/1 = 96)
self.convblock9 = ConvBlock3DSERes(filters=filters[0], dilation_rate=(dilation_xy[0], dilation_xy[0], dilation_z[0]), trainable=trainable, dropout=dropout[7], pool=False, squeeze_ratio=2, init=True, name='Block9') # Dim/1 (e.g. 96/1 = 96)
# Final
self.convblock10 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same'
, dilation_rate=(1,1,1)
, activation=activation
, name='Block10')
def call(self, x):
# Feat Extraction (SE-Res Blocks)
conv1, pool1 = self.convblock1(x)
conv2 = self.convblock2(pool1)
# Dense ASPP
conv3 = self.convblock3(conv2)
conv3_op = tf.concat([conv2, conv3], axis=-1)
conv4 = self.convblock4(conv3_op)
conv4_op = tf.concat([conv3_op, conv4], axis=-1)
conv5 = self.convblock5(conv4_op)
conv5_op = tf.concat([conv4_op, conv5], axis=-1)
conv6 = self.convblock6(conv5_op)
conv6_op = tf.concat([conv5_op, conv6], axis=-1)
conv7 = self.convblock7(conv6_op)
# Upstream
# Pixel-wise attention can be added here
conv8 = self.convblock8(tf.concat([pool1, conv7], axis=-1))
if self.deepsup:
conv8_1 = self.convblock8_1(conv8)
up9 = self.upconvblock9(conv8)
# Pixel-wise attention can be added here
conv9 = self.convblock9(tf.concat([conv1, up9], axis=-1))
# Final
conv10 = self.convblock10(conv9)
if self.verbose:
print (' ---------- Model: ', self.name)
print (' - x: ', x.shape)
print (' - conv1: ', conv1.shape)
print (' - conv2: ', conv2.shape)
print (' - conv3_op: ', conv3_op.shape)
print (' - conv4_op: ', conv4_op.shape)
print (' - conv5_op: ', conv5_op.shape)
print (' - conv6_op: ', conv6_op.shape)
print (' - conv7: ', conv7.shape)
print (' - conv8: ', conv8.shape)
print (' - conv9: ', conv9.shape)
print (' - conv10: ', conv10.shape)
if self.deepsup:
return conv8_1, conv10
else:
return conv10
def build_graph(self, dim):
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name))
return tf.keras.Model(inputs=[x], outputs=self.call(x))
def get_config(self):
config = {
'convblock1': self.convblock1
, 'convblock2': self.convblock2
, 'convblock3': self.convblock3
, 'convblock4': self.convblock4
, 'convblock5': self.convblock5
, 'convblock6': self.convblock6
, 'convblock7': self.convblock7
, 'convblock8': self.convblock8
, 'convblock9': self.convblock9
, 'convblock10': self.convblock10
}
if self.deepsup:
config['convblock8_1'] = self.convblock8_1
return config
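# Sketch of the deep-supervision variant (an illustrative call, mirroring the deepsup
# flag above): the model then returns (auxiliary logits at Dim/2, final logits):
#   model_ds = ModelFocusNetZDil1(class_count=10, deepsup=True)
#   aux, out = model_ds(tf.ones((1, 140, 140, 40, 1)))  # aux: (1,70,70,20,10), out: (1,140,140,40,10)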
############################################################
# UTILS #
############################################################
@tf.function
def write_model_trace(model, X):
return model(X)
if __name__ == "__main__":
if 1:
print (' --------------------- [ModelFocusNetZDil1] --------------------- ')
# Using the summary() function
raw_shape = (140, 140, 40, 1)
model = ModelFocusNetZDil1(class_count=10)
model = model.build_graph(raw_shape)
model.summary(line_length=150)
# Using the save() function
_ = model(tf.ones((1,*raw_shape)))
model.save('ModelFocusNetZDil1') # Keras ModelSave format; unable to properly visualize .pb file in www.netron.app
model.save('ModelFocusNetZDil1.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks
# Using the .to_json() and .get_config() function
with open('ModelFocusNetZDil1.json', 'w') as fp:
json.dump(json.loads(model.to_json()), fp, indent=4)
# model.get_config()
# Using tf2onnx
import onnx
import tf2onnx
model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model, [tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx
model_onnx = onnx.shape_inference.infer_shapes(model_proto)
        tf2onnx.utils.save_protobuf('ModelFocusNetZDil1_tf2onnx.onnx', model_onnx)
# Using tensorboard
tf.summary.trace_on(graph=True, profiler=False)
_ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32))
writer = tf.summary.create_file_writer(str('ModelFocusNetZDil1_TBoard'))
with writer.as_default():
tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None)
writer.flush()
print (' - Run command --> tensorboard --logdir=ModelFocusNetZDil1_TBoard --port=6100')
pdb.set_trace()
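        # Optional (assumes `pip install netron`): the saved model can also be opened
        # from Python rather than uploading it to www.netron.app:
        #   import netron
        #   netron.start('ModelFocusNetZDil1.h5')  # serves a local viewer in the browser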
"""
pip install tensorflow
pip install tf2onnx
"""
import os
import pdb
import json  # needed for the .to_json() dump below
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import tensorflow as tf # v2.4
if len(tf.config.list_physical_devices('GPU')):
tf.config.experimental.set_memory_growth(tf.config.list_physical_devices('GPU')[0], True)
############################################################
# INCEPTION MODEL #
############################################################
# Model Ref: https://towardsdatascience.com/model-sub-classing-and-custom-training-loop-from-scratch-in-tensorflow-2-cc1d4f10fb4e
class ConvModule(tf.keras.layers.Layer):
def __init__(self, kernel_num, kernel_size, strides, padding='same'):
super(ConvModule, self).__init__()
self.conv = tf.keras.layers.Conv2D(kernel_num, kernel_size=kernel_size, strides=strides, padding=padding)
self.bn = tf.keras.layers.BatchNormalization()
def call(self, input_tensor):
x = self.conv(input_tensor)
x = self.bn(x)
x = tf.nn.relu(x)
return x
def get_config(self):
return {'conv': self.conv, 'bn':self.bn}
class InceptionModule(tf.keras.layers.Layer):
def __init__(self, kernel_size1x1, kernel_size3x3, name=''):
        super(InceptionModule, self).__init__(name='InceptionModule_{}'.format(name))  # name must be passed as a keyword
self.conv1 = ConvModule(kernel_size1x1, kernel_size=(1,1), strides=(1,1))
self.conv2 = ConvModule(kernel_size3x3, kernel_size=(3,3), strides=(1,1))
self.cat = tf.keras.layers.Concatenate()
def call(self, input_tensor):
x_1x1 = self.conv1(input_tensor)
x_3x3 = self.conv2(input_tensor)
x = self.cat([x_1x1, x_3x3])
return x
def get_config(self):
return {'conv1':self.conv1, 'conv2':self.conv2, 'cat':self.cat}
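# Shape sketch (input size is an assumption): both parallel convs preserve H x W, so
# concatenation only grows the channel axis (kernel_size1x1 + kernel_size3x3 filters):
#   y = InceptionModule(32, 48, name='Demo')(tf.ones((1, 32, 32, 3)))  # -> (1, 32, 32, 80)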
class DownsampleModule(tf.keras.layers.Layer):
def __init__(self, kernel_size, name=''):
super(DownsampleModule, self).__init__(name='DownsampleModule_{}'.format(name))
self.conv3 = ConvModule(kernel_size, kernel_size=(3,3), strides=(2,2), padding="valid")
self.pool = tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=(2,2))
self.cat = tf.keras.layers.Concatenate()
def call(self, input_tensor):
conv_x = self.conv3(input_tensor)
pool_x = self.pool(input_tensor)
return self.cat([conv_x, pool_x])
def get_config(self):
return {'conv3':self.conv3, 'pool':self.pool, 'cat':self.cat}
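# Shape sketch (assumed input): both branches use stride 2 with 'valid' boundaries, so
# a 32x32 map becomes 15x15, and the output channels are kernel_size + input channels:
#   y = DownsampleModule(80, name='Demo')(tf.ones((1, 32, 32, 16)))  # -> (1, 15, 15, 96)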
class MiniInception(tf.keras.Model):
def __init__(self, num_classes=10):
super(MiniInception, self).__init__(name='MiniInception')
# the first conv module
self.conv_block = ConvModule(96, (3,3), (1,1))
# 2 inception module and 1 downsample module
self.inception_block1 = InceptionModule(32, 32, name='Block1')
self.inception_block2 = InceptionModule(32, 48, name='Block2')
self.downsample_block1 = DownsampleModule(80, name='Block1')
# 4 inception module and 1 downsample module
self.inception_block3 = InceptionModule(112, 48, name='Block3')
self.inception_block4 = InceptionModule(96, 64, name='Block4')
self.inception_block5 = InceptionModule(80, 80, name='Block5')
self.inception_block6 = InceptionModule(48, 96, name='Block6')
self.downsample_block2 = DownsampleModule(96, name='Block2')
# 2 inception module
self.inception_block7 = InceptionModule(176, 160, name='Block7')
self.inception_block8 = InceptionModule(176, 160, name='Block8')
# average pooling
self.avg_pool = tf.keras.layers.AveragePooling2D((7,7))
# model tail
self.flat = tf.keras.layers.Flatten()
        self.classifier = tf.keras.layers.Dense(num_classes, activation='softmax')
def call(self, input_tensor):
# forward pass
x = self.conv_block(input_tensor)
x = self.inception_block1(x)
x = self.inception_block2(x)
x = self.downsample_block1(x)
x = self.inception_block3(x)
x = self.inception_block4(x)
x = self.inception_block5(x)
x = self.inception_block6(x)
x = self.downsample_block2(x)
x = self.inception_block7(x)
x = self.inception_block8(x)
x = self.avg_pool(x)
x = self.flat(x)
        return self.classifier(x)
def build_graph(self, raw_shape):
x = tf.keras.layers.Input(shape=raw_shape)
return tf.keras.Model(inputs=[x], outputs=self.call(x))
def get_config(self):
return {
'conv_block':self.conv_block
, 'inception_block1': self.inception_block1
, 'inception_block2': self.inception_block2
, 'downsample_block1': self.downsample_block1
, 'inception_block3':self.inception_block3
, 'inception_block4':self.inception_block4
, 'inception_block5':self.inception_block5
, 'inception_block6':self.inception_block6
, 'downsample_block2': self.downsample_block2
, 'inception_block7':self.inception_block7
, 'inception_block8':self.inception_block8
, 'avg_pool': self.avg_pool
, 'flat':self.flat
            , 'classifier':self.classifier
}
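# End-to-end sketch on a CIFAR-sized input (an assumption matching __main__ below):
# 32x32 -> 15x15 -> 7x7 across the two downsample modules, then (7,7) average pooling
# collapses the map before the dense classifier:
#   logits = MiniInception(num_classes=10)(tf.ones((1, 32, 32, 3)))  # -> (1, 10)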
############################################################
# UTILS #
############################################################
@tf.function
def write_model_trace(model, X):
return model(X)
if __name__ == "__main__":
if 1:
print (' --------------------- [MiniInception] --------------------- ')
# Using the summary() function
raw_shape = (32, 32, 3)
model = MiniInception() # tf.keras.Model
model = model.build_graph(raw_shape) # <class 'tensorflow.python.keras.engine.functional.Functional'>
model.summary(line_length=150)
# Using the save() function
_ = model(tf.ones((1,*raw_shape)))
model.save('MiniInception') # Keras ModelSave format; unable to properly visualize .pb file in www.netron.app
model.save('MiniInception.h5') # Loads in www.netron.app for viz purposes, shows the high level blocks
# Using the .to_json() and .get_config() function
with open('MiniInception.json', 'w') as fp:
json.dump(json.loads(model.to_json()), fp, indent=4)
# model.get_config()
# Using onnxmltools (does not support Conv3D, MaxPool3D, Conv3DTranspose)
import onnx
import onnxmltools
model_onnx = onnxmltools.convert_keras(model, model.name) # this is a ModelProto
model_onnx = onnx.shape_inference.infer_shapes(model_onnx)
onnxmltools.utils.save_model(model_onnx, 'MiniInception_onnxmltools.onnx')
# Using keras2onnx
import keras2onnx
model_onnx = keras2onnx.convert_keras(model, model.name)
model_onnx = onnx.shape_inference.infer_shapes(model_onnx)
        keras2onnx.save_model(model_onnx, 'MiniInception_keras2onnx.onnx')
# Using tf2onnx
import tf2onnx
model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model, [tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx
model_onnx = onnx.shape_inference.infer_shapes(model_proto)
        tf2onnx.utils.save_protobuf('MiniInception_tf2onnx.onnx', model_onnx)
# Using tensorboard
tf.summary.trace_on(graph=True, profiler=False)
_ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32))
writer = tf.summary.create_file_writer(str('MiniInception_TBoard'))
with writer.as_default():
tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None)
writer.flush()
print (' - Run command --> tensorboard --logdir=MiniInception_TBoard --port=6100')
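        # Quick sanity check of the exported ONNX graph (assumes `pip install onnxruntime`;
        # the input name matches the TensorSpec name passed to tf2onnx above):
        #   import onnxruntime as ort
        #   import numpy as np
        #   sess = ort.InferenceSession('MiniInception_tf2onnx.onnx')
        #   (out,) = sess.run(None, {'ModelInput': np.ones((1, 32, 32, 3), dtype=np.float32)})
        #   print(out.shape)  # expected: (1, 10)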
# ----------------------------------------------------------
# (gist file 3) models2.py: OrganNet / FocusNet 3D models with
# optional Bayesian (Flipout) and spectral-norm layers
# ----------------------------------------------------------
# Import internal libraries
import src.config as config
# Import external libraries
import pdb
import time
import traceback
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_probability as tfp
print (' - tf : ', tf.__version__) # 2.9.1
print (' - tfa: ', tfa.__version__) # 0.17.1
print (' - tfp: ', tfp.__version__) # 0.17.0
############################################################
# 3D MODEL BLOCKS #
############################################################
class ConvBlock3DTrial(tf.keras.layers.Layer):
"""
Performs a series of 3D convolutions which are residual in nature
"""
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1,1,1), padding='same'
, activation=tf.nn.relu
, group_factor=4
, trainable=False
, dropout=None
, residual=True
, init_filters=False
, bayesian=False
, spectral=False
, pool=None
, name=''):
        super(ConvBlock3DTrial, self).__init__(name='{}_ConvBlock3DTrial'.format(name))  # was super(ConvBlock3D, ...), which raises a TypeError at runtime
# Step 0 - Init
self.init_filters = init_filters
self.pool = pool
self.residual = residual
if type(filters) == int:
filters = [filters]
if spectral:
f = lambda x: tfa.layers.SpectralNormalization(x)
else:
f = lambda x: x
# Step 1 - Set the filters right so that the residual can be done
if self.init_filters:
self.init_layer = tf.keras.Sequential(name='{}_Conv1x1Seq'.format(self.name))
if bayesian:
self.init_layer.add(
f(
tfp.layers.Convolution3DFlipout(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding
, dilation_rate=1
, activation=None
, name='{}_Conv1x1Flip'.format(self.name))
)
)
else:
self.init_layer.add(
f(
tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding
, dilation_rate=1
, activation=None
, name='{}_Conv1x1'.format(self.name))
)
)
self.init_layer.add(tfa.layers.GroupNormalization(groups=filters[0]//group_factor, trainable=trainable))
self.init_layer.add(tf.keras.layers.Activation(activation))
# Step 2 - Create residual block
self.conv_layer = tf.keras.Sequential(name='{}_ConvSeq'.format(self.name))
for filter_id, filter_count in enumerate(filters):
# Step 2.0 - DropOut or not
# print (' - name: {} || dropout: {}'.format(self.name, dropout) )
if dropout is not None:
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='{}_DropOut_{}'.format(self.name, filter_id))) # before every conv layer (could also be after every layer?)
                spectral = True  # note: using dropout implicitly switches on spectral normalisation in this trial block
if spectral:
f = lambda x: tfa.layers.SpectralNormalization(x)
else:
f = lambda x: x
# Step 2.1 - Bayesian or not
if bayesian:
self.conv_layer.add(
f(
tfp.layers.Convolution3DFlipout(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rates[filter_id]
, activation=None
, name='{}_Conv3DFlip_{}'.format(self.name, filter_id))
)
)
else:
self.conv_layer.add(
f(
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rates[filter_id]
, activation=None
, name='{}_Conv3D_{}'.format(self.name, filter_id))
)
)
self.conv_layer.add(tfa.layers.GroupNormalization(groups=filter_count//group_factor, trainable=trainable))
# Step 2.2 - Residual or not
if self.residual:
                if filter_id != len(filters) - 1:  # don't add activation after the last conv of the block
self.conv_layer.add(tf.keras.layers.Activation(activation))
else:
self.conv_layer.add(tf.keras.layers.Activation(activation))
        # Step 2.3 - Finish residual block
if self.residual:
self.activation_layer = tf.keras.layers.Activation(activation)
# Step 3 - Learnt Pooling
if self.pool is not None:
self.pool_layer = f(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding
, dilation_rate=(1,1,1)
, activation=None
, name='{}_Conv3DPooling'.format(self.name)))
def call(self, x):
# Step 1
if self.init_filters:
x = self.init_layer(x)
# Step 2
if self.residual:
x_ = self.conv_layer(x) # Conv-GN-ReLU -- Conv-GN
            x = self.activation_layer(x_ + x)  # ReLU
else:
x = self.conv_layer(x)
# Step 3
if self.pool:
x_pool = self.pool_layer(x)
return x, x_pool
else:
return x
class ConvBlock3D(tf.keras.layers.Layer):
"""
Performs a series of 3D convolutions which are residual in nature
"""
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1,1,1), padding='same'
, activation=tf.nn.relu
, group_factor=4
, trainable=False
, dropout=None
, residual=True
, init_filters=False
, bayesian=False
, spectral=False
, pool=None
, name=''):
super(ConvBlock3D, self).__init__(name='{}_ConvBlock3D'.format(name))
# Step 0 - Init
self.init_filters = init_filters
self.pool = pool
self.residual = residual
if type(filters) == int:
filters = [filters]
# Step 1 - Set the filters right so that the residual can be done
if self.init_filters:
self.init_layer = tf.keras.Sequential(name='{}_Conv1x1Seq'.format(self.name))
if bayesian:
self.init_layer.add(
#
tfp.layers.Convolution3DFlipout(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding
, dilation_rate=1
, activation=None
, name='{}_Conv1x1Flip'.format(self.name))
)
else:
self.init_layer.add(
tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=strides, padding=padding
, dilation_rate=1
, activation=None
, name='{}_Conv1x1'.format(self.name))
)
self.init_layer.add(tfa.layers.GroupNormalization(groups=filters[0]//group_factor, trainable=trainable))
self.init_layer.add(tf.keras.layers.Activation(activation))
# Step 2 - Create residual block
self.conv_layer = tf.keras.Sequential(name='{}_ConvSeq'.format(self.name))
for filter_id, filter_count in enumerate(filters):
# Step 2.0 - DropOut or not
# print (' - name: {} || dropout: {}'.format(self.name, dropout) )
if dropout is not None:
self.conv_layer.add(tf.keras.layers.Dropout(rate=dropout, name='{}_DropOut_{}'.format(self.name, filter_id))) # before every conv layer (could also be after every layer?)
# Step 2.1 - Bayesian or not
if bayesian:
self.conv_layer.add(
tfp.layers.Convolution3DFlipout(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rates[filter_id]
, activation=None
, name='{}_Conv3DFlip_{}'.format(self.name, filter_id))
)
else:
self.conv_layer.add(
tf.keras.layers.Conv3D(filters=filter_count, kernel_size=kernel_size, strides=strides, padding=padding
, dilation_rate=dilation_rates[filter_id]
, activation=None
, name='{}_Conv3D_{}'.format(self.name, filter_id))
)
self.conv_layer.add(tfa.layers.GroupNormalization(groups=filter_count//group_factor, trainable=trainable))
# Step 2.2 - Residual or not
if self.residual:
                if filter_id != len(filters) - 1:  # don't add activation after the last conv of the block
self.conv_layer.add(tf.keras.layers.Activation(activation))
else:
self.conv_layer.add(tf.keras.layers.Activation(activation))
        # Step 2.3 - Finish residual block
if self.residual:
self.activation_layer = tf.keras.layers.Activation(activation)
# Step 3 - Learnt Pooling
if self.pool is not None:
self.pool_layer = tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding
, dilation_rate=(1,1,1)
, activation=None
, name='{}_Conv3DPooling'.format(self.name))
def call(self, x):
# Step 1
if self.init_filters:
x = self.init_layer(x)
# Step 2
if self.residual:
x_ = self.conv_layer(x) # Conv-GN-ReLU -- Conv-GN
            x = self.activation_layer(x_ + x)  # ReLU
else:
x = self.conv_layer(x)
# Step 3
if self.pool:
x_pool = self.pool_layer(x)
return x, x_pool
else:
return x
def get_config(self):
config = {}
if self.init_filters:
config['init_filters'] = self.init_layer
config['conv_layer'] = self.conv_layer
if self.residual:
config['residual'] = self.activation_layer
if self.pool:
config['pool'] = self.pool_layer
return config
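# Usage sketch (shapes and filters are assumptions). dilation_rates must supply one
# rate per entry in filters; this is how the HDC blocks below widen their receptive
# field. init_filters=True keeps the residual addition channel-compatible:
#   blk = ConvBlock3D(filters=[16, 16], dilation_rates=[(1,1,1), (3,3,1)], init_filters=True, name='Demo')
#   y = blk(tf.ones((1, 32, 32, 16, 1)))  # -> (1, 32, 32, 16, 16)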
class UpConvBlock(tf.keras.layers.Layer):
def __init__(self, filters, kernel_size=(2,2,2), strides=(2, 2, 2), padding='same', spectral=False, trainable=False, name=''):
super(UpConvBlock, self).__init__(name='{}_UpConvBlock'.format(name))
if spectral:
f = lambda x: tfa.layers.SpectralNormalization(x)
else:
f = lambda x: x
self.upconv_layer = tf.keras.Sequential()
self.upconv_layer.add(
f(
                tf.keras.layers.Conv3DTranspose(filters=filters, kernel_size=kernel_size, strides=kernel_size, padding=padding  # strides intentionally mirror kernel_size, so e.g. a (2,2,1) kernel upsamples only in-plane
, activation=None
, name='{}__ConvTranspose'.format(name))
)
)
def call(self, x):
return self.upconv_layer(x)
def get_config(self):
return {'upconv_layer': self.upconv_layer}
class ConvBlockSERes(tf.keras.layers.Layer):
def __init__(self, filters, dilation_rates, kernel_size=(3,3,3), strides=(1, 1, 1), padding='same'
, activation=tf.nn.relu
, group_factor=4
, trainable=False
, dropout=None
, init_filters=False
, bayesian=False
, spectral=False
, pool=None
, squeeze_ratio=2
, name=''):
super(ConvBlockSERes, self).__init__(name='{}_ConvSERes'.format(name))
# Step 0 - Init
self.pool = pool
if spectral:
f = lambda x: tfa.layers.SpectralNormalization(x)
else:
f = lambda x: x
# Step 1 - ConvBlock
assert len(filters) == len(dilation_rates) # eg. len([32,32,32]) = len([(1,1,1), (3,3,1), (5,5,1)])
self.convblock_res = ConvBlock3D(filters=filters, dilation_rates=dilation_rates, kernel_size=kernel_size, strides=strides, padding=padding
, activation=activation
, group_factor=group_factor
, trainable=trainable
, dropout=dropout
, init_filters=init_filters
, bayesian=bayesian
, pool=None
, name='{}'.format(self.name)
)
# Step 2 - Squeeze and Excitation
## Ref: https://github.com/imkhan2/se-resnet/blob/master/se_resnet.py
self.seblock = tf.keras.Sequential(name='{}_SERes'.format(name))
self.seblock.add(tf.keras.layers.GlobalAveragePooling3D())
self.seblock.add(tf.keras.layers.Reshape(target_shape=(1,1,1,filters[0])))
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0]//squeeze_ratio, kernel_size=(1,1,1), strides=(1,1,1), padding='same'
, activation=tf.nn.relu))
self.seblock.add(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=(1,1,1), strides=(1,1,1), padding='same'
, activation=tf.nn.sigmoid))
        # Step 3 - Learnt pooling
if self.pool is not None:
self.pool_layer = f(tf.keras.layers.Conv3D(filters=filters[0], kernel_size=self.pool, strides=self.pool, padding=padding
, dilation_rate=(1,1,1)
, activation=None
, name='{}_Conv3DPooling'.format(self.name)))
def call(self, x):
# Step 1 - Conv Block
x_res = self.convblock_res(x)
# Step 2.1 - Squeeze and Excitation
x_se = self.seblock(x_res) # squeeze and then get excitation factor
# Step 2.2
y = x_res + tf.math.multiply(x_res, x_se) # excited block
# Step 3 - Pooling
if self.pool is not None:
return y, self.pool_layer(y)
else:
return y
def get_config(self):
config = {'convblock_res': self.convblock_res, 'seblock': self.seblock}
if self.pool:
config['pool_layer'] = self.pool_layer
return config
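# Usage sketch (assumed shapes): pool here is a learnt strided convolution (kernel =
# stride = pool) rather than max pooling, so pool=(2,2,1) halves H and W while keeping
# the thin z-axis intact:
#   blk = ConvBlockSERes(filters=[16, 16], dilation_rates=[(1,1,1), (1,1,1)], init_filters=True, pool=(2,2,1), name='Demo')
#   y, y_pool = blk(tf.ones((1, 32, 32, 8, 1)))  # y: (1,32,32,8,16), y_pool: (1,16,16,8,16)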
############################################################
# 3D BACKENDS #
############################################################
class FocusNetBackend(tf.keras.layers.Layer):
"""
Folows "FocusNet: Imbalanced Large and Small Organ Segmentation with an End-to-End Deep Neural Network for Head and Neck CT Images"
"""
def __init__(self, filters, dil_rates, trainable=False, verbose=False):
super(FocusNetBackend, self).__init__(name='FocusNetBackend')
self.convblock1 = ConvBlock3D(filters=filters[0] , dilation_rates=dil_rates[0], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,2), name='Block1')
self.convblock2 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True , bayesian=False, trainable=trainable, pool=None , name='Block2')
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False, bayesian=False, trainable=trainable, pool=None , name='Block3')
def call(self, x):
conv1, pool1 = self.convblock1(x)
conv2 = pool2 = self.convblock2(pool1)
conv3 = self.convblock3(pool2)
return conv1, pool1, conv2, pool2, conv3
class OrganNetBackend(tf.keras.layers.Layer):
"""
Folows "A Novel Hybrid Convolutional Neural Network for Accurate Organ Segmentation in 3D Head and Neck CT Images"
"""
def __init__(self, filters, dil_rates, pooling='double', trainable=False, verbose=True):
super(OrganNetBackend, self).__init__(name='OrganNetBackend')
self.verbose = verbose
self.pooling = pooling
self.convblock1 = ConvBlock3D(filters=filters[0] , dilation_rates=dil_rates[0], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,1), name='Block1')
self.convblock2 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True , bayesian=False, trainable=trainable, pool=(2,2,2), name='Block2')
if pooling == 'double':
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False , bayesian=False, trainable=trainable, pool=None , name='Block3NoPool')
elif pooling == 'triple':
self.convblock3 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False , bayesian=False, trainable=trainable, pool=(2,2,2) , name='Block3YesPool')
def call(self, x):
conv1, pool1 = self.convblock1(x)
conv2, pool2 = self.convblock2(pool1)
if self.pooling == 'double':
conv3 = self.convblock3(pool2)
pool3 = None
elif self.pooling == 'triple':
conv3, pool3 = self.convblock3(pool2)
if self.verbose:
print (' - [OrganNetBackend] x: ', x.shape)
print (' - [OrganNetBackend] conv1: {} | pool1: {}'.format(conv1.shape, pool1.shape))
print (' - [OrganNetBackend] conv2: {} | pool2: {}'.format(conv2.shape, pool2.shape))
if self.pooling == 'double':
print (' - [OrganNetBackend] conv3: {} | pool3: {}'.format(conv3.shape, pool3))
elif self.pooling == 'triple':
print (' - [OrganNetBackend] conv3: {} | pool3: {}'.format(conv3.shape, pool3.shape))
return conv1, pool1, conv2, pool2, conv3, pool3
def get_config(self):
config = {'convblock1': self.convblock1, 'convblock2': self.convblock2, 'convblock3': self.convblock3}
return config
def build_graph(self, dim):
        x = tf.keras.Input(shape=dim, name='{}-Input'.format(self.name))  # was shape=(None,), which ignored dim and broke the graph build
return tf.keras.Model(inputs=[x], outputs=self.call(x))
class HDC(tf.keras.layers.Layer):
"""
Ref: Understanding Convolutions for Semantic Segmentation (https://arxiv.org/abs/1702.08502)
: https://gist.github.com/prerakmody/ac04e3ee4ee67cf66a4e6251d673993c
"""
def __init__(self, filters, dil_rates, dropout=None, bayesian=False, trainable=False, verbose=False):
super(HDC, self).__init__(name='HDC')
self.verbose = verbose
self.convblock4 = ConvBlockSERes(filters=filters[0], init_filters=False, dilation_rates=dil_rates[0], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block4')
self.convblock5 = ConvBlockSERes(filters=filters[1], init_filters=False, dilation_rates=dil_rates[1], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block5')
self.convblock6 = ConvBlockSERes(filters=filters[2], init_filters=False, dilation_rates=dil_rates[2], dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block6')
        self.convblock7 = lambda x: x  # identity; this HDC variant uses three dilated blocks, so Block7 is a pass-through
def call(self, x):
conv4 = self.convblock4(x)
conv5 = self.convblock5(conv4)
conv6 = self.convblock6(conv5)
conv7 = self.convblock7(conv6)
if self.verbose:
print (' - [HDC] conv4: ', conv4.shape)
print (' - [HDC] conv5: ', conv5.shape)
print (' - [HDC] conv6: ', conv6.shape)
print (' - [HDC] conv7: ', conv7.shape)
return conv4, conv5, conv6, conv7
def get_config(self):
config = {'convblock4': self.convblock4, 'convblock5': self.convblock5, 'convblock6': self.convblock6, 'convblock7': self.convblock7}
return config
class nonHDC(tf.keras.layers.Layer):
"""
Ref: Understanding Convolutions for Semantic Segmentation (https://arxiv.org/abs/1702.08502)
: https://gist.github.com/prerakmody/ac04e3ee4ee67cf66a4e6251d673993c
"""
def __init__(self, filters, dil_rates, dropout=None, bayesian=False, trainable=False, verbose=False):
super(nonHDC, self).__init__(name='nonHDC')
self.convblock4 = ConvBlockSERes(filters=filters[0], dilation_rates=dil_rates[0], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block4')
self.convblock5 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block5')
self.convblock6 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block6')
self.convblock7 = ConvBlockSERes(filters=filters[3], dilation_rates=dil_rates[3], init_filters=False, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block7')
def call(self, x):
conv4 = self.convblock4(x)
conv5 = self.convblock5(conv4)
conv6 = self.convblock6(conv5)
conv7 = self.convblock7(conv6)
return conv4, conv5, conv6, conv7
class commonHead(tf.keras.layers.Layer):
def __init__(self, filters, pooling, dil_rates, filters_upsample, ksize_upsample, class_count, deepsup, dropout=None, bayesian=False, activation=tf.nn.softmax, trainable=False, verbose=False):
super(commonHead, self).__init__(name='commonHead')
# Step 0 - Init
self.deepsup = deepsup
self.pooling = pooling
if self.pooling == 'triple':
self.upconvblock8 = UpConvBlock(filters=filters_upsample[0], kernel_size=ksize_upsample[0], name='UpBlock8')
self.convblock8 = ConvBlockSERes(filters=filters[0], dilation_rates=dil_rates[0], init_filters=True, trainable=trainable, pool=None, name='Block8')
if filters_upsample[0] is not None:
self.upconvblock9 = UpConvBlock(filters=filters_upsample[0], kernel_size=ksize_upsample[0], name='UpBlock9')
else:
self.upconvblock9 = lambda x: x
if filters_upsample[1] is not None:
self.upconvblock10 = UpConvBlock(filters=filters_upsample[1], kernel_size=ksize_upsample[1], name='UpBlock10')
else:
self.upconvblock10 = lambda x: x
if not bayesian and dropout is None:
print (' - [models2.py][commonHead] bayesian: ', bayesian, ' || dropout: ', dropout)
print (' - [models2.py][commonHead] filters: ', filters)
self.convblock9 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True, trainable=trainable, pool=None, name='Block9')
self.convblock10 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=True, trainable=trainable, pool=None, name='Block10')
self.convblock11 = tf.keras.layers.Conv3D(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same'
, dilation_rate=(1,1,1)
, activation=activation
, name='Block11'
)
else:
self.convblock9 = ConvBlockSERes(filters=filters[1], dilation_rates=dil_rates[1], init_filters=True, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block9-Flip')
self.convblock10 = ConvBlockSERes(filters=filters[2], dilation_rates=dil_rates[2], init_filters=True, dropout=dropout, bayesian=bayesian, trainable=trainable, pool=None, name='Block10-Flip')
self.convblock11 = tfp.layers.Convolution3DFlipout(filters=class_count, strides=(1,1,1), kernel_size=(1,1,1), padding='same'
, dilation_rate=(1,1,1)
, activation=activation
, name='Block11-Flip'
)
def call(self, conv7, conv3, conv2, conv1):
if self.pooling == 'triple':
upconv8 = self.upconvblock8(conv7)
conv8 = self.convblock8(tf.concat([conv3, upconv8], axis=-1))
else:
conv8 = self.convblock8(tf.concat([conv3, conv7], axis=-1))
upconv9 = self.upconvblock9(conv8)
conv9 = self.convblock9(tf.concat([conv2, upconv9], axis=-1))
upconv10 = self.upconvblock10(conv9)
conv10 = self.convblock10(tf.concat([conv1, upconv10], axis=-1))
conv11 = self.convblock11(conv10)
return conv8, conv9, conv10, conv11
def get_config(self):
config = {}
if self.pooling == 'triple':
config['upconvblock8'] = self.upconvblock8
config['convblock8'] = self.convblock8
config['upconvblock9'] = self.upconvblock9
config['convblock9'] = self.convblock9
config['upconvblock10'] = self.upconvblock10
config['convblock10'] = self.convblock10
config['convblock11'] = self.convblock11
return config
############################################################
# 3D MODELS #
############################################################
class OrganNet(tf.keras.Model):
def __init__(self, class_count, pooling='double', hdc=True, dropout=None, bayesian=True, deepsup=False, bayesianhead=False, activation=tf.nn.softmax, trainable=False, verbose=False):
super(OrganNet, self).__init__(name='OrganNet')
self.pooling = pooling
        if 0:  # inactive config: narrower filter bank (16/32)
backend_filters = [[16,16], [32,32]]
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]]
hdc_core_filters = [[32,32,32,32], [32,32,32,32], [32,32,32,32]]
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]]
nonhdc_core_filters = [[32,32,32], [32,32,32], [32,32,32], [32,32,32]]
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]]
if class_count != 1:
head_filters = [[32,32], [16,16], [class_count, class_count]]
else:
head_filters = [[32,32], [16,16], [8, 8]]
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]]
head_filters_upsample = [32,16]
head_filters_ksize = [(2,2,2), (2,2,1)]
        elif 1:  # active config: wider filter bank (32/48)
backend_filters = [[32,32], [48,48]]
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]]
hdc_core_filters = [[48,48,48,48], [48,48,48,48], [48,48,48,48]]
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]]
nonhdc_core_filters = [[48,48,48], [48,48,48], [48,48,48], [48,48,48]]
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]]
if class_count != 1:
head_filters = [[48,48], [32,32], [class_count, class_count]]
else:
head_filters = [[48,48], [32,32], [8, 8]]
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]]
head_filters_upsample = [48,48]
head_filters_ksize = [(2,2,2), (2,2,1)]
print (' - [models2.py][OrganNet] pooling : ', pooling)
print (' - [models2.py][OrganNet] bayesian : ', bayesian)
print (' - [models2.py][OrganNet] dropout : ', dropout)
print (' - [models2.py][OrganNet] bayesianhead : ', bayesianhead)
self.backend = OrganNetBackend(filters=backend_filters, dil_rates=backend_dil_rates, pooling=pooling, trainable=trainable, verbose=verbose)
if not bayesianhead:
if hdc:
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose)
else:
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose)
self.head = commonHead(filters=head_filters, pooling=pooling, dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup
, activation=activation
, trainable=trainable, verbose=verbose)
else:
if hdc:
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=None, bayesian=False, trainable=trainable, verbose=verbose)
else:
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=None, bayesian=False, trainable=trainable, verbose=verbose)
self.head = commonHead(filters=head_filters, pooling=pooling, dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup
, dropout=dropout, bayesian=bayesian
, activation=activation
, trainable=trainable, verbose=verbose)
def call(self, x):
if 1:
# Step 1 - Backend
conv1, pool1, conv2, pool2, conv3, pool3 = self.backend(x)
# Step 2 - Core
if self.pooling == 'double':
conv4, conv5, conv6, conv7 = self.core(conv3)
elif self.pooling == 'triple':
conv4, conv5, conv6, conv7 = self.core(pool3)
# Step 3 - Head
conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1)
return conv11
else:
# Step 1 - Backend
conv1, pool1 = self.backend.convblock1(x)
conv2, pool2 = self.backend.convblock2(pool1)
if self.backend.pooling == 'double':
conv3 = self.backend.convblock3(pool2)
pool3 = None
elif self.backend.pooling == 'triple':
conv3, pool3 = self.backend.convblock3(pool2)
# Step 2 - Core
if self.pooling == 'double':
conv4 = self.core.convblock4(conv3)
conv5 = self.core.convblock5(conv4)
conv6 = self.core.convblock6(conv5)
conv7 = self.core.convblock7(conv6)
# conv4, conv5, conv6, conv7 = self.core(conv3)
elif self.pooling == 'triple':
conv4 = self.core.convblock4(pool3)
conv5 = self.core.convblock5(conv4)
conv6 = self.core.convblock6(conv5)
conv7 = self.core.convblock7(conv6)
# conv4, conv5, conv6, conv7 = self.core(pool3)
# Step 3 - Head
if self.head.pooling == 'triple':
upconv8 = self.head.upconvblock8(conv7)
conv8 = self.head.convblock8(tf.concat([conv3, upconv8], axis=-1))
else:
conv8 = self.head.convblock8(tf.concat([conv3, conv7], axis=-1))
upconv9 = self.head.upconvblock9(conv8)
conv9 = self.head.convblock9(tf.concat([conv2, upconv9], axis=-1))
upconv10 = self.head.upconvblock10(conv9)
conv10 = self.head.convblock10(tf.concat([conv1, upconv10], axis=-1))
conv11 = self.head.convblock11(conv10)
# conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1)
return conv11
def get_config(self):
config = {'backend': self.backend, 'core': self.core, 'head': self.head}
return config
def build_graph(self, dim):
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name))
return tf.keras.Model(inputs=[x], outputs=self.call(x))
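# Dimension sketch for OrganNet (input size is an assumption, matching __main__ below):
# pooling='double' applies a (2,2,1) then a (2,2,2) reduction, so a (200,200,28) volume
# reaches the HDC core at (50,50,14) and is upsampled back to full size by the head:
#   y = OrganNet(class_count=1, bayesian=False, activation=tf.nn.sigmoid)(tf.ones((1, 200, 200, 28, 1)))
#   print(y.shape)  # -> (1, 200, 200, 28, 1)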
class FocusNet(tf.keras.Model):
def __init__(self, class_count, hdc=False, dropout=None, bayesian=True, deepsup=False, trainable=False, verbose=False):
super(FocusNet, self).__init__(name='FocusNet')
if 1:
backend_filters = [[16,16], [32,32]]
backend_dil_rates = [[(1,1,1),(1,1,1)], [(1,1,1),(1,1,1)]]
hdc_core_filters = [[32,32,32,32], [32,32,32,32], [32,32,32,32]]
hdc_core_dil_rates = [[(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)], [(1,1,1),(3,3,1),(5,5,1),(9,9,1)]]
nonhdc_core_filters = [[32,32,32], [32,32,32], [32,32,32], [32,32,32]]
nonhdc_core_dil_rates = [[(2,2,1), (2,2,1), (2,2,1)], [(3,3,1), (3,3,1), (3,3,1)], [(6,6,1), (6,6,1), (6,6,1)], [(12,12,1), (12,12,1), (12,12,1)]]
head_filters = [[32,32], [16,16], [class_count, class_count]]
head_dil_rates = [[(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)], [(1,1,1), (1,1,1)]]
head_filters_upsample = [None,16]
head_filters_ksize = [None, (2,2,2)]
print (' - [models2.py][FocusNet] bayesian: ', bayesian)
self.backend = FocusNetBackend(filters=backend_filters, dil_rates=backend_dil_rates, trainable=trainable, verbose=verbose)
if hdc:
self.core = HDC(filters=hdc_core_filters, dil_rates=hdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose)
else:
self.core = nonHDC(filters=nonhdc_core_filters, dil_rates=nonhdc_core_dil_rates, dropout=dropout, bayesian=bayesian, trainable=trainable, verbose=verbose)
        self.head = commonHead(filters=head_filters, pooling='double', dil_rates=head_dil_rates, filters_upsample=head_filters_upsample, ksize_upsample=head_filters_ksize, class_count=class_count, deepsup=deepsup, trainable=trainable, verbose=verbose)  # pooling is a required argument of commonHead; FocusNetBackend has no third pooling stage
def call(self, x):
# Step - Backend
conv1, pool1, conv2, pool2, conv3 = self.backend(x)
# Step 2 - Core
conv4, conv5, conv6, conv7 = self.core(conv3)
# Step 3 - Head
conv8, conv9, conv10, conv11 = self.head(conv7, conv3, conv2, conv1)
return conv11
def get_config(self):
config = {'backend': self.backend, 'core': self.core, 'head': self.head}
return config
def build_graph(self, dim):
x = tf.keras.Input(shape=(dim), name='{}-Input'.format(self.name))
return tf.keras.Model(inputs=[x], outputs=self.call(x))
############################################################
# UTILS #
############################################################
@tf.function
def write_model_trace(model, X):
return model(X)
############################################################
# MAIN #
############################################################
if __name__ == "__main__":
X = tf.random.normal((2,140,140,40,1))
if 0:
print ('\n ------------------- FocusNet ------------------- ')
model = FocusNet(class_count=10)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), Bayes(900K) params
print (model.losses)
elif 0:
print ('\n ------------------- OrganNet ------------------- ')
model = OrganNet(class_count=10)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), Bayes(900K) params
print (model.losses)
elif 0:
print ('\n ------------------- OrganNet (bayesian=False, dropout=0.3) ------------------- ')
model = OrganNet(class_count=10, bayesian=False, dropout=0.3)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), Bayes(900K) params
print (model.losses)
# OrganNet (for prostate)
elif 0:
X = tf.random.normal((2,200,200,28,1))
print ('\n ------------------- OrganNet (class_count=1, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ')
model = OrganNet(class_count=1, hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), Bayes(900K) params
print (model.losses)
# OrganNet(pooling=triple) (for prostate)
elif 0:
X = tf.random.normal((2,200,200,28,1))
print ('\n ------------------- OrganNet (class_count=1, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ')
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), Bayes(900K) params
print (model.losses)
# OrganNet(pooling=triple) (for prostate) and for visualization in netron.app
elif 1:
raw_shape = (200,200,28,1)
print ('\n ------------------- OrganNet (class_count=1, pooling=triple, hdc=True, bayesian=False, dropout=None, activation=tf.nn.sigmoid) ------------------- ')
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=False, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True)
model = model.build_graph(raw_shape)
model.summary(line_length=150)
# Using the save() function
_ = model(tf.ones((1,*raw_shape)))
model.save('OrganNetPool3.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks
# model.save('OrganNetPool3', save_format='tf')
# Using the .to_json() function
# import json
# with open('OrganNetPool3-Functional.json', 'w') as fp:
# json.dump(json.loads(model.to_json()), fp, indent=4)
# Using the .get_config() function
# import json
# with open('OrganNetPool3.json', 'w') as fp:
# json.dump(json.loads(model.get_config()), fp, indent=4)
tf.keras.utils.plot_model(model, 'OrganNetPool3.png', show_shapes=True, expand_nested=True) # pip install pydot graphviz
# tf.keras.utils.model_to_dot(model, show_shapes=True, expand_nested=True, subgraph=True)
# # Using tf2onnx
# import onnx
# import tf2onnx
# model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model=model, input_signature=[tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx
# model_onnx = onnx.shape_inference.infer_shapes(model_proto)
# tf2onnx.utils.save_protobuf('OrganNetPool3.onnx', model_onnx)
# # Using tensorboard
# tf.summary.trace_on(graph=True, profiler=False)
# _ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32))
# writer = tf.summary.create_file_writer(str('OrganNetPool3'))
# with writer.as_default():
# tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None)
# writer.flush()
# print (' - Run command --> tensorboard --logdir=OrganNetPool3 --port=6100')
pass
# OrganNet(pooling=triple) (for prostate) and for visualization in netron.app
elif 0:
raw_shape = (200,200,28,1)
print ('\n ------------------- OrganNet (class_count=1, pooling=triple, hdc=True, bayesian=True, dropout=None, activation=tf.nn.sigmoid) ------------------- ')
model = OrganNet(class_count=1, pooling='triple', hdc=True, bayesian=True, bayesianhead=False, dropout=None, activation=tf.nn.sigmoid, verbose=True)
model = model.build_graph(raw_shape)
model.summary(line_length=150)
# Using the save() function
_ = model(tf.ones((1,*raw_shape)))
model.save('OrganNetBayesPool3.h5', save_format='h5') # Loads in www.netron.app for viz purposes, shows the high level blocks
# model.save('OrganNetBayesPool3.h5', save_format='tf')
# Using tf2onnx
# import onnx
# import tf2onnx
# model_proto, external_tensor_storage = tf2onnx.convert.from_keras(model=model, input_signature=[tf.TensorSpec((1,*raw_shape), tf.float32, name='ModelInput')]) # tensorspec ensures that the batch size shows up properly in tf2onnx
# model_onnx = onnx.shape_inference.infer_shapes(model_proto)
# tf2onnx.utils.save_protobuf('OrganNetBayesPool3.onnx', model_onnx)
# Using tensorboard
# tf.summary.trace_on(graph=True, profiler=False)
# _ = write_model_trace(model, tf.ones(shape=(1,*raw_shape), dtype=tf.float32))
# writer = tf.summary.create_file_writer(str('OrganNetBayesPool3'))
# with writer.as_default():
# tf.summary.trace_export(name=model.name, step=0, profiler_outdir=None)
# writer.flush()
# print (' - Run command --> tensorboard --logdir=OrganNetBayesPool3 --port=6100')
pass
# OrgaNetBayesianHead
elif 0:
print ('\n ------------------- OrganNet (bayesian=True, dropout=None, bayesianhead=True) ------------------- ')
model = OrganNet(class_count=10, bayesian=True, dropout=None, bayesianhead=True)
y_predict = model(X, training=True)
print (' - y_predict: ', y_predict.shape)
model.summary() # ~ nonBayes(550K), and this is 580K params
print (model.losses)
pdb.set_trace()