Last active
September 14, 2021 01:40
-
-
Save iphysresearch/a00009c1eede565090dbd29b18ae982c to your computer and use it in GitHub Desktop.
Chapter 6 - MFCNN
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import os
import sys

import mxnet as mx
from loguru import logger
from mxnet import gluon, nd
from mxnet.gluon.nn import (
    Activation,
    Conv2D,
    Dense,
    Dropout,
    ELU,
    Flatten,
    LayerNorm,
    LeakyReLU,
    MaxPool2D,
)
# Logging setup (loguru).
# Reference: https://loguru.readthedocs.io/en/stable/api/logger.html
# Severity levels: DEBUG 10, INFO 20, WARNING 30, ERROR 40, CRITICAL 50

# Build the dated log-file path once so the sink and the debug message below
# can never disagree (e.g. if the module is imported right around midnight).
_log_file = "Logs/MFCNN_{}.log".format(datetime.date.today())

config = {
    "handlers": [
        # File sink: everything from DEBUG upwards, with a full timestamp.
        {
            "sink": _log_file,
            "level": "DEBUG",
            "format": '<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>',
        },
        # {"sink": "Solver_cnn.log",},
        # Console sink: INFO and above only.
        {
            "sink": sys.stdout,
            "format": '<green>{time:YYYY-MM-DD}</green> <cyan>{time:HH:mm:ss}</cyan> | <level>{level: <7}</level> | <level>{message}</level>',
            "level": "INFO",
        },
    ],
    # "extra": {"user": "someone"}
}

# NOTE(review): this message is emitted *before* logger.configure(), so it is
# handled by whatever handlers are active at import time, not by the two sinks
# defined above. The order is kept as in the original — confirm it is intended.
logger.debug("(Networks) {}".format(_log_file))
logger.configure(**config)
def MFCNN(fs, T, C, ctx, template_block, margin, learning_rate=0.003):
    """Build the MFCNN network together with its loss and trainer.

    The network is a ``gluon.nn.Sequential`` made of a
    ``MatchedFilteringLayer``, a ``CutHybridLayer``, two Conv2D/MaxPool2D
    stages, and a small dense head with two outputs.

    Parameters
    ----------
    fs, T :
        Their product ``fs * T`` is used as the length of the last input
        axis (and as ``mod`` of the matched-filtering layer).
        Presumably sampling rate and duration — verify against the caller.
    C :
        Unused by this function; kept for interface compatibility.
    ctx :
        A list/tuple of MXNet contexts, or a single context. The summary
        pass runs on the last context; the final initialization uses all.
    template_block :
        Array whose first and last slices along axis 1 are used as the H1
        and L1 templates respectively.
    margin :
        Forwarded to ``CutHybridLayer``.
    learning_rate : float
        Learning rate handed to the Adam trainer.

    Returns
    -------
    tuple
        ``(net, bloss, trainer)``: the initialized network, a
        ``SigmoidBinaryCrossEntropyLoss`` instance, and a ``gluon.Trainer``
        using Adam over all network parameters.
    """
    logger.success('Loading MFCNN network!')

    # Accept a single context as well as a list of contexts; the original
    # indexed ``ctx[-1]`` unconditionally, which fails on a bare Context.
    ctx_list = list(ctx) if isinstance(ctx, (list, tuple)) else [ctx]
    # Shapes must be integers; the matched-filtering layer applies the same
    # cast to ``mod`` internally.
    n_samples = int(fs * T)

    net = gluon.nn.Sequential()
    with net.name_scope():  # Used to disambiguate saving and loading net parameters
        net.add(MatchedFilteringLayer(mod=n_samples, fs=fs,
                                      template_H1=template_block[:, :1],
                                      template_L1=template_block[:, -1:]))
        net.add(CutHybridLayer(margin=margin))
        net.add(Conv2D(channels=16, kernel_size=(1, 3), activation='relu'))
        net.add(MaxPool2D(pool_size=(1, 4), strides=2))
        net.add(Conv2D(channels=32, kernel_size=(1, 3), activation='relu'))
        net.add(MaxPool2D(pool_size=(1, 4), strides=2))
        net.add(Flatten())
        net.add(Dense(32))
        net.add(Activation('relu'))
        net.add(Dense(2))

    # First initialize on a single context so that summary() can run one
    # forward pass with a random probe input ...
    net.initialize(mx.init.Xavier(magnitude=2.24), ctx=ctx_list[-1], force_reinit=True)
    net.summary(nd.random.randn(1, 2, 2, 1, n_samples, ctx=ctx_list[-1]))
    # ... then re-initialize the parameters of all layers on every context.
    net.initialize(mx.init.Xavier(magnitude=2.24), ctx=ctx_list, force_reinit=True)

    # Cross-entropy loss function.
    # loss = gloss.SoftmaxCrossEntropyLoss()
    # The cross-entropy loss for binary classification.
    bloss = gluon.loss.SigmoidBinaryCrossEntropyLoss()
    trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': learning_rate})
    return net, bloss, trainer
class MatchedFilteringLayer(gluon.HybridBlock):
    """Time-domain matched-filtering layer for two detectors (H1 and L1).

    The templates are stored as constant-initialized parameters. For each
    detector the layer whitens the data and the templates by convolution,
    correlates them, and returns the normalized matched-filter output.

    NOTE: the forward pass relies on NDArray-only features (``asscalar``,
    Python slicing, ``.shape``), so it does not work with ``mx.symbol`` /
    hybridization.

    Parameters
    ----------
    mod : int-like
        Length to which convolution outputs of the data are folded
        (cast with ``int``).
    fs : int-like
        Used to scale the matched-filter output and its normalization
        (cast with ``int``). Presumably the sampling rate — confirm.
    template_H1, template_L1 : NDArray
        Template banks for the two detectors; their shapes define the
        parameter shapes.
    differentiable : bool
        Forwarded to the parameter definition; ``False`` keeps the
        templates out of gradient computation.
    """

    def __init__(self, mod, fs,
                 template_H1,
                 template_L1,
                 differentiable=False):
        super(MatchedFilteringLayer, self).__init__()
        self.mod = int(mod)
        self.fs = int(fs)
        with self.name_scope():
            self.template_H1 = self.params.get(
                'template_H1',
                shape=template_H1.shape,
                # Convert to regular list to make this object serializable
                init=mx.init.Constant(template_H1.asnumpy().tolist()),
                differentiable=differentiable)
            self.template_L1 = self.params.get(
                'template_L1',
                shape=template_L1.shape,
                # Convert to regular list to make this object serializable
                init=mx.init.Constant(template_L1.asnumpy().tolist()),
                differentiable=differentiable)
        # Number of templates and template length, read from the H1 bank.
        self.num_filter_template = self.template_H1.shape[0]
        self.kernel_size = self.template_H1.shape[-1]

    def get_module(self, F, data, mod):
        """Fold the last axis of ``data`` modulo ``mod`` and reverse it.

        The last axis is zero-padded up to the next multiple of ``mod``
        (a full extra block of zeros is appended when it already is a
        multiple), split into blocks of length ``mod`` which are summed,
        a singleton axis is inserted at position 2, and the last axis is
        reversed.
        """
        ctx = data.context
        pad_len = mod - data.shape[-1] % mod
        padding = F.zeros(data.shape[:-1] + (pad_len,), ctx=ctx)
        padded = F.concatenate([data, padding], axis=len(data.shape) - 1)
        folded = padded.reshape(0, 0, -1, mod).sum(axis=-2).expand_dims(2)
        return folded[:, :, :, ::-1]
        # Original author's note: "something wrong here for pad??" — the
        # F.pad-based alternative below was left disabled.
        # data = F.reshape(F.pad(data, mode="constant", constant_value=0, pad_width=(0,0, 0,0, 0,0, 0,1)), shape=(0,0,-1,mod))
        # return F.reverse(F.expand_dims(F.sum(data, axis=-2), 2), axis=3)

    def hybrid_forward(self, F, data, template_H1, template_L1):
        """Run matched filtering for both detectors and stack the results.

        ``data`` is split in two along axis 2 (one half per detector);
        per the original author's note its shape is
        (nsample, 2, C, 1, T*fs). The two per-detector outputs are stacked
        along a new leading axis.
        """
        data_H1, data_L1 = F.split(data=data, axis=2, num_outputs=2)
        data_H1 = data_H1[:, :, 0]  # (nsample, 2, 1, T*fs)
        data_L1 = data_L1[:, :, 0]
        MF_H1 = self.onedetector_forward(F, data_H1, template_H1)
        MF_L1 = self.onedetector_forward(F, data_L1, template_L1)
        # each: (nsample, num_filter_template, 1, T*fs)
        # Use F (not nd) for consistency with every other op in this block;
        # in imperative mode F is mx.nd, so the result is the same.
        return F.concat(MF_H1.expand_dims(0), MF_L1.expand_dims(0), dim=0)

    def onedetector_forward(self, F, data, template):
        """Compute the normalized matched-filter output for one detector.

        Note: Not working for hybrid blocks/mx.symbol!

        ``data`` is split along axis 1 into the data itself and a second
        array that is used as the convolution kernel for whitening.
        Shape annotations below are the original author's, written for a
        batch of 8 samples.
        """
        # (8, 1, 1, T*fs), (8, 1, 1, T*fs) <= (8, 2, 1, T*fs)
        data_block_nd, ts_block_nd = F.split(data=data, axis=1, num_outputs=2)
        # asscalar() yields a numpy scalar; make it a plain int for range()
        # and for the num_filter arguments below.
        batch_size = int(F.slice_axis(F.shape_array(ts_block_nd),
                                      axis=0, begin=0, end=1).asscalar())  # 8

        # Whiten data ===========================================================
        # Every sample is convolved with its own kernel, hence the loop.
        data_whiten = F.concatenate(
            [F.Convolution(data=data_block_nd[i:i+1],    # (8, 1, 1, T*fs)
                           weight=ts_block_nd[i:i+1],    # (8, 1, 1, T*fs)
                           no_bias=True,
                           kernel=(1, self.mod),
                           stride=(1, 1),
                           num_filter=1,
                           pad=(0, self.mod - 1))
             for i in range(batch_size)],
            axis=0)
        data_whiten = self.get_module(F, data_whiten, self.mod)  # (8, 1, 1, T*fs)

        # Whiten template =======================================================
        template_whiten = F.Convolution(data=template,       # (8, 1, 1, T*fs)
                                        weight=ts_block_nd,  # (8, 1, 1, T*fs)
                                        no_bias=True,
                                        kernel=(1, self.mod),
                                        stride=(1, 1),
                                        num_filter=batch_size,
                                        pad=(0, self.mod - 1))
        template_whiten = self.get_module(F, template_whiten, self.kernel_size)
        # template_whiten (8, 8, 1, T*fs)

        # == Calculate the matched filter output in the time domain: ============
        optimal = F.concatenate(
            [F.Convolution(data=data_whiten[i:i+1],           # (8, 8, 1, T*fs)
                           weight=template_whiten[:, i:i+1],  # (8, 8, 1, T*fs)
                           no_bias=True,
                           kernel=(1, self.kernel_size),
                           stride=(1, 1),
                           num_filter=self.num_filter_template,
                           pad=(0, self.kernel_size - 1))
             for i in range(batch_size)],
            axis=0)
        optimal = self.get_module(F, optimal, self.mod)
        optimal_time = F.abs(optimal * 2 / self.fs)
        # optimal_time (8, 8, 1, T*fs)

        # == Normalize the matched filter output: ===============================
        # The swapped view is loop-invariant, so build it once.
        template_swapped = template_whiten.swapaxes(0, 1)
        sigmasq_parts = []
        for j in range(batch_size):
            for i in range(self.num_filter_template):
                # Each whitened template is convolved with itself.
                one_template = template_swapped[j:j+1, i:i+1]  # (8, 8, 1, T*fs)
                sigmasq_parts.append(
                    F.Convolution(data=one_template,
                                  weight=one_template,
                                  no_bias=True,
                                  kernel=(1, self.kernel_size),
                                  stride=(1, 1),
                                  num_filter=1,
                                  pad=(0, self.kernel_size - 1)))
        sigmasq = F.concatenate(sigmasq_parts, axis=0)
        # Keep only the first element of the folded output per
        # (sample, template) pair.
        sigmasq = self.get_module(F, sigmasq, self.kernel_size)[:, :, :, 0].reshape(optimal_time.shape[:2])
        sigma = F.sqrt(F.abs(sigmasq / self.fs)).expand_dims(2).expand_dims(2)
        # sigma (8, 8, 1, 1)
        return F.broadcast_div(optimal_time, sigma)  # (8, 8, 1, T*fs) SNR_MF
class CutHybridLayer(gluon.HybridBlock):
    """Reduce the last axis by its maximum and reorder axes for the CNN.

    Parameters
    ----------
    margin :
        Used only to derive ``around_range = (1 - 2 * margin) / 2``; the
        forward pass does not read this attribute.
    """

    def __init__(self, margin):
        super(CutHybridLayer, self).__init__()
        self.around_range = (1 - margin * 2) / 2

    def hybrid_forward(self, F, x):
        """Take the max over the last axis, then swap axes 0/1 and 2/3.

        Per the original author's note the input is
        (C, nsample, num_filter_template, 1, T*fs); under that assumption
        the output is (nsample, C, 1, num_filter_template).
        """
        # (C, nsample, num_filter_template, 1, T*fs)
        return F.max(x, axis=-1).swapaxes(1, 0).swapaxes(3, 2)
# Script entry point: only prints a notice (the message reads "running as
# the main program"); importing the module does nothing here.
if __name__ == '__main__':
    print('作为主程序运行')
How do I generate the template block and use these layers with my own simulated data? Will it work with simulated data that is mixed with noise?
The template block depends on your target waveform distribution, which you can simulate for your own use case. More details are in https://doi.org/10.1103/physrevd.101.104003
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How do I generate the template block and use these layers with my own simulated data? Will it work with simulated data that is mixed with noise?