# Darknet-53 and CSPDarknet-53 SCN with different implementations
import math
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F

from torchviz import make_dot

#-------------------------------------------------#
#   MISH Activation Function
#-------------------------------------------------#
# class Mish(nn.Module):  # Already implemented in PyTorch
#     def __init__(self):
#         super(Mish, self).__init__()
#
#     def forward(self, x):
#         return x * torch.tanh(F.softplus(x))

#---------------------------------------------------#
#   Convolutional Block -> Convolution + Normalization + Activation Function
#   Conv2d + BatchNormalization + Mish
#---------------------------------------------------#
class BasicConv(nn.Module):  # Basic Convolution
    def __init__(self, in_channels, out_channels, kernel_size, stride=1):
        super(BasicConv, self).__init__()

        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False)
        self.bn = nn.BatchNorm2d(out_channels)
        self.activation = nn.Mish()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.activation(x)
        return x
#---------------------------------------------------#
#   CSPDarkNet building block components
#   Internally stacked residual blocks
#---------------------------------------------------#
class Resblock(nn.Module):
    def __init__(self, channels, hidden_channels=None):
        super(Resblock, self).__init__()

        if hidden_channels is None:
            hidden_channels = channels

        self.block = nn.Sequential(
            BasicConv(channels, hidden_channels, 1),
            BasicConv(hidden_channels, channels, 3)
        )

    def forward(self, x):
        return x + self.block(x)
#--------------------------------------------------------------------#
#   CSPDarkNet building block
#   First use a stride-2 3x3 convolution block to compress height and width
#   Then establish a large residual shortcut (split_conv0), which bypasses the stacked residual structures
#   The main branch loops over num_blocks internal residual structures
#   The whole CSPDarkNet block is therefore one large residual block wrapping several small residual blocks
#--------------------------------------------------------------------#
class Resblock_body(nn.Module):
    def __init__(self, in_channels, out_channels, num_blocks, first):
        super(Resblock_body, self).__init__()
        #----------------------------------------------------------------#
        #   Use a stride-2 convolution block to compress height and width
        #----------------------------------------------------------------#
        self.downsample_conv = BasicConv(in_channels, out_channels, 3, stride=2)

        if first:
            #--------------------------------------------------------------------------#
            #   Establish the large residual shortcut self.split_conv0, which bypasses the residual structures
            #--------------------------------------------------------------------------#
            self.split_conv0 = BasicConv(out_channels, out_channels, 1)

            #----------------------------------------------------------------#
            #   The main branch loops over num_blocks internal residual structures
            #----------------------------------------------------------------#
            self.split_conv1 = BasicConv(out_channels, out_channels, 1)
            self.blocks_conv = nn.Sequential(
                Resblock(channels=out_channels, hidden_channels=out_channels // 2),
                BasicConv(out_channels, out_channels, 1)
            )
            self.concat_conv = BasicConv(out_channels * 2, out_channels, 1)
        else:
            #--------------------------------------------------------------------------#
            #   Establish the large residual shortcut self.split_conv0, which bypasses the residual structures
            #--------------------------------------------------------------------------#
            self.split_conv0 = BasicConv(out_channels, out_channels // 2, 1)

            #----------------------------------------------------------------#
            #   The main branch loops over num_blocks internal residual structures
            #----------------------------------------------------------------#
            self.split_conv1 = BasicConv(out_channels, out_channels // 2, 1)
            self.blocks_conv = nn.Sequential(
                *[Resblock(out_channels // 2) for _ in range(num_blocks)],
                BasicConv(out_channels // 2, out_channels // 2, 1)
            )
            self.concat_conv = BasicConv(out_channels, out_channels, 1)

    def forward(self, x):
        x = self.downsample_conv(x)

        x0 = self.split_conv0(x)

        x1 = self.split_conv1(x)
        x1 = self.blocks_conv(x1)

        #------------------------------------#
        #   Concatenate the large residual shortcut back
        #------------------------------------#
        x = torch.cat([x1, x0], dim=1)
        #------------------------------------#
        #   Finally integrate the channel count
        #------------------------------------#
        x = self.concat_conv(x)
        return x
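
# Quick shape check (added sketch, not part of the original gist): the first CSP stage
# should map a 32-channel 416x416 map to a 64-channel 208x208 map, matching the
# per-stage comments in CSPDarkNet below. Names with a leading underscore are hypothetical.
# _blk = Resblock_body(32, 64, num_blocks=1, first=True)
# print(_blk(torch.randn(1, 32, 416, 416)).shape)  # expected: torch.Size([1, 64, 208, 208])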
#---------------------------------------------------#
#   CSPDarkNet53 main body
#   Input is a 416x416x3 image
#   Outputs are three effective feature layers
#---------------------------------------------------#
class CSPDarkNet(nn.Module):  # modify the part
    def __init__(self, layers):
        super(CSPDarkNet, self).__init__()
        self.inplanes = 32
        # 416,416,3 -> 416,416,32
        self.conv1 = BasicConv(3, self.inplanes, kernel_size=3, stride=1)
        self.feature_channels = [64, 128, 256, 512, 1024]

        self.stages = nn.ModuleList([
            # 416,416,32 -> 208,208,64
            Resblock_body(self.inplanes, self.feature_channels[0], layers[0], first=True),
            # 208,208,64 -> 104,104,128
            Resblock_body(self.feature_channels[0], self.feature_channels[1], layers[1], first=False),
            # 104,104,128 -> 52,52,256
            Resblock_body(self.feature_channels[1], self.feature_channels[2], layers[2], first=False),
            # 52,52,256 -> 26,26,512
            Resblock_body(self.feature_channels[2], self.feature_channels[3], layers[3], first=False),
            # 26,26,512 -> 13,13,1024
            Resblock_body(self.feature_channels[3], self.feature_channels[4], layers[4], first=False)
        ])

        self.num_features = 1
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x):
        x = self.conv1(x)

        x = self.stages[0](x)
        x = self.stages[1](x)
        out3 = self.stages[2](x)
        out4 = self.stages[3](out3)
        out5 = self.stages[4](out4)

        return out3, out4, out5
def darknet53(pretrained):
    model = CSPDarkNet([1, 2, 8, 8, 4])  # the same block counts as before
    if pretrained:
        model.load_state_dict(torch.load("model_data/CSPdarknet53_backbone_weights.pth"))
    return model


model = darknet53(pretrained=False)
input_tensor = torch.randn(1, 3, 416, 416)

# out3, out4, out5 = model(input_tensor)
_, _, out5 = model(input_tensor)

make_dot((out5), params=dict(model.named_parameters())).render("cspdarknet53", format="png")
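
# Optional sanity check (added sketch, not in the original gist): the three effective feature
# layers should come out at strides 8, 16 and 32 of the 416x416 input.
# out3, out4, out5 = model(input_tensor)
# print(out3.shape, out4.shape, out5.shape)
# # expected: torch.Size([1, 256, 52, 52]) torch.Size([1, 512, 26, 26]) torch.Size([1, 1024, 13, 13])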
### SCN functions
### Have some torch imports above
from typing import List, Tuple

from torch.nn.parameter import Parameter
from torch.nn import init


def create_param_combination_conv2d(dimensions: int, in_channels: int, out_channels: int, kernel_size: int = 3) -> nn.ParameterList:
    """
    Create a weight tensor list for a single conv2d layer without biases.
    The weight tensors are used to calculate the final weight of the layer via linear combination.
    """
    weight_list = nn.ParameterList()

    for _ in range(dimensions):
        weight = Parameter(torch.empty((out_channels, in_channels, kernel_size, kernel_size)))
        init.kaiming_uniform_(weight, a=math.sqrt(5))  # initialize the weights
        weight_list.append(weight)

    return weight_list


def create_param_combination_linear(dimensions: int, in_features: int, out_features: int) -> Tuple[nn.ParameterList, nn.ParameterList]:
    """
    Create a weight tensor list and a bias tensor list for a single linear layer.
    The tensors are used to calculate the final weight/bias of the layer via linear combination.
    """
    weight_list = nn.ParameterList()
    bias_list = nn.ParameterList()

    for _ in range(dimensions):
        weight = Parameter(torch.empty((out_features, in_features)))
        init.kaiming_uniform_(weight, a=math.sqrt(5))
        weight_list.append(weight)

        bias = Parameter(torch.empty(out_features))
        fan_in, _ = init._calculate_fan_in_and_fan_out(weight)
        bound = 1 / math.sqrt(fan_in)
        init.uniform_(bias, -bound, bound)
        bias_list.append(bias)

    return weight_list, bias_list


def calculate_weighted_sum(param_list: List[Parameter], coefficients: torch.Tensor) -> torch.Tensor:
    """
    Calculate the weighted sum (linear combination), which is the final weight used during inference.
    """
    weighted_list = [a * b for a, b in zip(param_list, coefficients)]
    return torch.sum(torch.stack(weighted_list), dim=0)


def execute_hyper_conv2d(x: torch.Tensor, weight_list: List[Parameter], coefficients: torch.Tensor, stride: int = 1, padding: int = 0) -> torch.Tensor:
    """
    Execute one hyper-conv2d layer.
    """
    weights = calculate_weighted_sum(weight_list, coefficients)
    return F.conv2d(x, weight=weights, stride=stride, padding=padding)


def execute_hyper_linear(x: torch.Tensor, weight_list: List[Parameter], bias_list: List[Parameter], coefficients: torch.Tensor) -> torch.Tensor:
    """
    Execute one hyper-linear layer.
    """
    weights = calculate_weighted_sum(weight_list, coefficients)
    biases = calculate_weighted_sum(bias_list, coefficients)
    return F.linear(x, weight=weights, bias=biases)


# Add some other parts to freeze weights
# Do it later
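
# Minimal usage sketch of the helpers above (added for illustration, not in the original gist).
# A 1-D coefficient vector of length `dimensions` blends the per-dimension weights into one
# conv kernel before the functional call; names with a leading underscore are hypothetical.
# _dims = 3
# _wl = create_param_combination_conv2d(_dims, in_channels=8, out_channels=16, kernel_size=3)
# _beta = torch.softmax(torch.randn(_dims), dim=0)  # stand-in for the hypernet output
# _y = execute_hyper_conv2d(torch.randn(1, 8, 32, 32), _wl, _beta, stride=1, padding=1)
# print(_y.shape)  # expected: torch.Size([1, 16, 32, 32])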
### define the CSPDarkNet-53 with hypernets
import math
import random
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF

from torchviz import make_dot

#-------------------------------------------------#
#   MISH Activation Function
#-------------------------------------------------#
# class Mish(nn.Module):  # Already implemented in PyTorch
#     def __init__(self):
#         super(Mish, self).__init__()
#
#     def forward(self, x):
#         return x * torch.tanh(F.softplus(x))

#---------------------------------------------------#
#   Convolutional Block -> Convolution + Normalization + Activation Function
#   Conv2d + BatchNormalization + Mish
#---------------------------------------------------#
class BasicConv_SCN(nn.Module):  # Basic Convolution
    def __init__(self, dimensions, in_channels, out_channels, kernel_size, stride=1, is_scn=False):
        super(BasicConv_SCN, self).__init__()

        # set some class variables
        # self.dimensions = dimensions
        self.is_scn = is_scn  # very useful
        self.kernel_size = kernel_size  # storing this makes the padding (kernel_size // 2) easier
        self.stride = stride

        # set up the layers
        if self.is_scn:
            self.conv_weight_list = create_param_combination_conv2d(dimensions, in_channels, out_channels, kernel_size=self.kernel_size)
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False)
            self.bn = nn.BatchNorm2d(out_channels)
            self.activation = nn.Mish()
        else:
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, kernel_size // 2, bias=False)
            self.bn = nn.BatchNorm2d(out_channels)
            self.activation = nn.Mish()

    def forward(self, x, hyper_x):  # hyper_x = beta, no dimension argument needed
        if self.is_scn:
            # x = self.conv(x)
            x = execute_hyper_conv2d(x, self.conv_weight_list, hyper_x, stride=self.stride, padding=self.kernel_size // 2)
        else:
            x = self.conv(x)
        x = self.bn(x)
        x = self.activation(x)
        return x
#---------------------------------------------------#
#   Modify later
#   Output Block -> Global Average Pooling + Flatten + Fully Connected
#   AdaptiveAvgPool2d + Linear
#---------------------------------------------------#
class OutputLayer_SCN(nn.Module):
    def __init__(self, dimensions, feature_in, num_classes: int, is_scn=False):
        super(OutputLayer_SCN, self).__init__()

        self.is_scn = is_scn  # very useful
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # x = torch.flatten(x, 1)  # Flatten the output of avgpool

        if self.is_scn:
            self.fc_weight_list, self.linear_bias_list = create_param_combination_linear(dimensions, feature_in, num_classes)
            self.fc = nn.Linear(feature_in, num_classes)
        else:
            self.fc = nn.Linear(feature_in, num_classes)

    def forward(self, x, hyper_x):
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        if self.is_scn:
            x = execute_hyper_linear(x, self.fc_weight_list, self.linear_bias_list, hyper_x)  # the SCN path for the classifier
        else:
            x = self.fc(x)
        return x
#---------------------------------------------------#
#   CSPDarkNet building block components
#   Internally stacked residual blocks
#---------------------------------------------------#
class Resblock_SCN(nn.Module):  # OK
    def __init__(self, dimensions, channels, hidden_channels=None, is_scn=False):
        super(Resblock_SCN, self).__init__()

        if hidden_channels is None:
            hidden_channels = channels

        # Only apply SCN to the basic convs.
        # Do not use nn.Sequential here: it cannot pass the extra hyper_x argument through forward().
        # self.block = nn.Sequential(  # the resblock, similar to YOLOv3
        #     BasicConv_SCN(dimensions, channels, hidden_channels, 1, is_scn=is_scn),
        #     BasicConv_SCN(dimensions, hidden_channels, channels, 3, is_scn=is_scn)
        # )
        self.conv1 = BasicConv_SCN(dimensions, channels, hidden_channels, 1, is_scn=is_scn)
        self.conv2 = BasicConv_SCN(dimensions, hidden_channels, channels, 3, is_scn=is_scn)

    def forward(self, x, hyper_x):
        # return x + self.block(x, hyper_x)
        out = self.conv1(x, hyper_x)
        out = self.conv2(out, hyper_x)
        return x + out
#--------------------------------------------------------------------#
#   CSPDarkNet building block
#   First use a stride-2 3x3 convolution block to compress height and width
#   Then establish a large residual shortcut (split_conv0), which bypasses the stacked residual structures
#   The main branch loops over num_blocks internal residual structures
#   The whole CSPDarkNet block is therefore one large residual block wrapping several small residual blocks
#--------------------------------------------------------------------#
class Resblock_body_SCN(nn.Module):
    def __init__(self, dimensions, in_channels, out_channels, num_blocks, first, is_scn=False):
        super(Resblock_body_SCN, self).__init__()
        #----------------------------------------------------------------#
        #   Use a stride-2 convolution block to compress height and width
        #----------------------------------------------------------------#
        self.downsample_conv = BasicConv_SCN(dimensions, in_channels, out_channels, 3, stride=2, is_scn=is_scn)

        if first:  # the first stage
            #--------------------------------------------------------------------------#
            #   Establish the large residual shortcut self.split_conv0, which bypasses the residual structures
            #--------------------------------------------------------------------------#
            self.split_conv0 = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn)

            #----------------------------------------------------------------#
            #   The main branch loops over num_blocks internal residual structures
            #----------------------------------------------------------------#
            self.split_conv1 = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn)
            self.resblock = Resblock_SCN(dimensions, out_channels, hidden_channels=out_channels // 2, is_scn=is_scn)
            self.basic_conv = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn)
            self.concat_conv = BasicConv_SCN(dimensions, out_channels * 2, out_channels, 1, is_scn=is_scn)
        else:
            #--------------------------------------------------------------------------#
            #   Establish the large residual shortcut self.split_conv0, which bypasses the residual structures
            #--------------------------------------------------------------------------#
            self.split_conv0 = BasicConv_SCN(dimensions, out_channels, out_channels // 2, 1, is_scn=is_scn)

            #----------------------------------------------------------------#
            #   The main branch loops over num_blocks internal residual structures
            #----------------------------------------------------------------#
            self.split_conv1 = BasicConv_SCN(dimensions, out_channels, out_channels // 2, 1, is_scn=is_scn)
            # should be OK I guess
            self.resblocks = nn.ModuleList([Resblock_SCN(dimensions, out_channels // 2, is_scn=is_scn) for _ in range(num_blocks)])
            self.basic_conv = BasicConv_SCN(dimensions, out_channels // 2, out_channels // 2, 1, is_scn=is_scn)
            self.concat_conv = BasicConv_SCN(dimensions, out_channels, out_channels, 1, is_scn=is_scn)

    def forward(self, x, hyper_x):
        x = self.downsample_conv(x, hyper_x)

        x0 = self.split_conv0(x, hyper_x)

        x1 = self.split_conv1(x, hyper_x)
        if hasattr(self, 'resblock'):  # first stage: a single residual block
            x1 = self.resblock(x1, hyper_x)
            x1 = self.basic_conv(x1, hyper_x)
        else:  # later stages: num_blocks residual blocks
            for resblock in self.resblocks:
                x1 = resblock(x1, hyper_x)
            x1 = self.basic_conv(x1, hyper_x)

        #------------------------------------#
        #   Concatenate the large residual shortcut back
        #------------------------------------#
        x = torch.cat([x1, x0], dim=1)
        #------------------------------------#
        #   Finally integrate the channel count
        #------------------------------------#
        x = self.concat_conv(x, hyper_x)
        return x
#---------------------------------------------------#
#   CSPDarkNet53 main body
#   Input is a 416x416x3 image
#   Outputs are three effective feature layers
#---------------------------------------------------#
class CSPDarkNet_SCN(nn.Module):
    def __init__(self, layers, SCN_layers, num_classes, dimensions=1):
        super(CSPDarkNet_SCN, self).__init__()
        self.inplanes = 32

        self.hyper_stack = nn.Sequential(  # hypernet
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, dimensions),
            nn.Softmax(dim=0)
        )

        # 416,416,3 -> 416,416,32
        self.conv1 = BasicConv_SCN(dimensions, 3, self.inplanes, kernel_size=3, stride=1, is_scn=SCN_layers[0])
        self.feature_channels = [64, 128, 256, 512, 1024]

        self.stages = nn.ModuleList([
            # 416,416,32 -> 208,208,64
            Resblock_body_SCN(dimensions, self.inplanes, self.feature_channels[0], layers[0], first=True, is_scn=SCN_layers[1]),
            # 208,208,64 -> 104,104,128
            Resblock_body_SCN(dimensions, self.feature_channels[0], self.feature_channels[1], layers[1], first=False, is_scn=SCN_layers[2]),
            # 104,104,128 -> 52,52,256
            Resblock_body_SCN(dimensions, self.feature_channels[1], self.feature_channels[2], layers[2], first=False, is_scn=SCN_layers[3]),
            # 52,52,256 -> 26,26,512
            Resblock_body_SCN(dimensions, self.feature_channels[2], self.feature_channels[3], layers[3], first=False, is_scn=SCN_layers[4]),
            # 26,26,512 -> 13,13,1024
            Resblock_body_SCN(dimensions, self.feature_channels[3], self.feature_channels[4], layers[4], first=False, is_scn=SCN_layers[5])
        ])

        # and a final fc layer
        self.fc_block = OutputLayer_SCN(dimensions, self.feature_channels[4], num_classes=num_classes, is_scn=SCN_layers[6])

        ## add the remaining functions
        ## check if this is going to work
        self.num_features = 1
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

    def forward(self, x, hyper_x):
        hyper_output = self.hyper_stack(hyper_x)

        x = self.conv1(x, hyper_output)

        x = self.stages[0](x, hyper_output)
        x = self.stages[1](x, hyper_output)
        out3 = self.stages[2](x, hyper_output)
        out4 = self.stages[3](out3, hyper_output)
        out5 = self.stages[4](out4, hyper_output)
        out6 = self.fc_block(out5, hyper_output)

        # return out3, out4, out5, out6
        return out6
def cspdarknet53(pretrained):
    scn_segment = [True, True, True, True, True, True, True]  # still 7 parts
    model = CSPDarkNet_SCN([1, 2, 8, 8, 4], scn_segment, num_classes=class_num)  # the same block counts as before
    if pretrained:
        model.load_state_dict(torch.load("model_data/CSPdarknet53_backbone_weights.pth"))
    return model


def transform_angle(angle):  # why isn't this inside the SCN class? because it comes from the video info?
    cos = math.cos(angle / 180 * math.pi)
    sin = math.sin(angle / 180 * math.pi)
    return torch.Tensor([cos, sin])


class_num = 100  # number of classes, e.g. mini-ImageNet
model = cspdarknet53(pretrained=False)
input_tensor = torch.randn(1, 3, 416, 416)

angle = random.uniform(0, 360)         # uniformly sample an angle in [0, 360) degrees
hyper_inputs = transform_angle(angle)  # (cos, sin) encoding of the rotation angle

# out3, out4, out5 = model(input_tensor)
# _, _, _, out6 = model(input_tensor, hyper_inputs)
out6 = model(input_tensor, hyper_inputs)

make_dot((out6), params=dict(model.named_parameters())).render("cspdarknet53_SCN", format="png")
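
# A possible way to freeze the non-SCN ("one4all") backbone weights while keeping the hypernet
# and the per-dimension weight lists trainable (added sketch, not in the original gist; the
# selection rule below is an assumption and would need adjusting to the desired training setup).
# for _name, _p in model.named_parameters():
#     _is_scn_param = ("weight_list" in _name) or ("bias_list" in _name) or ("hyper_stack" in _name)
#     _p.requires_grad = _is_scn_param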
import math

import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F

from typing import Optional, List, Tuple, Union
from torch import nn, Tensor
from torch.nn.parameter import Parameter, UninitializedParameter
from torch.nn import init


# DarkNet-53 model
class DarkNet53_SCN2(nn.Module):
    def __init__(self, scn_list: List, num_classes, dimensions=1):
        super(DarkNet53_SCN2, self).__init__()

        self.dimensions = dimensions  # how many SCN dimensions to use
        self.inplanes = 32
        self.num_classes = num_classes

        self.hyper_stack = nn.Sequential(  # hypernet
            nn.Linear(2, 64),
            nn.ReLU(),
            nn.Linear(64, dimensions),
            nn.Softmax(dim=0)
        )

        self.scn_list = scn_list  # which layers (or blocks) should use the SCN architecture

        # Initial convolution layer
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.conv0_weight_list = self.create_param_combination_conv2d(3, 32, kernel_size=3)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu1 = nn.LeakyReLU(0.1)
        # First layer: downsample + 1 residual block (2 basic convs)
        self.ds_conv1 = nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1, bias=False)
        self.ds_conv1_weight_list = self.create_param_combination_conv2d(32, 64, kernel_size=3)
        self.ds_bn1 = nn.BatchNorm2d(64)
        self.ds_relu1 = nn.LeakyReLU(0.1)

        self.residual1_0_conv1 = nn.Conv2d(64, 32, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual1_0_conv1_weight_list = self.create_param_combination_conv2d(64, 32, kernel_size=1)
        self.residual1_0_bn1 = nn.BatchNorm2d(32)
        self.residual1_0_relu1 = nn.LeakyReLU(0.1)
        self.residual1_0_conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual1_0_conv2_weight_list = self.create_param_combination_conv2d(32, 64, kernel_size=3)
        self.residual1_0_bn2 = nn.BatchNorm2d(64)
        self.residual1_0_relu2 = nn.LeakyReLU(0.1)
        # Second layer
        self.ds_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1, bias=False)
        self.ds_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3)
        self.ds_bn2 = nn.BatchNorm2d(128)
        self.ds_relu2 = nn.LeakyReLU(0.1)

        self.residual2_0_conv1 = nn.Conv2d(128, 64, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual2_0_conv1_weight_list = self.create_param_combination_conv2d(128, 64, kernel_size=1)
        self.residual2_0_bn1 = nn.BatchNorm2d(64)
        self.residual2_0_relu1 = nn.LeakyReLU(0.1)
        self.residual2_0_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual2_0_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3)
        self.residual2_0_bn2 = nn.BatchNorm2d(128)
        self.residual2_0_relu2 = nn.LeakyReLU(0.1)

        self.residual2_1_conv1 = nn.Conv2d(128, 64, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual2_1_conv1_weight_list = self.create_param_combination_conv2d(128, 64, kernel_size=1)
        self.residual2_1_bn1 = nn.BatchNorm2d(64)
        self.residual2_1_relu1 = nn.LeakyReLU(0.1)
        self.residual2_1_conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual2_1_conv2_weight_list = self.create_param_combination_conv2d(64, 128, kernel_size=3)
        self.residual2_1_bn2 = nn.BatchNorm2d(128)
        self.residual2_1_relu2 = nn.LeakyReLU(0.1)
        # Third layer
        self.ds_conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=2, padding=1, bias=False)
        self.ds_conv3_weight_list = self.create_param_combination_conv2d(128, 256, kernel_size=3)
        self.ds_bn3 = nn.BatchNorm2d(256)
        self.ds_relu3 = nn.LeakyReLU(0.1)

        self.residual3_0_conv1 = nn.Conv2d(256, 128, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual3_0_conv1_weight_list = self.create_param_combination_conv2d(256, 128, kernel_size=1)
        self.residual3_0_bn1 = nn.BatchNorm2d(128)
        self.residual3_0_relu1 = nn.LeakyReLU(0.1)
        self.residual3_0_conv2 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual3_0_conv2_weight_list = self.create_param_combination_conv2d(128, 256, kernel_size=3)
        self.residual3_0_bn2 = nn.BatchNorm2d(256)
        self.residual3_0_relu2 = nn.LeakyReLU(0.1)

        for i in range(1, 8):  # the remaining 7 of the 8 residual blocks in this layer (I will modify this)
            setattr(self, f"residual3_{i}_conv1", nn.Conv2d(256, 128, kernel_size=1, stride=1, padding=0, bias=False))
            setattr(self, f"residual3_{i}_conv1_weight_list", self.create_param_combination_conv2d(256, 128, kernel_size=1))
            setattr(self, f"residual3_{i}_bn1", nn.BatchNorm2d(128))
            setattr(self, f"residual3_{i}_relu1", nn.LeakyReLU(0.1))
            setattr(self, f"residual3_{i}_conv2", nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False))
            setattr(self, f"residual3_{i}_conv2_weight_list", self.create_param_combination_conv2d(128, 256, kernel_size=3))
            setattr(self, f"residual3_{i}_bn2", nn.BatchNorm2d(256))
            setattr(self, f"residual3_{i}_relu2", nn.LeakyReLU(0.1))
        # Fourth layer
        self.ds_conv4 = nn.Conv2d(256, 512, kernel_size=3, stride=2, padding=1, bias=False)
        self.ds_conv4_weight_list = self.create_param_combination_conv2d(256, 512, kernel_size=3)
        self.ds_bn4 = nn.BatchNorm2d(512)
        self.ds_relu4 = nn.LeakyReLU(0.1)

        self.residual4_0_conv1 = nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual4_0_conv1_weight_list = self.create_param_combination_conv2d(512, 256, kernel_size=1)
        self.residual4_0_bn1 = nn.BatchNorm2d(256)
        self.residual4_0_relu1 = nn.LeakyReLU(0.1)
        self.residual4_0_conv2 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual4_0_conv2_weight_list = self.create_param_combination_conv2d(256, 512, kernel_size=3)
        self.residual4_0_bn2 = nn.BatchNorm2d(512)
        self.residual4_0_relu2 = nn.LeakyReLU(0.1)

        for i in range(1, 4):  # only a deeper implementation (remaining residual blocks of this layer)
            setattr(self, f"residual4_{i}_conv1", nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0, bias=False))
            setattr(self, f"residual4_{i}_conv1_weight_list", self.create_param_combination_conv2d(512, 256, kernel_size=1))
            setattr(self, f"residual4_{i}_bn1", nn.BatchNorm2d(256))
            setattr(self, f"residual4_{i}_relu1", nn.LeakyReLU(0.1))
            setattr(self, f"residual4_{i}_conv2", nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1, bias=False))
            setattr(self, f"residual4_{i}_conv2_weight_list", self.create_param_combination_conv2d(256, 512, kernel_size=3))
            setattr(self, f"residual4_{i}_bn2", nn.BatchNorm2d(512))
            setattr(self, f"residual4_{i}_relu2", nn.LeakyReLU(0.1))
        # Fifth layer
        self.ds_conv5 = nn.Conv2d(512, 1024, kernel_size=3, stride=2, padding=1, bias=False)
        self.ds_conv5_weight_list = self.create_param_combination_conv2d(512, 1024, kernel_size=3)
        self.ds_bn5 = nn.BatchNorm2d(1024)
        self.ds_relu5 = nn.LeakyReLU(0.1)

        self.residual5_0_conv1 = nn.Conv2d(1024, 512, kernel_size=1, stride=1, padding=0, bias=False)
        self.residual5_0_conv1_weight_list = self.create_param_combination_conv2d(1024, 512, kernel_size=1)
        self.residual5_0_bn1 = nn.BatchNorm2d(512)
        self.residual5_0_relu1 = nn.LeakyReLU(0.1)
        self.residual5_0_conv2 = nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1, bias=False)
        self.residual5_0_conv2_weight_list = self.create_param_combination_conv2d(512, 1024, kernel_size=3)
        self.residual5_0_bn2 = nn.BatchNorm2d(1024)
        self.residual5_0_relu2 = nn.LeakyReLU(0.1)

        for i in range(1, 4):
            setattr(self, f"residual5_{i}_conv1", nn.Conv2d(1024, 512, kernel_size=1, stride=1, padding=0, bias=False))
            setattr(self, f"residual5_{i}_conv1_weight_list", self.create_param_combination_conv2d(1024, 512, kernel_size=1))
            setattr(self, f"residual5_{i}_bn1", nn.BatchNorm2d(512))
            setattr(self, f"residual5_{i}_relu1", nn.LeakyReLU(0.1))
            setattr(self, f"residual5_{i}_conv2", nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1, bias=False))
            setattr(self, f"residual5_{i}_conv2_weight_list", self.create_param_combination_conv2d(512, 1024, kernel_size=3))
            setattr(self, f"residual5_{i}_bn2", nn.BatchNorm2d(1024))
            setattr(self, f"residual5_{i}_relu2", nn.LeakyReLU(0.1))

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024, self.num_classes)
        self.fc_weight_list, self.linear_bias_list = self.create_param_combination_linear(1024, self.num_classes)
        self.smax = nn.Softmax(dim=1)  # defined but not used

        # don't do this, it is clumsy!
        # self.parameter_list = [  # arrange a way to do partial SCN
        # ]
    def create_param_combination_conv2d(self, in_channels, out_channels, kernel_size=3):
        """
        Create a weight tensor list for a single conv2d layer without biases.
        The weight tensors are used to calculate the final weight of the layer via linear combination.
        """
        weight_list = nn.ParameterList()

        for _ in range(self.dimensions):
            weight = Parameter(torch.empty((out_channels, in_channels, kernel_size, kernel_size)))
            init.kaiming_uniform_(weight, a=math.sqrt(5))  # initialize the weights
            weight_list.append(weight)

            # bias = Parameter(torch.empty(out_channels))
            # fan_in, _ = init._calculate_fan_in_and_fan_out(weight)
            # bound = 1 / math.sqrt(fan_in)
            # init.uniform_(bias, -bound, bound)
            # bias_list.append(bias)

        return weight_list

    def create_param_combination_linear(self, in_features, out_features):
        """
        Create a weight tensor list and a bias tensor list for a single linear layer.
        The tensors are used to calculate the final weight/bias of the layer via linear combination.
        """
        weight_list = nn.ParameterList()
        bias_list = nn.ParameterList()

        for _ in range(self.dimensions):
            weight = Parameter(torch.empty((out_features, in_features)))
            init.kaiming_uniform_(weight, a=math.sqrt(5))
            weight_list.append(weight)

            bias = Parameter(torch.empty(out_features))
            fan_in, _ = init._calculate_fan_in_and_fan_out(weight)
            bound = 1 / math.sqrt(fan_in)
            init.uniform_(bias, -bound, bound)
            bias_list.append(bias)

        return weight_list, bias_list

    def calculate_weighted_sum(self, param_list: List, coefficients: Tensor):
        """
        Calculate the weighted sum (linear combination), which is the final weight used during inference.
        """
        weighted_list = [a * b for a, b in zip(param_list, coefficients)]
        return torch.sum(torch.stack(weighted_list), dim=0)

    def execute_hyper_conv2d(self, x, weight_list: List, coefficients: Tensor, stride=1, padding=0):
        """
        Execute one hyper-conv2d layer.
        """
        weights = self.calculate_weighted_sum(weight_list, coefficients)
        return F.conv2d(x, weight=weights, stride=stride, padding=padding)

    def execute_hyper_linear(self, x, weight_list: List, bias_list: List, coefficients):
        """
        Execute one hyper-linear layer.
        """
        weights = self.calculate_weighted_sum(weight_list, coefficients)
        biases = self.calculate_weighted_sum(bias_list, coefficients)
        return F.linear(x, weight=weights, bias=biases)
    def forward(self, x, hyper_x):  # make everything SCN first, then look at the partial-SCN case
        hyper_output = self.hyper_stack(hyper_x)

        # Initial convolution layer
        if self.scn_list[0] == 0:  # use one4all
            x = self.conv1(x)  # do not use SCN
        else:  # use SCN
            x = self.execute_hyper_conv2d(x, self.conv0_weight_list, hyper_output, stride=1, padding=1)
        x = self.bn1(x)
        x = self.relu1(x)

        # First layer
        if self.scn_list[1] == 0:  # use one4all
            x = self.ds_conv1(x)
            x = self.ds_bn1(x)
            x = self.ds_relu1(x)

            residual = x
            out = self.residual1_0_conv1(x)
            out = self.residual1_0_bn1(out)
            out = self.residual1_0_relu1(out)
            out = self.residual1_0_conv2(out)
            out = self.residual1_0_bn2(out)
            out = self.residual1_0_relu2(out)
            x = out + residual
        else:  # use SCN
            # x = self.execute_hyper_conv2d(x, self.ds_conv1_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_conv1(x)
            x = self.ds_bn1(x)
            x = self.ds_relu1(x)

            residual = x
            # out = self.residual1_0_conv1(x)
            out = self.execute_hyper_conv2d(x, self.residual1_0_conv1_weight_list, hyper_output, stride=1, padding=0)
            out = self.residual1_0_bn1(out)
            out = self.residual1_0_relu1(out)
            # out = self.residual1_0_conv2(out)
            out = self.execute_hyper_conv2d(out, self.residual1_0_conv2_weight_list, hyper_output, stride=1, padding=1)
            out = self.residual1_0_bn2(out)
            out = self.residual1_0_relu2(out)
            x = out + residual
        # Second layer
        if self.scn_list[2] == 0:  # use one4all
            x = self.ds_conv2(x)
            x = self.ds_bn2(x)
            x = self.ds_relu2(x)

            for i in range(2):
                residual = x
                out = getattr(self, f"residual2_{i}_conv1")(x)
                out = getattr(self, f"residual2_{i}_bn1")(out)
                out = getattr(self, f"residual2_{i}_relu1")(out)
                out = getattr(self, f"residual2_{i}_conv2")(out)
                out = getattr(self, f"residual2_{i}_bn2")(out)
                out = getattr(self, f"residual2_{i}_relu2")(out)
                x = out + residual
        else:
            # x = self.ds_conv2(x)
            x = self.execute_hyper_conv2d(x, self.ds_conv2_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_bn2(x)
            x = self.ds_relu2(x)

            for i in range(2):
                residual = x
                # out = getattr(self, f"residual2_{i}_conv1")(x)
                out = self.execute_hyper_conv2d(x, getattr(self, f"residual2_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0)
                out = getattr(self, f"residual2_{i}_bn1")(out)
                out = getattr(self, f"residual2_{i}_relu1")(out)
                # out = getattr(self, f"residual2_{i}_conv2")(out)
                out = self.execute_hyper_conv2d(out, getattr(self, f"residual2_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1)
                out = getattr(self, f"residual2_{i}_bn2")(out)
                out = getattr(self, f"residual2_{i}_relu2")(out)
                x = out + residual
        # Third layer
        if self.scn_list[3] == 0:
            x = self.ds_conv3(x)
            x = self.ds_bn3(x)
            x = self.ds_relu3(x)

            for i in range(8):
                residual = x
                out = getattr(self, f"residual3_{i}_conv1")(x)
                out = getattr(self, f"residual3_{i}_bn1")(out)
                out = getattr(self, f"residual3_{i}_relu1")(out)
                out = getattr(self, f"residual3_{i}_conv2")(out)
                out = getattr(self, f"residual3_{i}_bn2")(out)
                out = getattr(self, f"residual3_{i}_relu2")(out)
                x = out + residual
        else:
            # x = self.ds_conv3(x)
            x = self.execute_hyper_conv2d(x, self.ds_conv3_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_bn3(x)
            x = self.ds_relu3(x)

            for i in range(8):
                residual = x
                # out = getattr(self, f"residual3_{i}_conv1")(x)
                out = self.execute_hyper_conv2d(x, getattr(self, f"residual3_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0)
                out = getattr(self, f"residual3_{i}_bn1")(out)
                out = getattr(self, f"residual3_{i}_relu1")(out)
                # out = getattr(self, f"residual3_{i}_conv2")(out)
                out = self.execute_hyper_conv2d(out, getattr(self, f"residual3_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1)
                out = getattr(self, f"residual3_{i}_bn2")(out)
                out = getattr(self, f"residual3_{i}_relu2")(out)
                x = out + residual
        # Fourth layer
        if self.scn_list[4] == 0:
            x = self.ds_conv4(x)
            x = self.ds_bn4(x)
            x = self.ds_relu4(x)

            for i in range(8):
                residual = x
                out = getattr(self, f"residual4_{i}_conv1")(x)
                out = getattr(self, f"residual4_{i}_bn1")(out)
                out = getattr(self, f"residual4_{i}_relu1")(out)
                out = getattr(self, f"residual4_{i}_conv2")(out)
                out = getattr(self, f"residual4_{i}_bn2")(out)
                out = getattr(self, f"residual4_{i}_relu2")(out)
                x = out + residual
        else:
            # x = self.ds_conv4(x)
            x = self.execute_hyper_conv2d(x, self.ds_conv4_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_bn4(x)
            x = self.ds_relu4(x)

            for i in range(8):
                residual = x
                # out = getattr(self, f"residual4_{i}_conv1")(x)
                out = self.execute_hyper_conv2d(x, getattr(self, f"residual4_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0)
                out = getattr(self, f"residual4_{i}_bn1")(out)
                out = getattr(self, f"residual4_{i}_relu1")(out)
                # out = getattr(self, f"residual4_{i}_conv2")(out)
                out = self.execute_hyper_conv2d(out, getattr(self, f"residual4_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1)
                out = getattr(self, f"residual4_{i}_bn2")(out)
                out = getattr(self, f"residual4_{i}_relu2")(out)
                x = out + residual
        # Fifth layer
        if self.scn_list[5] == 0:
            x = self.ds_conv5(x)
            x = self.ds_bn5(x)
            x = self.ds_relu5(x)

            for i in range(4):
                residual = x
                out = getattr(self, f"residual5_{i}_conv1")(x)
                out = getattr(self, f"residual5_{i}_bn1")(out)
                out = getattr(self, f"residual5_{i}_relu1")(out)
                out = getattr(self, f"residual5_{i}_conv2")(out)
                out = getattr(self, f"residual5_{i}_bn2")(out)
                out = getattr(self, f"residual5_{i}_relu2")(out)
                x = out + residual
        else:
            # x = self.ds_conv5(x)
            x = self.execute_hyper_conv2d(x, self.ds_conv5_weight_list, hyper_output, stride=2, padding=1)
            x = self.ds_bn5(x)
            x = self.ds_relu5(x)

            for i in range(4):
                residual = x
                # out = getattr(self, f"residual5_{i}_conv1")(x)
                out = self.execute_hyper_conv2d(x, getattr(self, f"residual5_{i}_conv1_weight_list"), hyper_output, stride=1, padding=0)
                out = getattr(self, f"residual5_{i}_bn1")(out)
                out = getattr(self, f"residual5_{i}_relu1")(out)
                # out = getattr(self, f"residual5_{i}_conv2")(out)
                out = self.execute_hyper_conv2d(out, getattr(self, f"residual5_{i}_conv2_weight_list"), hyper_output, stride=1, padding=1)
                out = getattr(self, f"residual5_{i}_bn2")(out)
                out = getattr(self, f"residual5_{i}_relu2")(out)
                x = out + residual
        # Classifier head
        if self.scn_list[6] == 0:
            # Adjust output layer for training
            x = self.avgpool(x)
            x = torch.flatten(x, 1)  # Flatten the output of avgpool
            x = self.fc(x)  # Pass through the final fully connected layer
            # x = self.smax(x)  # softmax is handled inside PyTorch's loss function
        else:
            # Adjust output layer for training
            x = self.avgpool(x)
            x = torch.flatten(x, 1)  # Flatten the output of avgpool
            # x = self.fc(x)  # Pass through the final fully connected layer
            x = self.execute_hyper_linear(x, self.fc_weight_list, self.linear_bias_list, hyper_output)  # the SCN path for the classifier
            # x = self.smax(x)  # softmax is handled inside PyTorch's loss function

        return x
def darknet53(num_classes):
    scn_list = [1, 1, 0, 0, 0, 0, 0]  # do it later for making partial SCN
    model = DarkNet53_SCN2(scn_list, num_classes=num_classes)  # 100 classes for mini-ImageNet
    print(scn_list)
    print(model.scn_list[1])
    return model


# # Testing the model
# if __name__ == "__main__":
#     inputs = torch.rand((8, 3, 224, 224)).cuda()
#     model = darknet53(num_classes=100).cuda().train()
#     # outputs = model(inputs, )
#     # print(outputs.shape)
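
# A CPU smoke test (added sketch, not in the original gist): the DarkNet53_SCN2 forward pass
# takes both the image batch and the 2-D hyper input (cos, sin of an angle); this reuses
# transform_angle and random from the CSPDarkNet_SCN section above.
# if __name__ == "__main__":
#     _model = darknet53(num_classes=100).train()
#     _inputs = torch.rand((2, 3, 224, 224))
#     _hyper = transform_angle(random.uniform(0, 360))
#     _outputs = _model(_inputs, _hyper)
#     print(_outputs.shape)  # expected: torch.Size([2, 100])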