Chapter 6 - MFCNN
import mxnet as mx
from mxnet import nd, gluon
from mxnet.gluon.nn import Dense, ELU, LeakyReLU, LayerNorm, Conv2D, MaxPool2D, Flatten, Activation, Dropout
import os, sys, datetime
from loguru import logger
#### REF #### https://loguru.readthedocs.io/en/stable/api/logger.html
# DEBUG 10 # INFO 20 # WARNING 30 # ERROR 40 # CRITICAL 50
config = {
    "handlers": [
        {"sink": "Logs/MFCNN_{}.log".format(datetime.date.today()),
         "level": "DEBUG",
         "format": '<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level}</level> | <level>{message}</level>'},
        # {"sink": "Solver_cnn.log",},
        {"sink": sys.stdout,
         "level": "INFO",
         "format": '<green>{time:YYYY-MM-DD}</green> <cyan>{time:HH:mm:ss}</cyan> | <level>{level: <7}</level> | <level>{message}</level>'},
    ],
    # "extra": {"user": "someone"}
}
logger.configure(**config)  # Configure first so the message below reaches the file sink
logger.debug("(Networks) Logs/MFCNN_{}.log".format(datetime.date.today()))
def MFCNN(fs, T, C, ctx, template_block, margin, learning_rate=0.003):
    logger.success('Loading MFCNN network!')
    net = gluon.nn.Sequential()
    with net.name_scope():  # Used to disambiguate saving and loading net parameters
        net.add(MatchedFilteringLayer(mod=fs * T, fs=fs,
                                      template_H1=template_block[:, :1],   # .as_in_context(ctx)
                                      template_L1=template_block[:, -1:]   # .as_in_context(ctx)
                                      ))
        net.add(CutHybridLayer(margin=margin))
        net.add(Conv2D(channels=16, kernel_size=(1, 3), activation='relu'))
        net.add(MaxPool2D(pool_size=(1, 4), strides=2))
        net.add(Conv2D(channels=32, kernel_size=(1, 3), activation='relu'))
        net.add(MaxPool2D(pool_size=(1, 4), strides=2))
        net.add(Flatten())
        net.add(Dense(32))
        net.add(Activation('relu'))
        net.add(Dense(2))

    # Initialize on a single context first so that net.summary() can run,
    # then re-initialize across all contexts for training.
    net.initialize(mx.init.Xavier(magnitude=2.24), ctx=ctx[-1], force_reinit=True)
    net.summary(nd.random.randn(1, 2, 2, 1, fs * T, ctx=ctx[-1]))
    net.initialize(mx.init.Xavier(magnitude=2.24), ctx=ctx, force_reinit=True)

    # Cross-entropy loss function
    # loss = gloss.SoftmaxCrossEntropyLoss()
    # The cross-entropy loss for binary classification.
    bloss = gluon.loss.SigmoidBinaryCrossEntropyLoss()
    trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': learning_rate})
    return net, bloss, trainer
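
# --- Usage sketch (illustrative, not part of the original gist) ---------------
# All values below are placeholders; the shape of `template_block` is inferred
# from the slicing in MatchedFilteringLayer: (num_templates, 2 detectors, 1, fs*T).
#
# ctx = [mx.gpu(0)]
# fs, T, C, margin = 4096, 1, 1, 0.25
# template_block = nd.random.randn(8, 2, 1, fs * T, ctx=ctx[-1])
# net, bloss, trainer = MFCNN(fs, T, C, ctx, template_block, margin)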
class MatchedFilteringLayer(gluon.HybridBlock):
    def __init__(self, mod, fs,
                 template_H1,
                 template_L1,
                 differentiable=False):
        super(MatchedFilteringLayer, self).__init__()
        self.mod = int(mod)
        self.fs = int(fs)

        with self.name_scope():
            self.template_H1 = self.params.get('template_H1',
                                               shape=template_H1.shape,
                                               # Convert to a regular list to make this object serializable
                                               init=mx.init.Constant(template_H1.asnumpy().tolist()),
                                               differentiable=differentiable)
            self.template_L1 = self.params.get('template_L1',
                                               shape=template_L1.shape,
                                               # Convert to a regular list to make this object serializable
                                               init=mx.init.Constant(template_L1.asnumpy().tolist()),
                                               differentiable=differentiable)
        self.num_filter_template = self.template_H1.shape[0]
        self.kernel_size = self.template_H1.shape[-1]

    ## Global fs/ctx
    def get_module(self, F, data, mod):
        # Fold the 'full' convolution output back onto a window of length `mod`:
        # zero-pad the last axis up to a multiple of `mod`, sum the folded
        # segments, and reverse the time axis.
        ctx = data.context
        pad = F.zeros(data.shape[:-1] + (mod - data.shape[-1] % mod,), ctx=ctx)
        data = F.concatenate([data, pad], axis=len(data.shape) - 1)
        return data.reshape(0, 0, -1, mod).sum(axis=-2).expand_dims(2)[:, :, :, ::-1]
        # Something wrong here for pad??
        # data = F.reshape(F.pad(data, mode="constant", constant_value=0, pad_width=(0,0, 0,0, 0,0, 0,1)), shape=(0,0,-1,mod))
        # return F.reverse(F.expand_dims(F.sum(data, axis=-2), 2), axis=3)
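
    # Tiny worked example of the folding above (my illustration, not from the
    # original gist): with mod=4 and a length-6 signal [a, b, c, d, e, f],
    # zero-padding to length 8 and reshaping gives rows [a, b, c, d] and
    # [e, f, 0, 0]; summing yields [a+e, b+f, c, d], which is then reversed
    # to [d, c, b+f, a+e].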
    def hybrid_forward(self, F, data, template_H1, template_L1):
        # data: (nsample, 2, C, 1, T*fs) nd.array on GPU
        data_H1, data_L1 = F.split(data=data, axis=2, num_outputs=2)
        data_H1 = data_H1[:, :, 0]  # (nsample, 2, 1, T*fs)
        data_L1 = data_L1[:, :, 0]

        MF_H1 = self.onedetector_forward(F, data_H1, template_H1)
        MF_L1 = self.onedetector_forward(F, data_L1, template_L1)
        # Each output: (nsample, num_filter_template, 1, T*fs).
        # Use F.concat (not nd.concat) so the block stays hybridizable.
        return F.concat(MF_H1.expand_dims(0), MF_L1.expand_dims(0), dim=0)
    def onedetector_forward(self, F, data, template):
        # Note: not working for hybrid blocks / mx.symbol (uses concrete shapes and .asscalar()).
        # (8, 1, 1, T*fs), (8, 1, 1, T*fs) <= (8, 2, 1, T*fs)
        data_block_nd, ts_block_nd = F.split(data=data, axis=1, num_outputs=2)
        # assert F.shape_array(data).size_array().asscalar() == 4  # (8, 1, 1, T*fs)
        # assert F.shape_array(self.weight).size_array().asscalar() == 4
        batch_size = F.slice_axis(F.shape_array(ts_block_nd), axis=0, begin=0, end=1).asscalar()  # 8

        # == Whiten the data ====================================================
        data_whiten = F.concatenate([F.Convolution(data=data_block_nd[i:i + 1],  # (8, 1, 1, T*fs)
                                                   weight=ts_block_nd[i:i + 1],  # (8, 1, 1, T*fs)
                                                   no_bias=True,
                                                   kernel=(1, self.mod),
                                                   stride=(1, 1),
                                                   num_filter=1,
                                                   pad=(0, self.mod - 1))
                                     for i in range(batch_size)],
                                    axis=0)
        data_whiten = self.get_module(F, data_whiten, self.mod)  # (8, 1, 1, T*fs)

        # == Whiten the template ================================================
        template_whiten = F.Convolution(data=template,       # (8, 1, 1, T*fs)
                                        weight=ts_block_nd,  # (8, 1, 1, T*fs)
                                        no_bias=True,
                                        kernel=(1, self.mod),
                                        stride=(1, 1),
                                        num_filter=batch_size,
                                        pad=(0, self.mod - 1))
        template_whiten = self.get_module(F, template_whiten, self.kernel_size)
        # template_whiten: (8, 8, 1, T*fs)

        # == Matched-filter output in the time domain ===========================
        optimal = F.concatenate([F.Convolution(data=data_whiten[i:i + 1],           # (8, 8, 1, T*fs)
                                               weight=template_whiten[:, i:i + 1],  # (8, 8, 1, T*fs)
                                               no_bias=True,
                                               kernel=(1, self.kernel_size),
                                               stride=(1, 1),
                                               num_filter=self.num_filter_template,
                                               pad=(0, self.kernel_size - 1))
                                 for i in range(batch_size)],
                                axis=0)
        optimal = self.get_module(F, optimal, self.mod)
        optimal_time = F.abs(optimal * 2 / self.fs)
        # optimal_time: (8, 8, 1, T*fs)

        # == Normalize the matched-filter output ================================
        sigmasq = F.concatenate([F.Convolution(data=template_whiten.swapaxes(0, 1)[j:j + 1, i:i + 1],
                                               weight=template_whiten.swapaxes(0, 1)[j:j + 1, i:i + 1],
                                               no_bias=True,
                                               kernel=(1, self.kernel_size),
                                               stride=(1, 1),
                                               num_filter=1,
                                               pad=(0, self.kernel_size - 1))
                                 for j in range(batch_size) for i in range(self.num_filter_template)],
                                axis=0)
        sigmasq = self.get_module(F, sigmasq, self.kernel_size)[:, :, :, 0].reshape(optimal_time.shape[:2])
        sigma = F.sqrt(F.abs(sigmasq / self.fs)).expand_dims(2).expand_dims(2)
        # sigma: (8, 8, 1, 1)
        return F.broadcast_div(optimal_time, sigma)  # (8, 8, 1, T*fs), the matched-filter SNR
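
# A gloss on the math above (my reading, not a comment from the original gist):
# up to normalization conventions, the layer computes the discrete time-domain
# matched-filter signal-to-noise ratio
#     rho(t) = | (2/fs) * sum_k d_w[k] h_w[k + t] | / sigma,
#     sigma  = sqrt( sum_k h_w[k]^2 / fs ),
# where d_w is the whitened data and h_w the whitened template, so the output
# is a per-template SNR time series.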
class CutHybridLayer(gluon.HybridBlock):
    def __init__(self, margin):
        super(CutHybridLayer, self).__init__()
        self.around_range = (1 - margin * 2) / 2  # kept for reference; not used in hybrid_forward

    def hybrid_forward(self, F, x):
        # x: (C, nsample, num_filter_template, 1, T*fs)
        return F.max(x, axis=-1).swapaxes(1, 0).swapaxes(3, 2)
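
# Shape walkthrough (illustrative, inferred from the in-code comments):
#   x: (2, N, 8, 1, T*fs) -> F.max(..., axis=-1): (2, N, 8, 1)
#      -> swapaxes(1, 0): (N, 2, 8, 1) -> swapaxes(3, 2): (N, 2, 1, 8)
# i.e. the per-template peak SNR, arranged as a (1 x num_templates) "image"
# with the two detectors as channels, ready for the Conv2D stack in MFCNN.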
if __name__ == '__main__':
    print('Running as the main program')
else:
    pass
@jaideep11061982 commented:

How do we generate the template block and use these layers with our own simulated data? Will it work with simulated data that is mixed with some noise?

@iphysresearch (Author) replied:

> How do we generate the template block and use these layers with our own simulated data? Will it work with simulated data that is mixed with some noise?

For the template block, it depends on your target waveform distribution, which you can simulate for your own use case. More details in https://doi.org/10.1103/physrevd.101.104003
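
As a rough sketch of what that could look like (not from this gist; PyCBC's get_td_waveform is one option, and the masses, approximant, sampling settings, and normalization below are placeholders), one could simulate time-domain waveforms and stack them into the (num_templates, 2, 1, fs*T) block that MatchedFilteringLayer expects:

import numpy as np
import mxnet as mx
from pycbc.waveform import get_td_waveform

fs, T = 4096, 1                           # placeholder sampling rate / duration
masses = [(36, 29), (30, 25), (20, 15)]   # hypothetical template grid

waveforms = []
for m1, m2 in masses:
    hp, _ = get_td_waveform(approximant='SEOBNRv4', mass1=m1, mass2=m2,
                            delta_t=1.0 / fs, f_lower=20)
    h = hp.numpy()[-fs * T:]              # crop to at most fs*T samples
    h = np.pad(h, (fs * T - len(h), 0))   # left-pad with zeros to fs*T
    waveforms.append(h / np.abs(h).max()) # crude placeholder normalization

arr = np.stack(waveforms)[:, None, None, :]                        # (num_templates, 1, 1, fs*T)
template_block = mx.nd.array(np.concatenate([arr, arr], axis=1))   # H1/L1 copies -> (num_templates, 2, 1, fs*T)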
