Hamming (7,4) on Impulsive Noise Channels
# -*- coding: utf-8 -*-
"""
Created on Fri Feb 28 08:38:39 2020
@author: kpvedula
"""
# Import libraries
import os
from datetime import datetime

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from scipy.io import savemat

from keras.layers import Input, Dense, Lambda, Add
from keras.models import Model
from keras import backend as K
from keras.callbacks import TensorBoard

from utils import CyclicLR
# Configure GPU
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
sess = tf.Session(config=tf.ConfigProto())
K.set_session(sess)
def create_one_hot_encoded_data(N, M):
    """Draw N random message labels in [0, M) and their one-hot encodings."""
    label = np.random.randint(M, size=N)
    data = []
    for i in label:
        temp = np.zeros(M)
        temp[i] = 1
        data.append(temp)
    data = np.array(data)
    return label, data
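# A vectorized equivalent (a sketch, not from the original gist): index rows of
# the identity matrix instead of looping.
# label = np.random.randint(M, size=N)
# data = np.eye(M)[label]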
def d2b(d, n):
    """Convert the integers in d to n-bit binary rows (least-significant bit first)."""
    d = np.array(d)
    d = np.reshape(d, (1, -1))
    power = np.flipud(2**np.arange(n))
    g = np.zeros((np.shape(d)[1], n))
    for i, num in enumerate(d[0]):
        g[i] = num * np.ones((1, n))
    b = np.floor((g % (2 * power)) / power)
    return np.fliplr(b)
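# Example (illustrative): d2b returns fixed-width binary rows, LSB first, e.g.
# d2b([5, 3], 4)  # -> array([[1., 0., 1., 0.],
#                 #           [1., 1., 0., 0.]])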
def bernoulli_gaussian(noise_std_1, noise_std_2, N, n_channel, p):
    """Bernoulli-Gaussian impulsive noise: each sample comes from the
    high-variance component with probability p, else from the nominal one."""
    x1 = noise_std_1 * np.random.randn(N, n_channel)
    x2 = noise_std_2 * np.random.randn(N, n_channel)
    q = np.random.rand(N, n_channel)
    mask_bad_channel = 1 * (q < p)
    mask_good_channel = 1 * (q >= p)
    noise = mask_good_channel * x1 + mask_bad_channel * x2
    return noise
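# Quick sanity check (illustrative, not in the original script; left commented
# out because the extra RNG draws would perturb the random stream):
# n = bernoulli_gaussian(1.0, 10.0, 10**5, 7, 0.3)
# n.var()  # ≈ (1 - 0.3)*1**2 + 0.3*10**2 = 30.7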
def keras_autoencoder_hamming(M, n_channel):
    """Build the end-to-end autoencoder plus standalone encoder and decoder models."""
    input_signal = Input(shape=(M,))
    input_noise = Input(shape=(n_channel,))
    encoded = Dense(M, activation='relu')(input_signal)
    encoded1 = Dense(n_channel, activation='linear')(encoded)
    # Energy constraint: scale each codeword to squared norm n_channel
    encoded2 = Lambda(lambda x: np.sqrt(n_channel) * K.l2_normalize(x, axis=1))(encoded1)
    encoded_noise = Add()([encoded2, input_noise])
    decoded = Dense(M, activation='relu')(encoded_noise)
    decoded1 = Dense(M, activation='softmax')(decoded)
    autoencoder = Model(inputs=[input_signal, input_noise], outputs=decoded1)
    autoencoder.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    autoencoder.summary()
    encoder = Model(input_signal, encoded2)
    # Reuse the two trained decoder layers on a fresh input for a standalone decoder
    encoded_input = Input(shape=(n_channel,))
    deco = autoencoder.layers[-2](encoded_input)
    deco = autoencoder.layers[-1](deco)
    decoder = Model(encoded_input, deco)
    return autoencoder, encoder, decoder
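# Illustration (not in the original gist) of the energy constraint enforced by
# the Lambda layer above: after scaling, every codeword has squared norm n_channel.
# x = np.random.randn(3, 7)
# x = np.sqrt(7) * x / np.linalg.norm(x, axis=1, keepdims=True)
# np.sum(x**2, axis=1)  # -> array([7., 7., 7.])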
# System parameters
modulation_scheme = 'bpsk'
timestamp = '_20200227_1117_'
n_channel = 7       # codeword length n
k = 4               # message bits per block
R = k / n_channel   # code rate 4/7
M = 2**k            # number of distinct messages
N_train = 10**5
N_val = 10**4
N_test = 10**5
# Nominal channel at 3 dB Eb/N0; impulsive channel at -7 dB
EbN0_dB_1 = 3.0
EbN0_dB_2 = -7.0
EbN0_1 = 10**(EbN0_dB_1 / 10)
noise_std_1 = 1 / np.sqrt(2 * R * EbN0_1)
EbN0_2 = 10**(EbN0_dB_2 / 10)
noise_std_2 = 1 / np.sqrt(2 * R * EbN0_2)
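# Worked numbers: EbN0_1 = 10**0.3 ≈ 1.995 gives noise_std_1 = 1/sqrt(2*(4/7)*1.995) ≈ 0.662,
# while EbN0_2 = 10**(-0.7) ≈ 0.200 gives noise_std_2 ≈ 2.094; the 10 dB gap means
# the impulsive component's standard deviation is sqrt(10) ≈ 3.16 times larger.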
train_label, train_data = create_one_hot_encoded_data(N_train, M)
val_label, val_data = create_one_hot_encoded_data(N_val, M)
# prob_string = ['0','0point1','0point2','0point3','0point4','0point5','0point6','0point7','0point8','0point9','1']
num_train_settings = 50
num_val_settings = 50
num_test_settings = 11
# p_vec_train = np.random.rand(num_train_settings)
p_vec_train = np.random.uniform(low=0.45, high=0.55, size=(num_train_settings,))
p_vec_val = np.random.uniform(low=0.45, high=0.55, size=(num_val_settings,))
# p_vec_test = np.array([0,0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,1])
train_noise = np.zeros((N_train,n_channel,len(p_vec_train)))
val_noise = np.zeros((N_val,n_channel,len(p_vec_val)))
# BLER storage is allocated below once p_vec_test is defined
# Generate impulsive noise with specified p for train, val and test data
# for i in range(len(p_vec_train)):
# train_noise[:,:,i] = bernoulli_gaussian(noise_std_1, noise_std_2, N_train, n_channel, p_vec_train[i])
# val_noise[:,:,i] = bernoulli_gaussian(noise_std_1, noise_std_2, N_val,n_channel, p_vec_train[i])
# Cyclic learning rate with a sinusoidal scale function (CyclicLR is from the
# local utils module)
clr_fn = lambda x: 0.5 * (1 + np.sin(x * np.pi / 2.))
clr = CyclicLR(base_lr=0.001, max_lr=0.01,
               step_size=2000., scale_fn=clr_fn,
               scale_mode='cycle')
# Setup autoencoder architecture
autoencoder, encoder, decoder = keras_autoencoder_hamming(M, n_channel)
logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=logdir, write_graph=True, write_images=False)
# Generate the first random noise setting and fit one epoch on it
train_noise[:, :, 0] = bernoulli_gaussian(noise_std_1, noise_std_2, N_train, n_channel, p_vec_train[0])
val_noise[:, :, 0] = bernoulli_gaussian(noise_std_1, noise_std_2, N_val, n_channel, p_vec_val[0])
autoencoder.fit([train_data, train_noise[:, :, 0]], train_data,
                epochs=1, batch_size=num_train_settings,
                validation_data=([val_data, val_noise[:, :, 0]], val_data), verbose=1)
autoencoder.save_weights('weights/weights_after_1_epochs.h5')
# Train the autoencoder - Keras: one epoch per noise setting, reloading the
# previous epoch's weights each time
for j in range(num_train_settings - 1):
    train_noise[:, :, j+1] = bernoulli_gaussian(noise_std_1, noise_std_2, N_train, n_channel, p_vec_train[j+1])
    val_noise[:, :, j+1] = bernoulli_gaussian(noise_std_1, noise_std_2, N_val, n_channel, p_vec_val[j+1])
    autoencoder.load_weights('weights/weights_after_' + str(j+1) + '_epochs.h5')
    autoencoder.fit([train_data, train_noise[:, :, j+1]], train_data, epochs=1, batch_size=num_train_settings,
                    validation_data=([val_data, val_noise[:, :, j+1]], val_data),
                    callbacks=[tensorboard_callback, clr], verbose=1)
    autoencoder.save_weights('weights/weights_after_' + str(j+2) + '_epochs.h5')
test_label, test_data = create_one_hot_encoded_data(N_test, M)
p_vec_test = np.array([0.4, 0.5, 0.6])
test_noise = np.zeros((N_test, n_channel, len(p_vec_test)))
bler = np.zeros(len(p_vec_test))
# Testing: sweep the impulse probability and measure the block error rate
for i in range(len(p_vec_test)):
    test_noise[:, :, i] = bernoulli_gaussian(noise_std_1, noise_std_2, N_test, n_channel, p_vec_test[i])
    encoded_signal = encoder.predict(test_data)
    noisy_signal = encoded_signal + test_noise[:, :, i]
    decoded_signal = decoder.predict(noisy_signal)
    decoded_output = np.argmax(decoded_signal, axis=1)
    no_errors = (decoded_output != test_label).astype(int).sum()
    bler[i] = no_errors / N_test
    print('Testing with p = ', p_vec_test[i], '| BLER =', bler[i])
K.clear_session()
# Saving
adict = {}
adict['ae_BLER'] = bler
savemat('ae_hamming_AWGN_bler_results_' + timestamp + modulation_scheme +
        '_EbNodB1_' + str(EbN0_dB_1) + '_EbNodB2_' + str(EbN0_dB_2) + '.mat', adict)
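# Optional (a sketch, not in the original gist): visualize BLER vs. impulse
# probability with the matplotlib import above; the figure filename is hypothetical.
plt.figure()
plt.semilogy(p_vec_test, bler, 'o-')
plt.xlabel('Impulse probability p')
plt.ylabel('Block error rate')
plt.grid(True, which='both')
plt.savefig('ae_hamming_bler_vs_p' + timestamp + '.png')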