Skip to content

Instantly share code, notes, and snippets.

@l-bat
Last active August 1, 2020 07:25
Show Gist options
  • Save l-bat/bb5f79542a4886529832ac653f6de412 to your computer and use it in GitHub Desktop.
Save l-bat/bb5f79542a4886529832ac653f6de412 to your computer and use it in GitHub Desktop.
import numpy as np
import os
import onnx
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2
# Class for RPN
class RPN(nn.Module):
"Region Proposal Network"
def __init__(self):
super(RPN, self).__init__()
def forward(self, z_f, x_f):
raise NotImplementedError
class DepthwiseXCorr(nn.Module):
"Depthwise Correlation Layer"
def __init__(self, in_channels, hidden, out_channels, kernel_size=3, hidden_kernel_size=5):
super(DepthwiseXCorr, self).__init__()
self.conv_kernel = nn.Sequential(
nn.Conv2d(in_channels, hidden, kernel_size=kernel_size, bias=False),
nn.BatchNorm2d(hidden),
nn.ReLU(inplace=True),
)
self.conv_search = nn.Sequential(
nn.Conv2d(in_channels, hidden, kernel_size=kernel_size, bias=False),
nn.BatchNorm2d(hidden),
nn.ReLU(inplace=True),
)
self.head = nn.Sequential(
nn.Conv2d(hidden, hidden, kernel_size=1, bias=False),
nn.BatchNorm2d(hidden),
nn.ReLU(inplace=True),
nn.Conv2d(hidden, out_channels, kernel_size=1)
)
def forward(self, kernel, search):
kernel = self.conv_kernel(kernel)
search = self.conv_search(search)
feature = xcorr_depthwise(search, kernel)
out = self.head(feature)
return out
class DepthwiseRPN(RPN):
def __init__(self, anchor_num=5, in_channels=256, out_channels=256):
super(DepthwiseRPN, self).__init__()
self.cls = DepthwiseXCorr(in_channels, out_channels, 2 * anchor_num)
self.loc = DepthwiseXCorr(in_channels, out_channels, 4 * anchor_num)
def forward(self, z_f, x_f):
cls = self.cls(z_f, x_f)
loc = self.loc(z_f, x_f)
return cls, loc
class MultiRPN(RPN):
def __init__(self, anchor_num, in_channels, weighted=False):
super(MultiRPN, self).__init__()
for i in range(len(in_channels)):
self.add_module('rpn'+str(i+2),
DepthwiseRPN(anchor_num, in_channels[i], in_channels[i]))
def forward(self, z_fs, x_fs):
cls = []
loc = []
rpn2 = self.rpn2
z_f2 = z_fs[0]
x_f2 = x_fs[0]
c2,l2 = rpn2(z_f2, x_f2)
cls.append(c2)
loc.append(l2)
rpn3 = self.rpn3
z_f3 = z_fs[1]
x_f3 = x_fs[1]
c3,l3 = rpn3(z_f3, x_f3)
cls.append(c3)
loc.append(l3)
rpn4 = self.rpn4
z_f4 = z_fs[2]
x_f4 = x_fs[2]
c4,l4 = rpn4(z_f4, x_f4)
cls.append(c4)
loc.append(l4)
def avg(lst):
return sum(lst) / len(lst)
avg_cls = avg(cls)
avg_loc = avg(loc)
return avg_cls, avg_loc
# End of class for RPN
def xcorr_depthwise(x, kernel):
batch = kernel.size(0)
channel = kernel.size(1)
x = x.view(1, batch*channel, x.size(2), x.size(3))
kernel = kernel.view(batch*channel, 1, kernel.size(2), kernel.size(3))
conv = nn.Conv2d(batch*channel, batch*channel, kernel_size=(kernel.size(2), kernel.size(3)), bias=False, groups=batch*channel)
conv.weight = nn.Parameter(kernel)
out = conv(x)
# out = F.conv2d(x, kernel, groups=batch*channel)
out = out.view(batch, channel, out.size(2), out.size(3))
out = out.detach()
return out
rpn_head = MultiRPN(anchor_num=5,in_channels=[256, 256, 256],weighted=False)
rpn_head.eval()
rpn_head.state_dict().keys()
rpn_head_dict = rpn_head.state_dict()
load_path = "siamrpn_r50_l234_dwxcorr.pth"
pretrained_dict_head = torch.load(load_path, map_location=torch.device('cpu') )
pretrained_dict_head = {k: v for k, v in pretrained_dict_head.items() if k in rpn_head_dict}
rpn_head_dict.update(pretrained_dict_head)
rpn_head.load_state_dict(rpn_head_dict)
rpn_head.eval()
# np.save("zf_s", zf_s)
# np.save("xf_s", xf_s)
zf_s = np.load("zf_s.npy")
xf_s = np.load("xf_s.npy")
name = "rpn_head_1.onnx"
torch.onnx.export(rpn_head, (torch.Tensor(zf_s), torch.Tensor(xf_s)), name, export_params=True, opset_version=11,
input_names = ['input_1', 'input_2'], output_names = ['output_1', 'output_2'])
print("zf_s", np.min(zf_s), np.max(zf_s))
print("xf_s", np.min(xf_s), np.max(xf_s))
torch_cls, torch_loc = rpn_head(torch.Tensor(zf_s), torch.Tensor(xf_s))
torch_cls = torch_cls.detach().numpy()
torch_loc = torch_loc.detach().numpy()
np.save("torch_cls_1", torch_cls)
np.save("torch_loc_1", torch_loc)
# Outputs from Imported RPN
rpn_head = cv2.dnn.readNetFromONNX(name)
rpn_head.setInput(zf_s, 'input_1')
rpn_head.setInput(xf_s, 'input_2')
cls = rpn_head.forward('output_1')
loc = rpn_head.forward('output_2')
np.save("ocv_cls_1", cls)
np.save("ocv_loc_1", loc)
print(np.max(abs(torch_cls - cls)))
print(np.max(abs(torch_loc - loc)))
@l-bat
Copy link
Author

l-bat commented Aug 1, 2020

I would recommend using random input.

zf_s = np.random.rand(*zf_s.shape)
xf_s = np.random.rand(*xf_s.shape)

Run this script twice and save:
first run:
torch_cls_1, torch_loc_1, ocv_cls_1, ocv_loc_1 and rpn_head_1.onnx
second run:
torch_cls_2, torch_loc_2, ocv_cls_2, ocv_loc_2 and rpn_head_2.onnx

Compare torch_cls_1 with torch_cls_2 and torch_loc_1 with torch_loc_2.
Compare ocv_cls_1 with ocv_cls_2 and ocv_loc_1 with ocv_loc_2.

@jinyup100
Copy link

Okay I understand your suggestion. I will let you know how this goes in my PR thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment