@kirtyvedula · Created March 3, 2020
Code for Joint Coding and Modulation in AWGN channels
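"""
End-to-end learned (7,4) code over an AWGN channel: a fully connected
autoencoder maps 2^4 = 16 one-hot messages to 7 channel uses under an
energy constraint, trains at Eb/N0 = 3 dB, and is evaluated by block
error rate over a 0-11 dB Eb/N0 sweep.
"""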
from math import sqrt
import os

import numpy as np
import torch
import torch.nn as nn
import torch.utils.data as Data
from scipy.io import savemat
# Hyperparameters
k = 4                    # information bits per message
n_channel = 7            # channel uses per message
R = k / n_channel        # code rate
EbN0_dB_train = 3.0      # Eb/N0 used during training
class_num = 2 ** k       # 2^4 = 16 messages for the (7,4) code
epochs = 300             # number of training epochs
batch_size = 512
learning_rate = 0.01
# CUDA for PyTorch
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
class FullyConnectedAutoencoder(nn.Module):
    def __init__(self, k, n_channel, EbN0_dB):
        super(FullyConnectedAutoencoder, self).__init__()
        self.k = k
        self.n_channel = n_channel
        self.EbN0_dB = EbN0_dB
        self.transmitter = nn.Sequential(
            nn.Linear(in_features=2 ** k, out_features=2 ** k, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=2 ** k, out_features=n_channel, bias=True))
        self.receiver = nn.Sequential(
            nn.Linear(in_features=n_channel, out_features=2 ** k, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=2 ** k, out_features=2 ** k, bias=True))

    def forward(self, x):
        x = self.transmitter(x)
        # Energy constraint: scale each codeword to squared norm n_channel
        x = sqrt(self.n_channel) * (x / x.norm(dim=-1, keepdim=True))
        # AWGN channel at the training Eb/N0
        training_SNR = 10 ** (self.EbN0_dB / 10)  # dB -> linear
        R = self.k / self.n_channel
        noise = torch.randn(x.size(), device=device) / ((2 * R * training_SNR) ** 0.5)
        x = x + noise
        x = self.receiver(x)
        return x
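
# Sanity check (illustrative addition, not in the original gist): the
# normalization in forward() scales every codeword to squared norm n_channel,
# i.e. unit average power per channel use. Verify with an untrained instance:
with torch.no_grad():
    _demo = FullyConnectedAutoencoder(k, n_channel, EbN0_dB_train)
    _c = _demo.transmitter(torch.eye(2 ** k))
    _c = sqrt(n_channel) * (_c / _c.norm(dim=-1, keepdim=True))
    print(_c.pow(2).sum(dim=-1))  # all 16 entries are ~7.0
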
net = FullyConnectedAutoencoder(k, n_channel, EbN0_dB_train)
net = net.to(device)
# Training data: one-hot encodings of uniformly random messages
train_set_size = 10 ** 5
train_labels = (torch.rand(train_set_size) * class_num).long()
train_data = torch.eye(class_num).index_select(dim=0, index=train_labels)
traindataset = Data.TensorDataset(train_data, train_labels)
trainloader = Data.DataLoader(dataset=traindataset, batch_size=batch_size, shuffle=True, num_workers=0)
optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate, weight_decay=1e-5)
loss_func = nn.CrossEntropyLoss()  # expects class indices, not one-hot targets
loss_vec = []
# TRAINING
for epoch in range(epochs):
    for step, (x, y) in enumerate(trainloader):
        x = x.to(device)
        y = y.to(device).long().view(-1)

        # Forward pass
        output = net(x)
        loss = loss_func(output, y)  # cross-entropy loss

        # Backward and optimize
        optimizer.zero_grad()  # clear gradients for this training step
        loss.backward()        # backpropagation, compute gradients
        optimizer.step()       # apply gradients

        loss_vec.append(loss.item())
        pred_labels = torch.max(output, 1)[1].data.squeeze()
        # Use the actual batch size here: the last batch may be smaller
        accuracy = (pred_labels == y).float().mean().item()
        if step % (10 ** 4) == 0:
            print('Epoch: ', epoch, '| train loss: %.4f' % loss.item(), '| train acc: %.4f' % accuracy)
def d2b(d, n):
    """Convert each integer in d to its n-bit binary representation (LSB first)."""
    d = np.array(d)
    d = np.reshape(d, (1, -1))
    power = np.flipud(2 ** np.arange(n))
    g = np.zeros((np.shape(d)[1], n))
    for i, num in enumerate(d[0]):
        g[i] = num * np.ones((1, n))
    b = np.floor((g % (2 * power)) / power)
    return np.fliplr(b)
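
# Worked example (illustrative, not in the original gist): bits come out
# least-significant first, e.g. for two bits 2 -> [0, 1].
assert (d2b(np.arange(4), 2) == np.array([[0, 0], [1, 0], [0, 1], [1, 1]])).all()
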
# Exporting dictionaries: the learned codebook and the message-to-bits map
bit_dict = d2b(torch.arange(2 ** k), k)
input_dict = torch.eye(2 ** k).to(device)
with torch.no_grad():
    enc_output = net.transmitter(input_dict)
    # Apply the same energy constraint as in forward()
    enc_output = sqrt(n_channel) * (enc_output / enc_output.norm(dim=-1, keepdim=True))
S_encoded_syms = enc_output.cpu().numpy()
dict1 = {'S_encoded_syms': S_encoded_syms, 'bit_dict': bit_dict.astype(np.int8)}
savemat('ae_mfbank_AWGN_bpsk_energy_constraint.mat', dict1)
print('Generated dictionaries and encoded symbols')
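
# Round-trip check (illustrative addition; loads the file written above):
from scipy.io import loadmat
_mat = loadmat('ae_mfbank_AWGN_bpsk_energy_constraint.mat')
print(_mat['S_encoded_syms'].shape)  # (16, 7): one codeword per message
print(_mat['bit_dict'].shape)        # (16, 4): LSB-first bits per message
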
# %% TESTING
test_set_size = 10 ** 6
test_labels = (torch.rand(test_set_size) * class_num).long()
test_data = torch.eye(class_num).index_select(dim=0, index=test_labels)
testdataset = Data.TensorDataset(test_data, test_labels)
testloader = Data.DataLoader(dataset=testdataset, batch_size=test_set_size, shuffle=True, num_workers=0)

os.makedirs('models', exist_ok=True)  # ensure the checkpoint directory exists
torch.save(net, 'models/74AE.ckpt')   # Save model checkpoint
# %%
# Initialize outputs
EbNo_test = torch.arange(0, 11.5, 0.5)
test_BLER = torch.zeros((len(EbNo_test), 1))
# %%
net.eval()
for p in range(len(EbNo_test)):
    test_SNR = 10 ** (EbNo_test[p] / 10)  # dB -> linear
    R = k / n_channel
    test_noise = (torch.randn(test_set_size, n_channel) / ((2 * R * test_SNR) ** 0.5)).to(device)
    with torch.no_grad():
        for test_data, test_labels in testloader:
            test_data = test_data.to(device)
            test_labels = test_labels.to(device)
            encoded_signal = net.transmitter(test_data)
            # Apply the same energy constraint as in forward()
            encoded_signal = sqrt(n_channel) * (encoded_signal / encoded_signal.norm(dim=-1, keepdim=True))
            noisy_signal = encoded_signal + test_noise
            decoded_signal = net.receiver(noisy_signal)
            pred_labels = torch.max(decoded_signal, 1)[1].data.squeeze()
            test_BLER[p] = (pred_labels != test_labels).float().mean()
    print('Eb/N0:', EbNo_test[p].numpy(), '| test BLER: %.4f' % test_BLER[p].item())
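
# Optional visualization (illustrative addition; assumes matplotlib is
# installed): plot the measured BLER against Eb/N0 on a log scale.
import matplotlib.pyplot as plt
plt.semilogy(EbNo_test.numpy(), test_BLER.squeeze().numpy(), 'o-')
plt.xlabel('Eb/N0 (dB)')
plt.ylabel('Block Error Rate')
plt.grid(True, which='both')
plt.savefig('bler_vs_ebno.png')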