Skip to content

Instantly share code, notes, and snippets.

@nbecker
Created November 10, 2023 13:37
Show Gist options
  • Save nbecker/e45512cc2cdce42e7b67a77e8bd1dfcd to your computer and use it in GitHub Desktop.
Save nbecker/e45512cc2cdce42e7b67a77e8bd1dfcd to your computer and use it in GitHub Desktop.
test1
#!/usr/bin/env python
# coding: utf-8
import os
# Append the CUDA 12.2 toolchain directory to PATH. PATH may be unset in a
# stripped-down environment, so read it defensively (no KeyError) and avoid
# producing an empty PATH entry.
os.environ["PATH"] = os.pathsep.join(
    entry for entry in (os.environ.get("PATH", ""), '/usr/local/cuda-12.2/bin') if entry)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' #supposed to suppress messages, but doesn't work
#import os
# use xla-lite (doesn't work for centroid for now)
#os.environ['TF_XLA_FLAGS']='--tf_xla_auto_jit=fusible'
def parse_args(args):
    """Parse the command line and derive a log file name from it.

    Args:
        args: argv-style list; ``args[0]`` is the program name and
            ``args[1:]`` are the options handed to argparse.

    Returns:
        ``(opt, logname)``: ``opt`` is the argparse namespace, with the four
        Es/N0 bounds filled in from per-modulation defaults when not given;
        ``logname`` is ``args[0] + '.' + <md5 hex of all args> + '.log'``.
    """
    import argparse
    import hashlib
    # Imported locally so the function does not depend on a module-level
    # ``import sys`` that only appears further down the file.
    import sys
    from argparse_bool import ConfigureAction

    parser = argparse.ArgumentParser()
    parser.add_argument('--ibodB', type=float)
    parser.add_argument('--obodB', type=float, default=-1)
    parser.add_argument('--alpha', type=float, default=0.1)
    parser.add_argument('--sps', type=int, default=4)
    parser.add_argument('--si', type=int, default=32)
    parser.add_argument('--rate', type=float, default=0.66666)
    parser.add_argument('--init-const-rate', default='2/3')
    parser.add_argument('--limit', choices=['linear', 'sspa'], default='sspa')
    parser.add_argument('--train-esno-min', type=float)
    parser.add_argument('--train-esno-max', type=float)
    parser.add_argument('--esno-min', type=float)
    parser.add_argument('--esno-max', type=float)
    parser.add_argument('--esno-step', type=float, default=0.125)
    parser.add_argument('--width', type=int, default=3)
    parser.add_argument('--n-layers', type=int, default=4)
    parser.add_argument('--N', type=int, default=256)
    parser.add_argument('--kind', choices=['cnn', 'dense'], default='cnn')
    parser.add_argument('--load')
    parser.add_argument('--min-bler', type=float)
    parser.add_argument('--action', choices=['run-baseline', 'run-centroids', 'train-ml', 'run-ml', 'run-opt-centroids', 'dump-const', 'plot-const', 'dump-orig-const', 'run-pred', 'run-opt-pred', 'run-cancel', 'run-pred-centroids', 'run-opt-pred-centroids', 'plot-spec', 'plot-spec-pred', 'train-ml-pred', 'train-centroids', 'run-centroid-opt-centroids', 'run-opt-baseline', 'run-ml-pred', 'train-and-run-ml'])
    parser.add_argument('--mod', choices=['32apsk', '64apsk'], default='32apsk')
    parser.add_argument('--pdf', action=ConfigureAction)
    parser.add_argument('--normalize', action=ConfigureAction, default=True)
    parser.add_argument('--initial-beta', type=float, default=0.2)
    parser.add_argument('--final-beta', type=float, default=0.05)
    parser.add_argument('--pred-iter', type=int, default=20)
    parser.add_argument('--rcv-iter', type=int, default=4)
    parser.add_argument('--max-iter', type=int, default=10000)
    parser.add_argument('--max-block-errors', type=int, default=1000)
    parser.add_argument('--tensorboard', action=ConfigureAction)
    opt = parser.parse_args(args[1:])

    # Per-modulation default (min, max) Es/N0 in dB; evaluation and training
    # ranges share the same defaults.
    esno_defaults = {'32apsk': (13.0, 13.5), '64apsk': (18.0, 18.5)}
    default_min, default_max = esno_defaults[opt.mod]
    for attr, default in (('esno_min', default_min),
                          ('esno_max', default_max),
                          ('train_esno_min', default_min),
                          ('train_esno_max', default_max)):
        if getattr(opt, attr) is None:
            setattr(opt, attr, default)

    # The log name is keyed on a digest of the full argument list (program
    # name included), fed to the hash one argument at a time.
    m = hashlib.md5()
    for c in args:
        m.update(c.encode(sys.getdefaultencoding()))
    logname = args[0] + '.' + m.hexdigest() + '.log'
    print('logname:', logname)
    return opt, logname
from sionna.utils import QAMSource
from sionna.signal import Upsampling, Downsampling, RootRaisedCosineFilter, empirical_psd, empirical_aclr
# Configure the notebook to use only a single GPU and allocate only as much memory as needed
# For more details, see https://www.tensorflow.org/guide/gpu
import tensorflow as tf
import tensorflow_probability as tfp
# Only report TensorFlow log records at ERROR level or above.
tf.get_logger().setLevel('ERROR')
gpus = tf.config.list_physical_devices('GPU')
print('Number of GPUs available :', len(gpus))
if gpus:
    gpu_num = 0 # Index of the GPU to use
    try:
        # Expose a single GPU to TensorFlow and let its memory grow on demand.
        tf.config.set_visible_devices(gpus[gpu_num], 'GPU')
        print('Only GPU number', gpu_num, 'used.')
        tf.config.experimental.set_memory_growth(gpus[gpu_num], True)
    except RuntimeError as e:
        # Best effort: report the configuration failure and carry on.
        print(e)
# In[2]:
#get_ipython().run_line_magic('matplotlib', 'inline')
import numpy as np
import pickle
from tensorflow.keras import Model
from tensorflow.keras.layers import Layer, Dense, Conv1D
import sys
import sionna
import traceback
import contextlib
# Some helper code to demonstrate the kinds of errors you might encounter.
@contextlib.contextmanager
def assert_raises(error_class):
    """Context manager asserting that its body raises ``error_class``.

    * If the body raises ``error_class`` (or a subclass), the exception is
      swallowed and a short traceback is printed.
    * Any other exception propagates unchanged.
    * If the body raises nothing, an ``Exception`` is raised.
    """
    try:
        yield
    except error_class:
        print('Caught expected exception \n {}:'.format(error_class))
        traceback.print_exc(limit=2)
    else:
        raise Exception('Expected {} to be raised but no error was raised!'.format(
            error_class))
from sionna.channel import AWGN
from sionna.utils import BinarySource, ebnodb2no, log10, expand_to_rank, insert_dims
from sionna.fec.ldpc.encoding import LDPC5GEncoder
from sionna.fec.ldpc.decoding import LDPC5GDecoder
from sionna.mapping import Mapper, Demapper, Constellation
from sionna.utils import sim_ber
def power (u):
    """Return the mean squared magnitude of ``u`` as a real tensor.

    Computed as ``tf.norm(u)**2 / size(u)`` over all elements of ``u``; the
    element count is cast to ``u.dtype`` and the real part of the quotient is
    returned.
    """
    return tf.math.real (tf.norm (u)**2 / tf.cast (tf.size (u), u.dtype))
def normalize (u, start=0, end=-1):
    """Scale ``u`` so that the slice ``u[start:end]`` has unit mean power.

    The whole of ``u`` is divided by the square root of ``power(u[start:end])``.

    NOTE(review): with the default ``end=-1`` the last entry along the first
    axis is left out of the power estimate -- confirm this is intended.
    """
    return u / tf.cast (tf.sqrt (power (u[start:end])), u.dtype)
def average_mutual_information (xbits, llr):
    """Return ``1 - mean(log2(1 + exp((1 - 2*xbits) * llr)))``.

    ``xbits`` is cast to float32 before use; the mean is taken over all
    elements.
    """
    signs = 1.0 - 2.0 * tf.cast (xbits, tf.float32)
    return 1.0 - tf.reduce_mean (tf.experimental.numpy.log2 (1 + tf.exp (signs * llr)))
# ## Simulation Parameters <a class="anchor" id="Simulation-Parameters"></a>
# In[3]:
###############################################
# SNR range for evaluation and training [dB]
###############################################
# if opt.limit == 'sspa':
# esno_db_min = 12.0 # obo=-0.5
# esno_db_max = 13.0
# else:
# esno_db_min = 15.5
# esno_db_max = 16.0 #9.5 if we use lower esno will we converge to 8+8 instead of qam?
# ## Neural Demapper <a class="anchor" id="Neural-Demapper"></a>
# The neural network-based demapper shown in the figure above is made of three dense layers with ReLU activation.
#
# The input of the demapper consists of a received sample $y \in \mathbb{C}$ and the noise power spectral density $N_0$ in log-10 scale to handle different orders of magnitude for the SNR.
#
# As the neural network can only process real-valued inputs, these values are fed as a 3-dimensional vector
#
# $$\left[ \mathcal{R}(y), \mathcal{I}(y), \log_{10}(N_0) \right]$$
#
# where $\mathcal{R}(y)$ and $\mathcal{I}(y)$ refer to the real and imaginary component of $y$, respectively.
#
# The output of the neural network-based demapper consists of LLRs on the `num_bits_per_symbol` bits mapped to a constellation point. Therefore, the last layer consists of ``num_bits_per_symbol`` units.
#
# **Note**: The neural network-based demapper processes the received samples $y$ forming a block individually. The [neural receiver notebook](https://nvlabs.github.io/sionna/examples/Neural_Receiver.html) provides an example of a more advanced neural network-based receiver that jointly processes a resource grid of received symbols.
# In[4]:
class NeuralDemapper(Layer):
    """Neural demapper: a stack of hidden layers followed by a linear output layer.

    Args:
        N: number of units (dense) or filters (cnn) in each hidden layer.
        width: kernel size of the ``Conv1D`` layers (only used for ``kind='cnn'``).
        n_layers: total number of layers; ``n_layers - 1`` hidden ReLU layers are
            created, plus the output layer.
        kind: ``'dense'`` or ``'cnn'``.

    Relies on the module-level ``num_bits_per_symbol`` and
    ``num_symbols_per_codeword`` being set before construction / call.

    Raises:
        ValueError: if ``kind`` is neither ``'dense'`` nor ``'cnn'``.
    """

    def __init__(self, N=128, width=3, n_layers=3, kind='cnn'):
        super().__init__()
        if kind == 'dense':
            self.layers = [Dense(N, 'relu') for _ in range (n_layers-1)]
        elif kind == 'cnn':
            self.layers = [Conv1D(N, width, activation='relu', padding='same') for _ in range (n_layers-1)]
        else:
            # Previously an unknown kind silently left ``self.layers`` unset and
            # only failed later inside ``call``.
            raise ValueError("kind must be 'dense' or 'cnn', got {!r}".format(kind))
        self._dense_3 = Dense(num_bits_per_symbol, None) # The feature correspond to the LLRs for every bits carried by a symbol

    def call(self, inputs):
        """Map ``[y, no]`` (received samples, noise variance) to per-bit LLRs."""
        y,no = inputs
        # Using log10 scale helps with the performance
        no_db = log10(no)
        # Stacking the real and imaginary components of the complex received samples
        # and the noise variance
        no_db = tf.tile(no_db, [1, num_symbols_per_codeword]) # [batch size, num_symbols_per_codeword]
        z = tf.stack([tf.math.real(y),
                      tf.math.imag(y),
                      no_db], axis=2) # [batch size, num_symbols_per_codeword, 3]
        for layer in self.layers:
            z = layer (z)
        llr = self._dense_3(z) # [batch size, num_symbols_per_codeword, num_bits_per_symbol]
        return llr
# ## Trainable End-to-end System: Conventional Training <a class="anchor" id="Trainable-End-to-end-System:-Conventional-Training"></a>
# The following cell defines an end-to-end communication system that transmits bits modulated using a trainable constellation over an AWGN channel.
#
# The receiver uses the previously defined neural network-based demapper to compute LLRs on the transmitted (coded) bits.
#
# As in [1], the constellation and neural network-based demapper are jointly trained through SGD and backpropagation using the binary cross entropy (BCE) as loss function.
#
# Training on the BCE is known to be equivalent to maximizing an achievable information rate [2].
#
# The following model can be instantiated either for training (`training = True`) or evaluation (`training = False`).
#
# In the former case, the BCE is returned and no outer code is used to reduce computational complexity and as it does not impact the training of the constellation or demapper.
#
# When setting `training` to `False`, an LDPC outer code from 5G NR is applied.
# In[5]:
from read_sspa2 import read_sspa, interp
class sspa:
    """Callable amplifier model built from the data returned by ``read_sspa``.

    Only the third (``obo_spline``) and last (``phase_spline``) values returned
    by ``read_sspa(return_extra=True)`` are used; they are combined by
    ``interp`` into the callable applied in ``__call__``.
    """

    def __init__ (self):
        # The remaining return values are not needed here, so they are not
        # bound to names (one of them used to shadow the class name).
        _, _, obo_spline, *_, phase_spline = read_sspa (return_extra=True)
        self.I = interp (obo_spline, phase_spline)

    def __call__ (self, x):
        """Apply the interpolated amplifier characteristic to ``x``."""
        return self.I (x)
def setup(opt, logname):
    """Derive the simulation configuration from ``opt`` and publish it as module globals.

    Every local defined here (filters, constellation, code parameters, limiter,
    index tables, and ``opt`` / ``logname`` themselves) is copied into the
    module namespace by the final ``globals().update(locals())``; the rest of
    the file reads those names as globals. Do not rename locals in this
    function without updating their users.

    Raises:
        ValueError: if ``opt.mod`` or ``opt.limit`` has an unsupported value.
    """
    ###############################################
    # Modulation and coding configuration
    ###############################################
    beta = opt.alpha
    span_in_symbols = opt.si # Filter span in symbols
    samples_per_symbol = opt.sps # Number of samples per symbol, i.e., the oversampling factor
    rrcf = RootRaisedCosineFilter(span_in_symbols, samples_per_symbol, beta, normalize=False)
    mf = RootRaisedCosineFilter(span_in_symbols, samples_per_symbol, beta, normalize=False)
    # The matched filter uses the same taps scaled down by the oversampling factor
    mf._coefficients = mf._coefficients / samples_per_symbol
    if opt.mod == '32apsk':
        from const32apsk import gen_constellation_32apsk
        const = (gen_constellation_32apsk (opt.init_const_rate))
        bits_per_sym = num_bits_per_symbol = 5
    elif opt.mod == '64apsk':
        from const64apsk import gen_constellation_64apsk
        const = gen_constellation_64apsk (opt.init_const_rate)
        bits_per_sym = num_bits_per_symbol = 6
    else:
        # Previously fell through to a NameError on num_bits_per_symbol.
        raise ValueError("unsupported modulation: {!r}".format(opt.mod))
    modulation_order = 2**num_bits_per_symbol
    coderate = opt.rate
    n = N = 8400 # Codeword length [bit]. Must be a multiple of num_bits_per_symbol
    num_symbols_per_codeword = n//num_bits_per_symbol # Number of modulated baseband symbols per codeword
    k = K = int(n*coderate) # Number of information bits per codeword
    ###############################################
    # Training configuration
    ###############################################
    num_training_iterations_conventional = 5000 # Number of training iterations for conventional training
    # Number of training iterations with RL-based training for the alternating training phase and fine-tuning of the receiver phase
    num_training_iterations_rl_alt = 7000
    num_training_iterations_rl_finetuning = 3000
    training_batch_size = tf.constant(128, tf.int32) # Training batch size
    rl_perturbation_var = 0.01 # Variance of the perturbation used for RL-based training of the transmitter
    model_weights_path_conventional_training = logname + ',conventional.weights'#"awgn_autoencoder_weights_conventional_training" # Filename to save the autoencoder weights once conventional training is done
    model_weights_path_rl_training = logname + ',rl.weights'#"awgn_autoencoder_weights_rl_training" # Filename to save the autoencoder weights once RL-based training is done
    ###############################################
    # Evaluation configuration
    ###############################################
    results_filename = logname.replace('.log', '.results')
    if opt.limit == 'linear':
        def limiter (x):
            return x
        limiter.name = 'linear'
    elif opt.limit == 'sspa':
        limiter = sspa ()
        limiter.name = 'sspa(j3)'
    else:
        # Previously fell through to a NameError on limiter.
        raise ValueError("unsupported limiter: {!r}".format(opt.limit))
    ind1, ind0 = gen_ind (bits_per_sym)
    llr_mask = gen_llr_mask (bits_per_sym)
    M = 1<<bits_per_sym
    globals().update(locals()) # gross hack
#tf.config.run_functions_eagerly(True)
def gen_ind (bits):
    """Return, per bit position, the symbol indices whose bit is 1 and whose bit is 0.

    Symbols are the integers ``0 .. 2**bits - 1``; bit ``i`` of a symbol is
    ``(symbol >> i) & 1``.

    Returns:
        ``(ind1, ind0)``: two arrays with one row per bit position. Row ``i``
        of ``ind1`` lists (ascending) the symbols with bit ``i`` set; row ``i``
        of ``ind0`` lists the symbols with bit ``i`` clear.
    """
    size = 1<<bits
    all_syms = np.arange (size)
    all_bits = np.arange (bits)
    # bit_mask[s, i] is True when bit i of symbol s is set
    bit_mask = ((all_syms[:,np.newaxis] >> all_bits) & 1).astype (bool)
    ind1 = np.array ([np.nonzero (bit_mask[...,i])[0] for i in range (bits)])
    ind0 = np.array ([np.nonzero (~bit_mask[...,i])[0] for i in range (bits)])
    return ind1, ind0
def gen_llr_mask (bits):
    """Return the ``(2**bits, bits)`` table of symbol bits.

    Entry ``[s, i]`` is ``(s >> i) & 1``, i.e. bit ``i`` (LSB first) of
    symbol index ``s``.
    """
    size = 1<<bits
    all_syms = np.arange (size)
    all_bits = np.arange (bits)
    bit_mask = (all_syms[:,np.newaxis] >> all_bits) & 1
    return bit_mask
#from xtensor_math import mag_sqr
def mag_sqr (u):
    """Return the squared magnitude ``real(u)**2 + imag(u)**2``, element-wise."""
    return np.real(u)**2 + np.imag(u)**2
import traceback
import warnings
import sys
def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
    """``warnings.showwarning`` replacement that prints the call stack first.

    The current stack and then the formatted warning are written to ``file``
    when it has a ``write`` attribute, otherwise to ``sys.stderr``.
    """
    log = file if hasattr(file,'write') else sys.stderr
    traceback.print_stack(file=log)
    log.write(warnings.formatwarning(message, category, filename, lineno, line))
# Install the hook so every displayed warning shows where it was triggered.
warnings.showwarning = warn_with_traceback
# from tensorflow.python.ops.numpy_ops import np_config
# np_config.enable_numpy_behavior()
class demapper ():
    """Bit-LLR computation from per-symbol log-likelihoods via ``tf.gather_nd``.

    Relies on the module-level ``training_batch_size``, ``ind0``, ``ind1`` and
    ``N``. The gather indices are built once for a batch of
    ``training_batch_size`` rows, so ``__call__`` presumably expects input of
    exactly that batch size -- TODO confirm.
    """

    def __init__ (self, M, n_symbols):
        # all_indices[b, i, j] == [b, i, j]. np.indices builds the same table
        # as the former triple-nested list comprehension without creating
        # millions of small Python lists.
        batch = int (training_batch_size.numpy())
        all_indices = np.moveaxis (np.indices ((batch, n_symbols, M)), 0, -1)
        self.indices_0 = all_indices[:,:,ind0]
        self.indices_1 = all_indices[:,:,ind1]

    #@tf.function(jit_compile=True)
    def __call__ (self, ll):
        """Return the negated, flattened LLRs for the log-likelihoods ``ll``."""
        #inputs[batches, N]
        # Unfortunately this direct addressing doesn't work in graph mode
        # ld0 = ll[:,:,ind0]
        # ld1 = ll[:,:,ind1]
        # Instead we have to enumerate all the indices and use gather_nd
        ld0 = tf.gather_nd (ll, self.indices_0)
        ld1 = tf.gather_nd (ll, self.indices_1)
        llr = tf.math.reduce_logsumexp (ld0, axis=-1) - tf.math.reduce_logsumexp (ld1, axis=-1)
        # llr has shape [batch, n_symbols, bits_per_sym]
        # The bit axis is reversed before flattening to [batch, N]
        llr_flat = tf.reshape (llr[:,:,::-1], (llr.shape[0],N))
        return -llr_flat # wants llrs inverted
#dem = demapper (M, num_symbols_per_codeword)
def measure_obo (constellation, ibodB, xconst=None):
    """Given ibodB (negative), return the limiter output power in dB.

    Args:
        constellation: constellation used to map 128 random codewords when
            ``xconst`` is not supplied.
        ibodB: input back-off in dB; converted to an amplitude scale
            ``10 ** (0.05 * ibodB)`` applied before the limiter.
        xconst: optional ready-made symbol tensor used instead of random data.

    Returns:
        ``10*log10`` of the power of the limiter output, measured over a window
        that starts half a filter span in and is one codeword of samples long.

    Uses the module-level ``n``, ``samples_per_symbol``, ``rrcf``, ``limiter``,
    ``opt`` and ``num_symbols_per_codeword``.
    """
    ibo = 10 ** (0.05 * ibodB)
    batch_size = 128
    if xconst is None:
        binary_source = BinarySource()
        mapper = Mapper(constellation=constellation)
        c = binary_source([batch_size, n])
        x = mapper(c) # x [batch size, num_symbols_per_codeword]
    else:
        x = xconst
    us = Upsampling(samples_per_symbol)
    x_us = us(x)
    xfilt_out = rrcf(x_us)
    limit_out = limiter (xfilt_out * tf.cast (ibo, xfilt_out.dtype))
    # mf_out = mf (limit_out)
    # ds = Downsampling(samples_per_symbol, rrcf.length-1)
    # x_hat = ds(mf_out)[:,:x.shape[1]]
    window_start = opt.sps*opt.si//2
    window_stop = window_start + num_symbols_per_codeword*opt.sps
    p = power (limit_out[:,window_start:window_stop])
    return 10*log10(p)
def find_ibodB (constellation, obodB, xconst=None): # obodB specified as negative amount
    """Find the input back-off (dB) whose measured output power equals ``obodB``.

    Solves ``measure_obo(constellation, ibodB, xconst) - obodB == 0`` for
    ``ibodB`` with Chandrupatla's method on the bracket ``[-20, +6]`` dB and
    returns the estimated root.
    """
    #from scipy.optimize import root_scalar
    #sol = root_scalar (lambda ibodB: measure_obo (constellation, ibodB, xconst) - obodB, method='brentq', bracket=[-20, +6])
    #return sol.root
    res = tfp.math.find_root_chandrupatla(
        lambda ibodB: measure_obo (constellation, ibodB, xconst) - obodB,
        low=-20,
        high=+6,)
    return res.estimated_root
# def set_ibo (constellation, verbose=True):
# 'find ibo(dB) for opt.obodB'
# if opt.obodB is not None:
# ibo = find_ibo (constellation, opt.obodB)
# if verbose:
# print ('ibo(dB):', ibo)
# return ibo
class E2ESystemConventionalTraining(Model):
    """End-to-end link: mapper -> RRC pulse shaping -> limiter -> AWGN -> matched filter -> demapper.

    Args:
        training: when true, no outer code is used and ``call`` returns the BCE
            loss; otherwise an LDPC encoder/decoder pair is added and ``call``
            returns ``(b, b_hat)``.
        constellation: initial constellation points; ``None`` selects a
            trainable QAM constellation.
        demapper_type: ``'neural'`` or ``'app'``; ignored when ``demapper`` is given.
        N, n_layers, width, kind: forwarded to ``NeuralDemapper``.
        train_mapper: whether a supplied constellation is trainable.
        demapper: ready-made demapper to reuse instead of building one.
        z: optional sequence used to rewrite the encoder's output interleaver
            (only applied when not training, since it needs the encoder).
        pred: when true, the symbols are passed through ``pred`` before pulse
            shaping and the input back-off is re-solved on every call.

    Relies on module-level configuration published by ``setup`` (``k``, ``n``,
    ``num_bits_per_symbol``, ``opt``, ``limiter``, ``rrcf``, ``mf``, ...), and
    stores the solved input back-off on the shared ``limiter`` object.

    Raises:
        ValueError: if no ``demapper`` is given and ``demapper_type`` is unknown.
    """

    def __init__(self, training, constellation=None, demapper_type='neural', N=128, n_layers=3, train_mapper=True, demapper=None, z=None, width=3, kind='cnn', pred=False):
        super().__init__()
        self._training = training
        self.pred = pred
        ################
        ## Transmitter
        ################
        self._binary_source = BinarySource()
        # To reduce the computational complexity of training, the outer code is not used when training,
        # as it is not required
        if not self._training:
            self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol)
            if z is not None:
                # Replace the encoder's output interleaver: bit position i of every
                # symbol is fed from the z[i]-th contiguous chunk of the codeword.
                symlength = n // num_bits_per_symbol
                encout = np.arange (n)
                in_bits = np.empty (n, dtype=np.int64)
                for i in range (num_bits_per_symbol):
                    in_bits[i::num_bits_per_symbol] = encout[z[i]*symlength:(z[i]+1)*symlength]
                self._encoder._out_int = in_bits
                self._encoder._out_int_inv = np.argsort (in_bits)
        # Trainable constellation
        if constellation is None:
            constellation = Constellation("qam", num_bits_per_symbol, trainable=True)
        else:
            constellation = Constellation("custom", num_bits_per_symbol, initial_value=constellation, trainable=train_mapper)
        self.constellation = constellation
        self._mapper = Mapper(constellation=constellation)
        ################
        ## Channel
        ################
        self._channel = AWGN()
        ################
        ## Receiver
        ################
        # We use the previously defined neural network for demapping
        if demapper is not None:
            self._demapper = demapper
        elif demapper_type == 'neural':
            self._demapper = NeuralDemapper(N=N, n_layers=n_layers, width=width, kind=kind)
        elif demapper_type == 'app':
            self._demapper = Demapper (demapping_method='app', num_bits_per_symbol=num_bits_per_symbol, constellation=constellation)
        else:
            # Previously an unknown type left ``_demapper`` unset and only
            # failed later inside ``call``.
            raise ValueError("demapper_type must be 'neural' or 'app', got {!r}".format(demapper_type))
        # To reduce the computational complexity of training, the outer code is not used when training,
        # as it is not required
        if not self._training:
            self._decoder = LDPC5GDecoder(self._encoder, hard_out=True)
        #################
        # Loss function
        #################
        if self._training:
            self._bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)
        # The input back-off lives on the shared module-level limiter object
        if opt.limit == 'sspa':
            limiter.ibo = tf.cast (10**(0.05 * find_ibodB (self._mapper.constellation, opt.obodB)), tf.complex64)
        elif opt.limit == 'linear':
            limiter.ibo = tf.constant (1.0, tf.complex64)

    @tf.function()#jit_compile=True)
    def call(self, batch_size, esno_db=None, ebno_db=None, return_limit_out=False):
        """Run one batch through the link.

        Returns the limiter output when ``return_limit_out`` is true, the BCE
        loss when training, and ``(b, b_hat)`` otherwise.
        """
        # kludge: sim_ber will call with ebno_db, but it's really esno we're using
        if esno_db is None:
            esno_db = ebno_db
        # If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers
        if len(esno_db.shape) == 0:
            esno_db = tf.fill([batch_size], esno_db)
        no = 10**(-0.1 * esno_db)
        no = expand_to_rank(no, 2)
        no = tf.cast (no, tf.float32)
        ################
        ## Transmitter
        ################
        # Outer coding is only performed if not training
        if self._training:
            c = self._binary_source([batch_size, n])
        else:
            b = self._binary_source([batch_size, k])
            c = self._encoder(b)
        # Modulation
        x = self._mapper(c) # x [batch size, num_symbols_per_codeword]
        if self.pred:
            x_pred, mse = pred (x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize)
        else:
            x_pred = x
        us = Upsampling(samples_per_symbol)
        # Upsample the QAM symbol sequence
        x_us = us(x_pred)
        xfilt_out = rrcf(x_us)
        ################
        ## Channel
        ################
        if self.pred:
            # Find a new ibo with the predistorted signal to give the proper obo
            ibo = tf.cast (10**(0.05 * find_ibodB (None, opt.obodB, x_pred)), tf.complex64)
        else:
            ibo = limiter.ibo
        limit_out = limiter (xfilt_out * ibo)
        if return_limit_out:
            return limit_out
        # Noise variance is scaled by the oversampling factor
        y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword]
        mf_out = mf (y)
        # Instantiate a downsampling layer
        num_symbols = x.shape[1]
        #ds = Downsampling(samples_per_symbol, span_in_symbols-1)
        ds = Downsampling(samples_per_symbol, rrcf.length-1, num_symbols)
        # Recover the transmitted symbol sequence
        x_hat = ds(mf_out)[:,:x.shape[1]]
        self.x_hat = x_hat
        #print ('mses:', np.array([tf.math.real(power (a - b)).numpy() for a,b in zip (x, x_hat)]))
        # Divide out the complex gain of the link, estimated by correlating the
        # received symbols with the transmitted ones (normalised by symbol power)
        xconst_sqnorm = power (x)
        signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)/tf.cast(xconst_sqnorm, tf.complex64)
        y = x_hat / signal
        ################
        ## Receiver
        ################
        llr = self._demapper([y, no])
        llr = tf.reshape(llr, [batch_size, n])
        # If training, outer decoding is not performed and the BCE is returned
        if self._training:
            loss = self._bce(c, llr)
            return loss
        else:
            # Outer decoding
            b_hat = self._decoder(llr)
            #print ('bce:', tf.keras.losses.BinaryCrossentropy(from_logits=True)(c, llr), 'ami:', average_mutual_information (c, llr))
            return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation
# A simple training loop is defined in the next cell, which performs `num_training_iterations_conventional` training iterations of SGD. Training is done over a range of SNR, by randomly sampling a batch of SNR values at each iteration.
# **Note:** For an introduction to the implementation of differentiable communication systems and their optimization through SGD and backpropagation with Sionna, please refer to [the Part 2 of the Sionna tutorial for Beginners](https://nvlabs.github.io/sionna/examples/Sionna_tutorial_part2.html).
# In[6]:
from datetime import datetime
# Summaries are written under a fresh directory named after the start time.
logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
# Make this writer the default so that bare tf.summary.* calls (such as the
# 'BCE' scalar logged by conventional_training) are recorded by it.
file_writer.set_as_default()
def conventional_training(model):
    """Train ``model`` with Adam for ``num_training_iterations_conventional`` steps.

    Each step samples a batch of Es/N0 values uniformly from
    ``[opt.train_esno_min, opt.train_esno_max)``, evaluates the model loss and
    applies the gradients. A gradient containing any non-finite value is
    replaced by zeros so a single bad step cannot corrupt the weights. The loss
    is logged as the ``'BCE'`` summary scalar every step; every 100 steps the
    input back-off is re-solved (sspa limiter only) and progress is printed.
    """
    # Optimizer used to apply gradients
    optimizer = tf.keras.optimizers.Adam()
    for i in range(num_training_iterations_conventional):
        # Sampling a batch of SNRs
        esno_db = tf.random.uniform(shape=[training_batch_size], minval=opt.train_esno_min, maxval=opt.train_esno_max)
        # Forward pass
        with tf.GradientTape() as tape:
            loss = model(training_batch_size, esno_db) # Logged and printed below as the BCE
        # Computing and applying gradients
        weights = model.trainable_weights
        grads = tape.gradient(loss, weights)
        sanitized_grads = []
        for grad in grads:
            # A None gradient (weight not connected to the loss) is passed
            # through untouched instead of crashing the finiteness check.
            if grad is not None and not bool (tf.math.reduce_all (tf.math.is_finite (grad))):
                grad = tf.zeros_like (grad)
            sanitized_grads.append (grad)
        optimizer.apply_gradients(zip(sanitized_grads, weights))
        tf.summary.scalar('BCE', data=loss.numpy(), step=i)
        # Printing periodically the progress
        if i % 100 == 0:
            if opt.limit == 'sspa':
                # The constellation has moved, so re-solve the input back-off
                # that gives the requested output back-off
                limiter.ibo = tf.cast (10**(0.05 * find_ibodB (model._mapper.constellation, opt.obodB)), tf.complex64)
            print('Iteration {}/{} BCE: {:.4f}'.format(i, num_training_iterations_conventional, loss.numpy()), 'ibo(dB):', 20*log10(limiter.ibo).numpy(), 'obo:', measure_obo (model._mapper.constellation, 20*np.log10(limiter.ibo.numpy())).numpy(), 'pwr:', power (model.constellation.points.numpy()).numpy(), end='\r')
# The next cell defines a utility function for saving the weights using [pickle](https://docs.python.org/3/library/pickle.html).
# In[7]:
def save_weights(model, model_weights_path):
    """Pickle ``model.get_weights()`` to ``model_weights_path``.

    The file is written in binary mode with the highest pickle protocol
    available (what the former literal ``-1`` selected).
    """
    weights = model.get_weights()
    with open(model_weights_path, 'wb') as f:
        pickle.dump(weights, f, pickle.HIGHEST_PROTOCOL)
# In the next cell, an instance of the model defined previously is instantiated and trained.
# In[8]:
# Fix the seed for reproducible trainings
tf.random.set_seed(1)
# Instantiate and train the end-to-end system
#model_32apsk_notrain = E2ESystemConventionalTraining(training=True, constellation=const, N=128, n_layers=3, train_mapper=False)
def run_ml_training(model):
    """Train ``model`` conventionally, save its weights and return the weights path.

    The weights go to the module-level
    ``model_weights_path_conventional_training``.
    """
    conventional_training(model)
    # Save weights
    weights_path = model_weights_path_conventional_training
    save_weights(model, weights_path)
    return weights_path
def train_2_times():
    """Two-stage training: demapper only first, then mapper and demapper jointly.

    Stage 1 trains a model whose constellation is frozen
    (``train_mapper=False``). Stage 2 builds a model with a trainable
    constellation that reuses the stage-1 demapper, trains it, and returns it.

    The stage-1 model is taken from the module-level ``model_32apsk_notrain``
    when one exists. Otherwise it is built here with the arguments of the
    commented-out module-level construction, which previously left this
    function failing with a NameError.
    """
    frozen_mapper_model = globals().get ('model_32apsk_notrain')
    if frozen_mapper_model is None:
        frozen_mapper_model = E2ESystemConventionalTraining(training=True, constellation=const, N=128, n_layers=3, train_mapper=False)
    conventional_training (frozen_mapper_model)
    model_32apsk_train = E2ESystemConventionalTraining(training=True, constellation=const, N=128, n_layers=3, train_mapper=True, demapper=frozen_mapper_model._demapper)
    conventional_training (model_32apsk_train)
    return model_32apsk_train
# ## Trainable End-to-end System: RL-based Training <a class="anchor" id="Trainable-End-to-end-System:-RL-based-Training"></a>
# The following cell defines the same end-to-end system as before, but stops the gradients after the channel to simulate a non-differentiable channel.
#
# To jointly train the transmitter and receiver over a non-differentiable channel, we follow [3], whose key idea is to alternate between:
#
# - Training of the receiver on the BCE using conventional backpropagation and SGD.
# - Training of the transmitter by applying (known) perturbations to the transmitter output to enable estimation of the gradient of the transmitter weights with respect to an approximation of the loss function.
#
# When `training` is set to `True`, both losses for training the receiver and the transmitter are returned.
# In[9]:
# class E2ESystemRLTraining(Model):
# def __init__(self, training):
# super().__init__()
# self._training = training
# ################
# ## Transmitter
# ################
# self._binary_source = BinarySource()
# # To reduce the computational complexity of training, the outer code is not used when training,
# # as it is not required
# if not self._training:
# self._encoder = LDPC5GEncoder(k, n)
# # Trainable constellation
# constellation = Constellation("qam", num_bits_per_symbol, trainable=True)
# self.constellation = constellation
# self._mapper = Mapper(constellation=constellation)
# ################
# ## Channel
# ################
# self._channel = AWGN()
# ################
# ## Receiver
# ################
# # We use the previously defined neural network for demapping
# self._demapper = NeuralDemapper()
# # To reduce the computational complexity of training, the outer code is not used when training,
# # as it is not required
# if not self._training:
# self._decoder = LDPC5GDecoder(self._encoder, hard_out=True)
# @tf.function(jit_compile=True)
# def call(self, batch_size, ebno_db, perturbation_variance=tf.constant(0.0, tf.float32)):
# # If `ebno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers
# if len(ebno_db.shape) == 0:
# ebno_db = tf.fill([batch_size], ebno_db)
# no = ebnodb2no(ebno_db, num_bits_per_symbol, coderate)
# no = expand_to_rank(no, 2)
# ################
# ## Transmitter
# ################
# # Outer coding is only performed if not training
# if self._training:
# c = self._binary_source([batch_size, n])
# else:
# b = self._binary_source([batch_size, k])
# c = self._encoder(b)
# # Modulation
# x = self._mapper(c) # x [batch size, num_symbols_per_codeword]
# # Adding perturbation
# # If ``perturbation_variance`` is 0, then the added perturbation is null
# epsilon_r = tf.random.normal(tf.shape(x))*tf.sqrt(0.5*perturbation_variance)
# epsilon_i = tf.random.normal(tf.shape(x))*tf.sqrt(0.5*perturbation_variance)
# epsilon = tf.complex(epsilon_r, epsilon_i) # [batch size, num_symbols_per_codeword]
# x_p = x + epsilon # [batch size, num_symbols_per_codeword]
# ################
# ## Channel
# ################
# y = self._channel([x_p, no]) # [batch size, num_symbols_per_codeword]
# y = tf.stop_gradient(y) # Stop gradient here
# ################
# ## Receiver
# ################
# llr = self._demapper([y, no])
# # If training, outer decoding is not performed
# if self._training:
# # Average BCE for each baseband symbol and each batch example
# c = tf.reshape(c, [-1, num_symbols_per_codeword, num_bits_per_symbol])
# bce = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(c, llr), axis=2) # Avergare over the bits mapped to a same baseband symbol
# # The RX loss is the usual average BCE
# rx_loss = tf.reduce_mean(bce)
# # From the TX side, the BCE is seen as a feedback from the RX through which backpropagation is not possible
# bce = tf.stop_gradient(bce) # [batch size, num_symbols_per_codeword]
# x_p = tf.stop_gradient(x_p)
# p = x_p-x # [batch size, num_symbols_per_codeword] Gradient is backpropagated through `x`
# tx_loss = tf.square(tf.math.real(p)) + tf.square(tf.math.imag(p)) # [batch size, num_symbols_per_codeword]
# tx_loss = -bce*tx_loss/rl_perturbation_var # [batch size, num_symbols_per_codeword]
# tx_loss = tf.reduce_mean(tx_loss)
# return tx_loss, rx_loss
# else:
# llr = tf.reshape(llr, [-1, n]) # Reshape as expected by the outer decoder
# b_hat = self._decoder(llr)
# return b,b_hat
# The next cell implements the training algorithm from [3], which alternates between conventional training of the neural network-based receiver, and RL-based training of the transmitter.
# In[19]:
# def rl_based_training(model):
# # Optimizers used to apply gradients
# optimizer_tx = tf.keras.optimizers.Adam() # For training the transmitter
# optimizer_rx = tf.keras.optimizers.Adam() # For training the receiver
# # Function that implements one transmitter training iteration using RL.
# def train_tx():
# # Sampling a batch of SNRs
# ebno_db = tf.random.uniform(shape=[training_batch_size], minval=ebno_db_min, maxval=ebno_db_max)
# # Forward pass
# with tf.GradientTape() as tape:
# # Keep only the TX loss
# tx_loss, _ = model(training_batch_size, ebno_db,
# tf.constant(rl_perturbation_var, tf.float32)) # Perturbation are added to enable RL exploration
# ## Computing and applying gradients
# weights = model.trainable_weights
# grads = tape.gradient(tx_loss, weights)
# optimizer_tx.apply_gradients(zip(grads, weights))
# # Function that implements one receiver training iteration
# def train_rx():
# # Sampling a batch of SNRs
# ebno_db = tf.random.uniform(shape=[training_batch_size], minval=ebno_db_min, maxval=ebno_db_max)
# # Forward pass
# with tf.GradientTape() as tape:
# # Keep only the RX loss
# _, rx_loss = model(training_batch_size, ebno_db) # No perturbation is added
# ## Computing and applying gradients
# weights = model.trainable_weights
# grads = tape.gradient(rx_loss, weights)
# optimizer_rx.apply_gradients(zip(grads, weights))
# # The RX loss is returned to print the progress
# return rx_loss
# # Training loop.
# for i in range(num_training_iterations_rl_alt):
# # 10 steps of receiver training are performed to keep it ahead of the transmitter
# # as it is used for computing the losses when training the transmitter
# for _ in range(10):
# rx_loss = train_rx()
# # One step of transmitter training
# train_tx()
# # Printing periodically the progress
# if i % 100 == 0:
# print('Iteration {}/{} BCE {:.4f}'.format(i, num_training_iterations_rl_alt, rx_loss.numpy()), end='\r')
# print() # Line break
# # Once alternating training is done, the receiver is fine-tuned.
# print('Receiver fine-tuning... ')
# for i in range(num_training_iterations_rl_finetuning):
# rx_loss = train_rx()
# if i % 100 == 0:
# print('Iteration {}/{} BCE {:.4f}'.format(i, num_training_iterations_rl_finetuning, rx_loss.numpy()), end='\r')
# In the next cell, an instance of the model defined previously is instantiated and trained.
# In[20]:
# # Fix the seed for reproducible trainings
# tf.random.set_seed(1)
# # Instantiate and train the end-to-end system
# model = E2ESystemRLTraining(training=True)
# rl_based_training(model)
# # Save weights
# save_weights(model, model_weights_path_rl_training)
# ## Evaluation <a class="anchor" id="Evaluation"></a>
# The following cell implements a baseline which uses QAM with Gray labeling and conventional demapping for AWGN channel.
# In[21]:
class Baseline(Model):
    """Coded baseline link with a conventional APP demapper (no trainable parts).

    Chain: binary source -> LDPC encoder -> mapper -> upsampling -> RRC filter
    -> limiter -> AWGN -> matched filter -> downsampling -> APP demapper ->
    LDPC decoder.

    Args:
        constellation: constellation to use; ``None`` selects a non-trainable
            QAM constellation.

    Relies on module-level configuration published by ``setup`` (``k``, ``n``,
    ``num_bits_per_symbol``, ``samples_per_symbol``, ``rrcf``, ``mf``,
    ``limiter``) and on ``limiter.ibo`` having been set beforehand.
    """

    def __init__(self, constellation=None):
        super().__init__()
        ################
        ## Transmitter
        ################
        self._binary_source = BinarySource()
        self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol)
        if constellation is None:
            constellation = Constellation("qam", num_bits_per_symbol, trainable=False)
        self.constellation = constellation
        self._mapper = Mapper(constellation=constellation)
        ################
        ## Channel
        ################
        self._channel = AWGN()
        self.us = Upsampling(samples_per_symbol)
        self.ds = Downsampling(samples_per_symbol, rrcf.length-1)
        ################
        ## Receiver
        ################
        self._demapper = Demapper("app", constellation=constellation)#demapper (constellation.points, M, num_symbols_per_codeword)
        self._decoder = LDPC5GDecoder(self._encoder, hard_out=True)

    @tf.function()#jit_compile=True)
    def call(self, batch_size, esno_db):#, perturbation_variance=tf.constant(0.0, tf.float32)):
        """Simulate one batch at ``esno_db`` and return ``(b, b_hat)``."""
        #tf.print ('batch:', batch_size, output_stream=sys.stdout)
        # If `esno_db` is a scalar, a tensor with shape [batch size] is created as it is what is expected by some layers
        if len(esno_db.shape) == 0:
            esno_db = tf.fill([batch_size], esno_db)
        no = 10**(-0.1 * esno_db)
        no = expand_to_rank(no, 2)
        no = tf.cast (no, tf.float32)
        #tf.print ('no:', no)
        ################
        ## Transmitter
        ################
        b = self._binary_source([batch_size, k])
        c = self._encoder(b)
        # Modulation
        x = self._mapper(c) # x [batch size, num_symbols_per_codeword]
        # Upsample the QAM symbol sequence
        x_us = self.us(x)
        xfilt_out = rrcf(x_us)
        # ################
        # ## Channel
        # ################
        limit_out = limiter (xfilt_out * limiter.ibo)
        # Noise variance is scaled by the oversampling factor
        y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)]) # [batch size, num_symbols_per_codeword]
        mf_out = mf (y)
        # # Instantiate a downsampling layer
        # num_symbols = x.shape[1]
        # # Recover the transmitted symbol sequence
        x_hat = self.ds(mf_out)[:,:x.shape[1]]
        #xconst_sqnorm = power (x) should be 1 anyway
        # Divide out the complex gain of the link, estimated by correlating the
        # received symbols with the transmitted ones
        signal = tf.reduce_sum (tf.multiply (x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)
        #print ('signal:', signal)
        #print ('y:', y)
        y = x_hat / signal
        #tf.print ('y:', y)
        ################
        ## Receiver
        ################
        #tf.print (y.shape, no.shape)
        #print ('trace demapper')
        # The demapper takes its inputs as one list, as in the trainable model
        # and as in the AWGN call above
        llr = self._demapper([y, no])
        # Outer decoding
        b_hat = self._decoder(llr)
        #tf.print (b[0,:10], b_hat[0,:10])
        #tf.print ('exec')
        return b,b_hat # Ground truth and reconstructed information bits returned for BER/BLER computation
class Pred(Model):
    """Link with iterative predistortion of the mapped symbols.

    Same chain as the conventional APP-demapper link, except that the mapped
    symbols are first passed through ``pred`` and the limiter input back-off
    is re-derived from the predistorted symbols.

    ``pred``, ``betas``, ``find_ibodB``, ``limiter``, ``rrcf``, ``mf``,
    ``opt`` and the code/modulation parameters are module-level objects
    defined elsewhere in this file.
    """

    def __init__(self, constellation=None):
        """Create the layers.

        Args:
            constellation: Constellation shared by mapper and demapper. When
                ``None``, a non-trainable "qam" constellation with
                ``num_bits_per_symbol`` bits per symbol is created.
        """
        super().__init__()

        ################
        ## Transmitter
        ################
        self._binary_source = BinarySource()
        self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol)
        if constellation is None:
            constellation = Constellation("qam", num_bits_per_symbol, trainable=False)
        self.constellation = constellation
        self._mapper = Mapper(constellation=constellation)

        ################
        ## Channel
        ################
        self._channel = AWGN()
        self.us = Upsampling(samples_per_symbol)
        self.ds = Downsampling(samples_per_symbol, rrcf.length-1)

        ################
        ## Receiver
        ################
        self._demapper = Demapper("app", constellation=constellation)
        self._decoder = LDPC5GDecoder(self._encoder, hard_out=True)

    @tf.function()#jit_compile=True)
    def call(self, batch_size, esno_db=None, ebno_db=None, return_limit_out=False):
        """Simulate one batch of codewords.

        Args:
            batch_size: Number of codewords to simulate.
            esno_db: Es/N0 in dB; a scalar is expanded to one value per
                batch element.
            ebno_db: Used only when ``esno_db`` is ``None``.
            return_limit_out: When true, return the limiter output and skip
                the channel and receiver.

        Returns:
            The limiter output if ``return_limit_out`` is true, otherwise the
            tuple ``(b, b_hat)`` of transmitted and decoded information bits.
        """
        # kludge: sim_ber will call with ebno_db, but it's really esno we're using
        if esno_db is None:
            esno_db = ebno_db
        # A scalar SNR is expanded to shape [batch_size], as expected by some layers.
        if len(esno_db.shape) == 0:
            esno_db = tf.fill([batch_size], esno_db)
        no = 10**(-0.1 * esno_db)
        no = expand_to_rank(no, 2)
        no = tf.cast(no, tf.float32)

        ################
        ## Transmitter
        ################
        b = self._binary_source([batch_size, k])
        c = self._encoder(b)
        x = self._mapper(c)  # x [batch size, num_symbols_per_codeword]
        # Note ibo is not updated during iterations, that would be impractical
        x_pred, mse = pred(x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize)
        x_us = self.us(x_pred)
        xfilt_out = rrcf(x_us)

        ################
        ## Channel
        ################
        # Find a new ibo with the predistorted signal to give the proper obo
        ibo = tf.cast(10**(0.05 * find_ibodB(None, opt.obodB, x_pred)), tf.complex64)
        # ibo is an amplitude factor (10**(0.05*dB)), so its dB value is 20*log10.
        print('ibo set to:', 20*log10(ibo))
        limit_out = limiter(xfilt_out * ibo)
        if return_limit_out:
            return limit_out
        # The noise variance handed to the channel is scaled by samples_per_symbol.
        y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)])
        mf_out = mf(y)
        # Back to symbol rate, truncated to the number of transmitted symbols.
        x_hat = self.ds(mf_out)[:, :x.shape[1]]

        # Average of x_hat * conj(x) over all symbols (x being the symbols
        # before predistortion), used to rescale x_hat.
        signal = tf.reduce_sum(tf.multiply(x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)
        y = x_hat / signal

        ################
        ## Receiver
        ################
        llr = self._demapper([y, no])
        b_hat = self._decoder(llr)
        return b, b_hat  # Ground truth and reconstructed information bits returned for BER/BLER computation
from llr import bit_llr_to_sym_ll3
def prob(x):
    """Return ``1 / (1 + exp(-|x|))`` element-wise.

    The input is negated and one of two expressions is selected on the sign
    of the negated value; both reduce to the formula above.
    NOTE(review): if a plain logistic function of ``-x`` was intended, the
    negative branch would need ``exp(y) / (1 + exp(y))`` — confirm intent.

    Args:
        x: Tensor compared against float64 constants — presumably float64.
    """
    negated = -x
    unity = tf.constant(1., dtype=tf.float64)
    origin = tf.constant(0., dtype=tf.float64)
    when_nonnegative = unity / (unity + tf.exp(-negated))
    when_negative = unity / (unity + tf.exp(negated))
    return tf.where(negated >= origin, when_nonnegative, when_negative)
class Cancel(Model):
    """Link with an iterative receiver that subtracts a distortion estimate.

    After a soft-output LDPC decoding pass, expected symbols are rebuilt from
    the decoder output, sent through the transmit filter / limiter / receive
    filter again, and the difference between what comes out and what went in
    is subtracted from the received symbols before the next pass.

    ``opt``, ``limiter``, ``rrcf``, ``mf``, ``find_ibodB``, ``power``,
    ``log10``, ``llr_mask`` and the code/modulation parameters are
    module-level objects defined elsewhere in this file.
    """

    def __init__(self, constellation=None):
        """Create the layers.

        Args:
            constellation: Constellation shared by mapper and demapper. When
                ``None``, a non-trainable "qam" constellation with
                ``num_bits_per_symbol`` bits per symbol is created.
        """
        super().__init__()

        ################
        ## Transmitter
        ################
        self._binary_source = BinarySource()
        self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol)
        if constellation is None:
            constellation = Constellation("qam", num_bits_per_symbol, trainable=False)
        self.constellation = constellation
        self._mapper = Mapper(constellation=constellation)

        ################
        ## Channel
        ################
        self._channel = AWGN()
        self.us = Upsampling(samples_per_symbol)
        self.ds = Downsampling(samples_per_symbol, rrcf.length-1)

        ################
        ## Receiver
        ################
        self._demapper = Demapper("app", constellation=constellation)
        # Soft-output, stateful decoder: its messages are carried from one
        # receiver pass to the next.
        self._decoder = LDPC5GDecoder(self._encoder, hard_out=False, stateful=True, num_iter=20, return_infobits=False)

    # Deliberately not wrapped in tf.function: the loop below calls .numpy()
    # on intermediate tensors, which requires eager execution.
    def call(self, batch_size, esno_db=None, ebno_db=None):
        """Simulate one batch with ``opt.rcv_iter`` receiver passes.

        Args:
            batch_size: Number of codewords to simulate.
            esno_db: Es/N0 in dB; a scalar is expanded to one value per
                batch element.
            ebno_db: Used only when ``esno_db`` is ``None``.

        Returns:
            Tuple ``(b, b_hd)``: transmitted information bits and the hard
            decisions taken on the last decoder output.
            NOTE(review): the decoder is built with ``return_infobits=False``,
            so ``b_hd`` presumably covers the whole codeword while ``b`` holds
            only ``k`` bits — confirm the caller copes with that.
        """
        # kludge: sim_ber will call with ebno_db, but it's really esno we're using
        if esno_db is None:
            esno_db = ebno_db
        # A scalar SNR is expanded to shape [batch_size], as expected by some layers.
        if len(esno_db.shape) == 0:
            esno_db = tf.fill([batch_size], esno_db)
        no = 10**(-0.1 * esno_db)
        no = expand_to_rank(no, 2)
        no = tf.cast(no, tf.float32)

        ################
        ## Transmitter
        ################
        b = self._binary_source([batch_size, k])
        c = self._encoder(b)
        x = self._mapper(c)  # x [batch size, num_symbols_per_codeword]
        x_us = self.us(x)
        xfilt_out = rrcf(x_us)

        ################
        ## Channel
        ################
        # Find the ibo that gives the requested obo for this signal
        ibo = tf.cast(10**(0.05 * find_ibodB(None, opt.obodB, x)), tf.complex64)
        # ibo is an amplitude factor (10**(0.05*dB)), so its dB value is 20*log10.
        print('ibo set to:', 20*log10(ibo))
        limit_out = limiter(xfilt_out * ibo)
        y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)])
        mf_out = mf(y)
        # Back to symbol rate, truncated to the number of transmitted symbols.
        x_hat = self.ds(mf_out)[:, :x.shape[1]]

        msg_vn = None  # decoder state, None on the first pass
        dist = 0       # distortion estimate, nothing to subtract on the first pass
        for rcv_pass in range(opt.rcv_iter):
            # Average of x_hat * conj(x) over all symbols, used to rescale x_hat.
            signal = tf.reduce_sum(tf.multiply(x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)
            y = x_hat / signal
            y -= dist
            mse = power(y - x)
            print('iter:', rcv_pass, 'mse:', 10*log10(mse))

            ################
            ## Receiver
            ################
            llr = self._demapper([y, no])
            b_hat, msg_vn = self._decoder([llr, msg_vn])
            b_hd = tf.cast(tf.less(0.0, b_hat), b.dtype)
            # Fraction of positions where the hard decision differs from the
            # encoder output c (printed under the label 'bler').
            bler = tf.reduce_mean(tf.cast(tf.not_equal(c, b_hd), tf.float32))
            print('bler:', bler)

            # Rebuild expected symbols from the soft decoder output.
            sym_ll = tf.convert_to_tensor([bit_llr_to_sym_ll3(-b_hat[row].numpy().astype(float), llr_mask.astype(np.int32)) for row in range(b_hat.shape[0])])
            sym_prob = tf.exp(sym_ll)
            tfconst = self.constellation.points
            exp_syms = tf.tensordot(tf.cast(sym_prob, tf.complex64), tfconst, 1)

            # Pass the expected symbols through the same filter/limiter chain,
            # without channel noise.
            # NOTE(review): this uses limiter.ibo, not the ibo computed above —
            # confirm that is intended.
            remod_us = self.us(exp_syms)
            remod_out = rrcf(remod_us)
            limit_out = limiter(remod_out * limiter.ibo)
            remod_mf_out = mf(limit_out)
            remod_x_hat = self.ds(remod_mf_out)[:, :x.shape[1]]
            signal2 = tf.reduce_sum(tf.multiply(remod_x_hat, tf.math.conj(x)))/tf.cast(tf.size(x), x.dtype)
            y2 = remod_x_hat / signal2
            dist = y2 - exp_syms  # diff between what went into mod and came out of demod
            # A leftover breakpoint() used to sit here; it stopped every pass in
            # the debugger and made unattended runs impossible.
        return b, b_hd  # Ground truth and reconstructed information bits returned for BER/BLER computation
class Centroids(Model):
    """Link whose demapper uses per-symbol Gaussian statistics of the received samples.

    For every constellation index the model keeps running sums of the real
    part (I), imaginary part (Q), their squares and their product, plus a
    sample count. From these it derives a mean, variance and I/Q correlation
    per symbol and evaluates a bivariate-normal log-likelihood of each
    received sample under each symbol, which is fed to the project
    ``demapper``.

    In training mode the encoder/decoder are left out and ``call`` returns a
    binary cross-entropy loss instead of bits.

    ``opt``, ``limiter``, ``rrcf``, ``mf``, ``M``, ``find_ibodB``, ``pred``,
    ``betas`` and the code/modulation parameters are module-level objects
    defined elsewhere in this file.
    """
    # I think to train need both mapper and constellation to be trainable

    def __init__(self, constellation=None, training=False, pred=False):
        """Create the layers and set ``limiter.ibo`` (module-level side effect).

        Args:
            constellation: Constellation used by the mapper. When ``None``, a
                non-trainable "qam" constellation is created.
            training: When true, no encoder/decoder is created, the mapper is
                made trainable and a BCE loss is set up.
            pred: When true, ``call`` predistorts the mapped symbols.
        """
        super().__init__()
        self._training = training
        self.pred = pred

        ################
        ## Transmitter
        ################
        self._binary_source = BinarySource()
        if not training:
            self._encoder = LDPC5GEncoder(k, n, num_bits_per_symbol)
        if constellation is None:
            constellation = Constellation("qam", num_bits_per_symbol, trainable=False)
        self.constellation = constellation
        # return_indices=True: the mapper also returns the symbol index of each
        # position, used in call() to group received samples per symbol.
        self._mapper = Mapper(constellation=constellation, return_indices=True, trainable=training)

        ################
        ## Channel
        ################
        self._channel = AWGN()

        ################
        ## Receiver
        ################
        self._demapper = demapper(M, num_symbols_per_codeword)
        if not training:
            self._decoder = LDPC5GDecoder(self._encoder, hard_out=True)
        if self._training:
            self._bce = tf.keras.losses.BinaryCrossentropy(from_logits=True)
        self.us = Upsampling(samples_per_symbol)
        self.ds = Downsampling(samples_per_symbol, rrcf.length-1)

        # Per-symbol accumulators (one entry per constellation index).
        self.sum_is = tf.Variable(tf.zeros(M))
        self.sum_i2s = tf.Variable(tf.zeros(M))
        self.sum_qs = tf.Variable(tf.zeros(M))
        self.sum_q2s = tf.Variable(tf.zeros(M))
        self.sum_iqs = tf.Variable(tf.zeros(M))
        self.counts = tf.Variable(tf.zeros(M))

        if opt.limit == 'sspa':
            limiter.ibo = tf.cast(10**(0.05 * find_ibodB(self._mapper.constellation, opt.obodB)), tf.complex64)
        elif opt.limit == 'linear':
            limiter.ibo = tf.constant(1.0, tf.complex64)

    @tf.function()#jit_compile=True)
    def call(self, batch_size, esno_db=None, ebno_db=None):
        """Simulate one batch.

        Args:
            batch_size: Number of codewords to simulate.
            esno_db: Es/N0 in dB; a scalar is expanded to one value per
                batch element.
            ebno_db: Used only when ``esno_db`` is ``None``.

        Returns:
            In training mode, the BCE between the transmitted coded bits and
            the demapper output. Otherwise the tuple ``(b, b_hat)`` of
            transmitted and decoded information bits.
        """
        # kludge: sim_ber will call with ebno_db, but it's really esno we're using
        if esno_db is None:
            esno_db = ebno_db
        # A scalar SNR is expanded to shape [batch_size], as expected by some layers.
        if len(esno_db.shape) == 0:
            esno_db = tf.fill([batch_size], esno_db)
        no = 10**(-0.1 * esno_db)
        no = expand_to_rank(no, 2)
        no = tf.cast(no, tf.float32)

        ################
        ## Transmitter
        ################
        if self._training:
            # No channel code while training: draw n bits directly.
            c = self._binary_source([batch_size, n])
        else:
            b = self._binary_source([batch_size, k])
            c = self._encoder(b)
        x, xconst = self._mapper(c)  # x [batch size, num_symbols_per_codeword]
        if self.pred:
            x, mse = pred(x, 1.0, betas, 20*log10(limiter.ibo), limiter, samples_per_symbol, span_in_symbols, rrcf, mf, norm=opt.normalize)
        x_us = self.us(x)
        xfilt_out = rrcf(x_us)

        ################
        ## Channel
        ################
        if self.pred:
            # Find a new ibo with the predistorted signal to give the proper obo
            ibo = tf.cast(10**(0.05 * find_ibodB(None, opt.obodB, x)), tf.complex64)
        else:
            ibo = limiter.ibo
        limit_out = limiter(xfilt_out * ibo)
        # The noise variance handed to the channel is scaled by samples_per_symbol.
        y = self._channel([limit_out, no*tf.cast(samples_per_symbol, no.dtype)])
        mf_out = mf(y)
        x_hat = self.ds(mf_out)[:, :x.shape[1]]

        # Received samples grouped by transmitted symbol index.
        rsyms = [x_hat[xconst==i] for i in range(M)]
        batch_sum_i = [tf.reduce_sum(tf.math.real(rsyms[i])) for i in range(M)]
        batch_sum_i2 = [tf.reduce_sum(tf.math.real(rsyms[i])**2) for i in range(M)]
        batch_sum_q = [tf.reduce_sum(tf.math.imag(rsyms[i])) for i in range(M)]
        batch_sum_q2 = [tf.reduce_sum(tf.math.imag(rsyms[i])**2) for i in range(M)]
        batch_sum_iq = [tf.reduce_sum(tf.math.real(rsyms[i]) * tf.math.imag(rsyms[i])) for i in range(M)]
        batch_counts = tf.convert_to_tensor([len(rsyms[i]) for i in range(M)], dtype=tf.float32)
        if self._training:
            # Training: statistics are replaced by those of the current batch.
            self.sum_is.assign(batch_sum_i)
            self.sum_i2s.assign(batch_sum_i2)
            self.sum_qs.assign(batch_sum_q)
            self.sum_q2s.assign(batch_sum_q2)
            self.sum_iqs.assign(batch_sum_iq)
            self.counts.assign(batch_counts)
        else:
            # Evaluation: statistics accumulate over all batches seen so far.
            self.sum_is.assign_add(batch_sum_i)
            self.sum_i2s.assign_add(batch_sum_i2)
            self.sum_qs.assign_add(batch_sum_q)
            self.sum_q2s.assign_add(batch_sum_q2)
            self.sum_iqs.assign_add(batch_sum_iq)
            self.counts.assign_add(batch_counts)

        mean_is = [self.sum_is[i]/self.counts[i] for i in range(M)]
        mean_qs = [self.sum_qs[i]/self.counts[i] for i in range(M)]
        var_is = [self.sum_i2s[i]/self.counts[i] - (mean_is[i]**2) for i in range(M)]
        var_qs = [self.sum_q2s[i]/self.counts[i] - (mean_qs[i]**2) for i in range(M)]
        corrs = [(self.sum_iqs[i]/self.counts[i] - (mean_is[i] * mean_qs[i])) / (tf.sqrt(var_is[i]) * tf.sqrt(var_qs[i])) for i in range(M)]

        # Bivariate normal log-density of every received sample under symbol i:
        #   ln f = -Q / (2 (1 - rho^2)) - ln(2 pi sigma_I sigma_Q sqrt(1 - rho^2))
        # The previous expression used (1 - rho)^2 under the square root and
        # multiplied the log term by -1/(2 (1 - rho^2)), which changes both
        # its sign and its scale.
        one_minus_rho2 = [1 - corrs[i]**2 for i in range(M)]
        quad = [(tf.math.real(x_hat) - mean_is[i])**2 / var_is[i] + (tf.math.imag(x_hat) - mean_qs[i])**2 / var_qs[i] - 2 * corrs[i] * (tf.math.real(x_hat) - mean_is[i]) / tf.sqrt(var_is[i]) * (tf.math.imag(x_hat) - mean_qs[i]) / tf.sqrt(var_qs[i]) for i in range(M)]
        log_norm = [tf.math.log(2. * tf.constant(np.pi) * tf.sqrt(one_minus_rho2[i] * var_is[i] * var_qs[i])) for i in range(M)]
        lls = tf.convert_to_tensor([-quad[i] / (2 * one_minus_rho2[i]) - log_norm[i] for i in range(M)])
        # Move the symbol axis last: [M, d0, d1] -> [d0, d1, M].
        lls = tf.transpose(lls, perm=[1,2,0])

        ################
        ## Receiver
        ################
        llr = self._demapper(lls)
        if self._training:
            return self._bce(c, llr)
        else:
            b_hat = self._decoder(llr)
            return b, b_hat  # Ground truth and reconstructed information bits returned for BER/BLER computation
# To train mapper for centroid demapper
# model_centroids_trainable = Centroids (Constellation (constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym, trainable=True), training=True)
# to train:
# run_ml_training (model_centroids_trainable, suffix='centroids-train')
# Then to test
# model_centroids3 = Centroids (Constellation (constellation_type='custom', initial_value=model_centroids_trainable.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym))
# run_centroids (model_centroids3, label='mapper=centroidopt+demapper=centroid')
#
# for opt mapper + centroid demapper:
# you might need load_weights first if you haven't just trained model_64apsk
#model_centroids2 = Centroids (Constellation (constellation_type='custom', initial_value=model_64apsk.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym))
# In[22]:
# Range of SNRs over which the systems are evaluated
# In[23]:
# Utility function to load and set weights of a model
def load_weights(model, model_weights_path):
    """Set ``model``'s weights from a pickle file.

    Args:
        model: Model called as ``model(batch_size, snr)`` and supporting
            ``set_weights``.
        model_weights_path: Path of the pickled weight list.
    """
    # One dummy forward pass (batch of 1 at 10 dB) — presumably so the
    # model's variables exist before set_weights is called.
    warmup_snr = tf.constant(10.0, tf.float32)
    model(1, warmup_snr)
    with open(model_weights_path, 'rb') as weights_file:
        stored_weights = pickle.load(weights_file)
    model.set_weights(stored_weights)
# The next cell evaluates the baseline and the two autoencoder-based communication systems, trained with different methods.
# The results are stored in the dictionary ``BLER``.
# In[24]:
def run_qam64():
    """Simulate the default-QAM ``Baseline`` and store its BLER under ``BLER['64qam']``.

    Sets the module-level ``limiter.ibo`` according to ``opt.limit`` before
    running ``sim_ber`` over ``esno_dbs``.
    """
    print('Qam64')
    model_qam64 = Baseline()
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model_qam64.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    ibo_amplitude = limiter.ibo.numpy()
    print('ibo(dB) set to:', 20*np.log10(ibo_amplitude))
    _ber, bler = sim_ber(
        model_qam64,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
    )
    BLER['64qam'] = bler.numpy()
def run_apsk64():
    """Simulate ``Baseline`` with the '3/4' 64-APSK constellation; store BLER under ``BLER['64apsk']``.

    Sets the module-level ``limiter.ibo`` according to ``opt.limit`` before
    running ``sim_ber`` over ``esno_dbs``.
    """
    from const64apsk import gen_constellation_64apsk

    c = gen_constellation_64apsk('3/4')
    apsk_constellation = Constellation(constellation_type='custom', initial_value=c, num_bits_per_symbol=6)
    model_apsk = Baseline(apsk_constellation)
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model_apsk.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    ibo_amplitude = limiter.ibo.numpy()
    print('ibo(dB) set to:', 20*np.log10(ibo_amplitude))
    _ber, bler = sim_ber(
        model_apsk,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
    )
    BLER['64apsk'] = bler.numpy()
def plot_centroids():
    """Scatter-plot the received samples and centroids held by the module-level ``mc``.

    ``mc`` is first run on one batch of 128 at an Es/N0 of 200 dB; its
    ``x_hat`` and ``centroids`` attributes are then plotted (centroids as
    'x' markers).
    """
    high_snr = tf.cast(200, tf.float32)
    mc(batch_size=128, esno_db=high_snr)
    centroids = tf.cast(mc.centroids, dtype=tf.complex64).numpy()
    fig, ax = plt.subplots()
    received = tf.cast(mc.x_hat, dtype=tf.complex64).numpy()
    x_hat = received.flatten()
    ax.scatter(x_hat.real, x_hat.imag)
    ax.scatter(centroids.real, centroids.imag, marker='x')
    plt.show()
# Script entry point: read the command line, hand the parsed options to the
# project's `setup`, and build the Es/N0 grid (dB) that the simulations sweep.
if __name__ == '__main__':
import sys
args = sys.argv
# parse_args receives the full argv list, program name included.
opt,logname = parse_args (args)
setup(opt, logname)
# np.arange is half-open: opt.esno_max itself is not part of the grid.
esno_dbs = np.arange(opt.esno_min, # Min SNR for evaluation
opt.esno_max, # Max SNR for evaluation
opt.esno_step) # Step
def setup_logger():
    """Replace ``sys.stderr`` with a project logger and attach ``sys.results_file``.

    ``sys.stderr`` becomes a logger on ``logname``; ``sys.results_file`` is a
    second logger, opened with mode 'wb', on ``logname`` with '.log'
    replaced by '.results'.
    """
    import sys
    import logger

    results_name = logname.replace('.log', '.results')
    sys.stderr = logger.logger(logname)
    sys.results_file = logger.logger(results_name, mode='wb')
def log_bler(esno_dbs, bit_errors, block_errors, nb_bits, nb_blocks):
    """Progress callback for ``sim_ber``: log the current BLER figures.

    The command line, options, SNR points, block-error counts, block counts
    and resulting BLER are printed to ``sys.stderr``. If ``sys.results_file``
    exists (see ``setup_logger``), the same data is also pickled into it.

    Args:
        esno_dbs: Tensor of simulated SNR points.
        bit_errors: Unused.
        block_errors: Tensor of block-error counts per SNR point.
        nb_bits: Unused.
        nb_blocks: Tensor of simulated block counts per SNR point.
    """
    import sys
    from contextlib import redirect_stdout

    if hasattr(sys.stderr, 'reopen'):
        sys.stderr.reopen()
    with redirect_stdout(sys.stderr):
        print(args)
        print(opt)
        print('esno:', esno_dbs.numpy())
        print('block_errors:', block_errors.numpy())
        print('blocks:', nb_blocks.numpy())
        bler = tf.cast(block_errors, tf.float64) / tf.cast(nb_blocks, tf.float64)
        print('bler:', bler.numpy())
    # `_flush` only exists on the project logger. Guard it like `reopen`
    # above, so the callback also works when stderr is the ordinary stream
    # (actions that never call setup_logger()).
    flush_stderr = getattr(sys.stderr, '_flush', None)
    if flush_stderr is not None:
        flush_stderr()

    if hasattr(sys, 'results_file'):
        from pickle import dumps
        results = {
            'args': args,
            'opt': opt,
            'esno_db': esno_dbs.numpy(),
            'block_errors': block_errors.numpy(),
            'blocks': nb_blocks.numpy(),
            'bler': bler.numpy(),
        }
        sys.results_file.reopen()
        sys.results_file.write(dumps(results, -1))
        sys.results_file._flush()
# When an input back-off was given on the command line, measure the output
# back-off it produces for the initial constellation and use that as opt.obodB.
if opt.ibodB is not None:
    opt.obodB = measure_obo(
        Constellation(
            constellation_type='custom',
            initial_value=const,
            num_bits_per_symbol=bits_per_sym,
        ),
        opt.ibodB,
    )
    print(f'ibo was set to {opt.ibodB}, obo is set to {opt.obodB}')
def run_centroids(model_centroids, esno_dbs):
    """Run ``sim_ber`` on a centroid model and return ``(esno_dbs, bler)``.

    Sets the module-level ``limiter.ibo`` from ``opt.limit`` first and logs
    progress through ``log_bler``.

    Args:
        model_centroids: Model exposing a ``constellation`` attribute.
        esno_dbs: SNR points to simulate.
    """
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model_centroids.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    ibo_amplitude = limiter.ibo.numpy()
    print('ibo(dB) set to:', 20*np.log10(ibo_amplitude))
    _ber, bler = sim_ber(
        model_centroids,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
        callback=log_bler,
        min_bler=opt.min_bler,
    )
    return (esno_dbs, bler.numpy())
def run_ml(N=opt.N, n_layers=opt.n_layers, demapper=None, label='mapper=opt+demapper=NN', suffix=None, esno_dbs=esno_dbs, width=opt.width, kind=opt.kind, pred=False, load=opt.load):
    """Evaluate a trained ``E2ESystemConventionalTraining`` model and return ``(esno_dbs, bler)``.

    The model is rebuilt in evaluation mode, its weights are loaded from
    ``load``, and — when ``demapper == 'app'`` — its demapper is replaced by
    an APP ``Demapper`` on the model's constellation. The module-level
    ``limiter.ibo`` is set from ``opt.limit`` before simulating.

    Args:
        N, n_layers, width, kind, pred: Forwarded to the model constructor.
        demapper: ``'app'`` to swap in the APP demapper, otherwise unchanged.
        label: Only used to derive a name when ``None``; the derived name is
            not returned.
        suffix: Appended to the derived name when ``label`` is ``None``.
        esno_dbs: SNR points to simulate.
        load: Path of the pickled weights.
    """
    model_conventional = E2ESystemConventionalTraining(
        training=False, constellation=const, N=N, n_layers=n_layers,
        width=width, kind=kind, pred=pred,
    )
    load_weights(model_conventional, load)
    use_app = demapper == 'app'
    if use_app:
        model_conventional._demapper = Demapper(demapping_method='app', num_bits_per_symbol=num_bits_per_symbol, constellation=model_conventional.constellation)
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model_conventional.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    print('ibo(dB) set to:', 20*np.log10(limiter.ibo.numpy()))
    _ber, bler = sim_ber(
        model_conventional,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
        callback=log_bler,
        min_bler=opt.min_bler,
    )
    if label is None:
        stem = 'autoencoder-app' if use_app else 'autoencoder-conv'
        label = stem + suffix
    return (esno_dbs, bler.numpy())
def run_baseline(model, esno_dbs=esno_dbs):
    """Run ``sim_ber`` on ``model`` with progress logging; return ``(esno_dbs, bler)``.

    Sets the module-level ``limiter.ibo`` from ``opt.limit`` first.

    Args:
        model: Model exposing a ``constellation`` attribute.
        esno_dbs: SNR points to simulate.
    """
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    ibo_amplitude = limiter.ibo.numpy()
    print('ibo(dB) set to:', 20*np.log10(ibo_amplitude))
    _ber, bler = sim_ber(
        model,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
        callback=log_bler,
        min_bler=opt.min_bler,
    )
    return (esno_dbs, bler.numpy())
def run_pred(model, esno_dbs=esno_dbs):
    """Run ``sim_ber`` on ``model`` without a progress callback; return ``(esno_dbs, bler)``.

    Sets the module-level ``limiter.ibo`` from ``opt.limit`` first.

    Args:
        model: Model exposing a ``constellation`` attribute.
        esno_dbs: SNR points to simulate.
    """
    if opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    elif opt.limit == 'sspa':
        ibo_db = find_ibodB(model.constellation, opt.obodB)
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    ibo_amplitude = limiter.ibo.numpy()
    print('ibo(dB) set to:', 20*np.log10(ibo_amplitude))
    _ber, bler = sim_ber(
        model,
        esno_dbs,
        batch_size=128,
        num_target_block_errors=opt.max_block_errors,
        max_mc_iter=opt.max_iter,
    )
    return (esno_dbs, bler.numpy())
# model_rl = E2ESystemRLTraining(training=False)
# load_weights(model_rl, model_weights_path_rl_training)
# _,bler = sim_ber(model_rl, ebno_dbs, batch_size=128, num_target_block_errors=opt.max_block_errors, max_mc_iter=1000)
# BLER['autoencoder-rl'] = bler.numpy()
def save_results():
    """Pickle ``(esno_dbs, BLER)`` into ``results_filename``.

    NOTE(review): a later ``save_results(df)`` in this file rebinds the same
    name, so this version is only reachable before that definition runs.
    """
    payload = (esno_dbs, BLER)
    with open(results_filename, 'wb') as results_out:
        pickle.dump(payload, results_out)
def run_all():
    """Run training and every evaluation in sequence, announcing each step.

    NOTE(review): the calls here pass fewer/different arguments than the
    current ``run_centroids(model_centroids, esno_dbs)`` signature expects —
    this looks like it predates a refactor; confirm before relying on it.
    """
    steps = (
        ('run training', lambda: run_ml_training()),
        ('run ml', lambda: run_ml()),
        ('run qam64', lambda: run_qam64()),
        ('run centroids(64apsk)', lambda: run_centroids('64apsk')),
        ('run centroids(64qam)', lambda: run_centroids('64qam')),
        ('run 64apsk', lambda: run_apsk64()),
    )
    for banner, step in steps:
        print(banner)
        step()
# In[25]:
def ebno_to_esno(ebnodB, R, bits_per_sym):
    """Convert Eb/N0 (dB) to Es/N0 (dB).

    Args:
        ebnodB: Eb/N0 in dB.
        R: Code rate.
        bits_per_sym: Coded bits carried by one symbol.

    Returns:
        ``ebnodB + 10*log10(R * bits_per_sym)``.
    """
    info_bits_per_symbol = R * bits_per_sym
    offset_db = 10*np.log10(info_bits_per_symbol)
    return ebnodB + offset_db
def plot_results():
    """Plot every curve in the module-level ``BLER`` dict on a log-y axis.

    Each value is indexed as ``(x values, y values)``; the dict key becomes
    the legend label. The figure is saved to ``logname + '.pdf'`` and shown.
    """
    from itertools import cycle
    import matplotlib.pyplot as plt
    from matplotlib.lines import Line2D

    markers = cycle(Line2D.filled_markers)
    fig, ax = plt.subplots()
    for name, curve in BLER.items():
        ax.semilogy(curve[0], curve[1], label=name, marker=next(markers))
    ax.set_xlabel(r"$E_s/N_0$ (dB)")
    ax.set_ylabel("BLER")
    ax.grid(which="both")
    ax.set_ylim((1e-5, 1.0))
    ax.legend()
    fig.suptitle(f'limit={limiter.name},obo(dB)={opt.obodB}, R={coderate}, N={N}')
    plt.tight_layout()
    pdf_path = logname + '.pdf'
    plt.savefig(pdf_path)
    plt.show()
# model_conventional = E2ESystemConventionalTraining(training=True)
# load_weights(model_conventional, model_weights_path_conventional_training)
# fig = model_conventional.constellation.show()
# fig.suptitle('Conventional training');
def create_df(BLER, label):
    """Build a two-column DataFrame (``esno``, ``bler``) from a result pair.

    Zero BLER values are replaced by NaN (they cannot be drawn on a log
    axis). Only the ``bler`` column is touched: replacing across the whole
    frame would also blank an Es/N0 grid point of exactly 0 dB.

    Args:
        BLER: Pair ``(esno values, bler values)``.
        label: Unused; kept for call-site compatibility.

    Returns:
        ``pandas.DataFrame`` with columns ``esno`` and ``bler``.
    """
    import pandas as pd

    df = pd.DataFrame({'esno': BLER[0], 'bler': BLER[1]})
    df['bler'] = df['bler'].replace(0.0, np.nan)
    return df
def plot_df(df):
    """Plot a DataFrame of BLER curves on a log-y axis, one marker per line.

    The figure is saved to ``logname + '.pdf'`` and shown.

    Args:
        df: DataFrame plotted through ``df.plot(logy=True)``.
    """
    from itertools import cycle
    import matplotlib.pyplot as plt
    from matplotlib.lines import Line2D

    markers = cycle(Line2D.filled_markers)
    fig, ax = plt.subplots()
    plotted_axes = df.plot(logy=True, ax=ax)
    for curve in plotted_axes.get_lines():
        curve.set_marker(next(markers))
    ax.set_xlabel(r"$E_s/N_0$ (dB)")
    ax.set_ylabel("BLER")
    ax.grid(which="both")
    ax.set_ylim((1e-5, 1.0))
    ax.legend()
    fig.suptitle(f'limit={limiter.name},obo(dB)={opt.obodB}, R={coderate}, N={N}')
    plt.tight_layout()
    pdf_path = logname + '.pdf'
    plt.savefig(pdf_path)
    plt.show()
def save_results(df):
    """Pickle ``df`` to ``logname + '.results.pickle'``.

    Args:
        df: Object to serialise (highest pickle protocol).
    """
    from pickle import dump

    # Context manager so the file is flushed and closed deterministically.
    with open(logname + '.results.pickle', 'wb') as results_out:
        dump(df, results_out, -1)
# In[28]:
# model_rl = E2ESystemRLTraining(training=False)
# load_weights(model_rl, model_weights_path_rl_training)
# fig = model_rl.constellation.show()
# fig.suptitle('RL-based training');
def plot_const(PDF, model, logname):
    """Draw the initial constellation, the model's constellation, and the move between them.

    Three figures are produced: the module-level ``const`` (blue, labelled
    with bit patterns), ``model.constellation.points`` (red, labelled), and
    an overlay with an arrow from each model point to the corresponding
    initial point.

    Args:
        PDF: When true, figures are written as pages of a multi-page PDF at
            ``logname``; otherwise each figure is shown interactively.
        model: Object exposing ``constellation.points``.
        logname: Output path used when ``PDF`` is true.
    """
    if PDF:
        from mplcairo.multipage import MultiPage
        cm = MultiPage(logname)
    else:
        from contextlib import suppress
        cm = suppress()
    initial_points = const
    model_points = model.constellation.points.numpy()
    import matplotlib.pyplot as plt

    with cm as pdf:

        def _emit(fig):
            # Either add the figure as a PDF page or display it.
            if PDF:
                pdf.savefig(fig)
                plt.close()
            else:
                plt.show()

        def _labelled_scatter(points, colour):
            # One figure: the points plus their index as a bit string.
            fig, ax = plt.subplots()
            ax.set_aspect('equal')
            ax.scatter(points.real, points.imag, marker='o', c=colour)
            ax.set_xlim(-1.5, 1.5)
            ax.set_ylim(-1.5, 1.5)
            for idx, pt in enumerate(points):
                ax.annotate(np.binary_repr(idx, bits_per_sym), (pt.real, pt.imag))
            _emit(fig)

        _labelled_scatter(initial_points, 'blue')
        _labelled_scatter(model_points, 'red')

        fig, ax = plt.subplots()
        ax.set_aspect('equal')
        ax.scatter(initial_points.real, initial_points.imag, marker='x', c='blue')
        ax.scatter(model_points.real, model_points.imag, marker='o', c='red')
        for start, moved in zip(initial_points, model_points):
            delta = start - moved
            ax.arrow(moved.real, moved.imag, delta.real, delta.imag, length_includes_head=True)
        _emit(fig)
def plot_bler(BLER):
    """Plot BLER curves against the module-level ``esno_dbs`` on a log-y axis.

    Zeros are replaced by NaN before plotting. The figure title is fixed.

    Args:
        BLER: Data accepted by ``pandas.DataFrame``; one column per curve.
    """
    import matplotlib.pyplot as plt
    import pandas as pd

    table = pd.DataFrame(BLER)
    table['esno'] = esno_dbs
    table = table.set_index('esno')
    table = table.replace(0, np.nan)
    fig, ax = plt.subplots()
    table.plot(logy=True, ax=ax)
    ax.grid()
    fig.suptitle('64apsk N=8400 R=2/3 linear')
    plt.show()
def try_perm(const, esno_dbs):
    """Simulate BLER for every permutation ``z`` of ``range(bits_per_sym)``.

    A fresh ``E2ESystemConventionalTraining`` model is built per permutation
    (passed as ``z``) and run through ``sim_ber``.

    Args:
        const: Initial constellation passed to the model.
        esno_dbs: SNR points to simulate.

    Returns:
        Dict with ``'bler'`` (list of BLER results) and ``'z'`` (the matching
        permutations, in the same order).
    """
    from itertools import permutations

    # The model formerly built here before the loop was never used; every
    # iteration constructs its own.
    z = np.arange(bits_per_sym)
    blers = []
    ps = []
    for p in permutations(z):
        print(p)
        model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=N, n_layers=3, z=p)
        _, bler = sim_ber(model_conventional, esno_dbs, batch_size=128, num_target_block_errors=5, max_mc_iter=100)
        ps.append(p)
        blers.append(bler)
    return {'bler': blers, 'z': ps}
def bitwise_mutual_info(bce):
    """Return ``1 - bce / ln(2)``.

    Args:
        bce: Binary cross-entropy value — presumably in nats, since it is
            divided by ln 2; confirm against the caller.
    """
    ln2 = tf.math.log(2.0)
    return 1. - bce / ln2
# def capacity (model, esno_range, bits_per_sym):
# c = [bits_per_sym * bitwise_mutual_info (model (batch_size=256, esno_db=tf.constant(esno))).numpy() for esno in esno_range]
# return c
def capacity(model, esno_range, bits_per_sym):
    """Evaluate ``bits_per_sym * model(..., return_ami=True)`` at each SNR.

    Args:
        model: Callable accepting ``batch_size``, ``esno_db`` and
            ``return_ami``; its result must support ``.numpy()``.
        esno_range: Iterable of SNR values.
        bits_per_sym: Scale factor applied to each model output.

    Returns:
        List with one value per entry of ``esno_range``.
    """
    c = []
    for esno in esno_range:
        ami = model(batch_size=256, esno_db=tf.constant(esno), return_ami=True)
        c.append(bits_per_sym * ami.numpy())
    return c
# ## References <a class="anchor" id="References"></a>
# [1] T. O’Shea and J. Hoydis, "An Introduction to Deep Learning for the Physical Layer," in IEEE Transactions on Cognitive Communications and Networking, vol. 3, no. 4, pp. 563-575, Dec. 2017, doi: 10.1109/TCCN.2017.2758370.
#
# [2] S. Cammerer, F. Ait Aoudia, S. Dörner, M. Stark, J. Hoydis and S. ten Brink, "Trainable Communication Systems: Concepts and Prototype," in IEEE Transactions on Communications, vol. 68, no. 9, pp. 5489-5503, Sept. 2020, doi: 10.1109/TCOMM.2020.3002915.
#
# [3] F. Ait Aoudia and J. Hoydis, "Model-Free Training of End-to-End Communication Systems," in IEEE Journal on Selected Areas in Communications, vol. 37, no. 11, pp. 2503-2516, Nov. 2019, doi: 10.1109/JSAC.2019.2933891.
#args = ['32apsk_cnn.py']
# Action dispatch, part 1: evaluation runs. Each branch builds its model,
# simulates, and (except where setup_logger() is used) writes the command
# line/options to `logname` and a results pickle to `results_filename`.
# Results files are written through `with` so the handles get closed.

if opt.action == 'run-baseline':
    setup_logger()
    model_baseline = E2ESystemConventionalTraining(training=False, constellation=const, demapper_type='app')
    BLER = run_baseline(model_baseline)

if opt.action == 'run-opt-baseline':
    BLER = run_ml(demapper='app')
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N}'),
    }
    with open(results_filename, 'wb') as results_out:
        dump(results, results_out, -1)

if opt.action == 'run-pred':
    # `betas` and `pred` are bound here as names read by Pred.call.
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas)
    from iter_pred import pred
    model_pred = Pred(Constellation(constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym))
    BLER = run_pred(model_pred)
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=pred+orig,demapper=app'),
    }
    with open(results_filename, 'wb') as results_out:
        dump(results, results_out, -1)

if opt.action == 'run-cancel':
    model_pred = Cancel(Constellation(constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym))
    BLER = run_pred(model_pred)
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=orig,demapper=cancel+app'),
    }
    with open(results_filename, 'wb') as results_out:
        dump(results, results_out, -1)

if opt.action == 'run-opt-pred':
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas)
    from iter_pred import pred
    # The trained model is loaded only to obtain its constellation points.
    model_conventional = E2ESystemConventionalTraining(training=False, constellation=const, N=opt.N, n_layers=opt.n_layers, width=opt.width)
    load_weights(model_conventional, opt.load)
    model_pred = Pred(Constellation(constellation_type='custom', initial_value=model_conventional.constellation.points.numpy(), num_bits_per_symbol=bits_per_sym))
    BLER = run_pred(model_pred)
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=pred+opt,demapper=app'),
    }
    with open(results_filename, 'wb') as results_out:
        dump(results, results_out, -1)

if opt.action == 'run-centroids':
    setup_logger()
    model_centroids = Centroids(Constellation(constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym))
    BLER = run_centroids(model_centroids, esno_dbs=esno_dbs)
    # with open (logname, 'w') as out:
    #     print (args, file=out)
    #     print (opt, file=out)
    # from pickle import dump
    # results = {
    #     'opt' : opt,
    #     'args' : args,
    #     'df' : create_df (BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N}'),
    # }
    # dump (results, open (results_filename, 'wb'), -1)

if opt.action == 'run-pred-centroids':
    model_centroids = Centroids(Constellation(constellation_type='custom', initial_value=const, num_bits_per_symbol=bits_per_sym), pred=True)
    from iter_pred import pred
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas)
    BLER = run_centroids(model_centroids, esno_dbs=esno_dbs)
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=orig,demapper=centroids'),
    }
    with open(results_filename, 'wb') as results_out:
        dump(results, results_out, -1)
def do_train_ml():
    """Train the end-to-end model and return what ``run_ml_training`` returns.

    A trainable ``E2ESystemConventionalTraining`` model is built from the
    command-line options, the command line and options are written to
    ``logname``, and training is run.

    Returns:
        The value returned by ``run_ml_training`` (used by the caller as the
        path of the saved weights).
    """
    model_kwargs = dict(
        training=True,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        train_mapper=True,
        width=opt.width,
        kind=opt.kind,
    )
    model_32apsk = E2ESystemConventionalTraining(**model_kwargs)
    with open(logname, 'w') as out:
        for item in (args, opt):
            print(item, file=out)
    return run_ml_training(model_32apsk)
# Action dispatch, part 2: training runs.

if opt.action == 'train-ml':
    do_train_ml()

if opt.action == 'train-and-run-ml':
    # Train, then evaluate straight away with the weights just produced.
    print('training...')
    weights_path = do_train_ml()
    print('running...')
    run_ml(load=weights_path)

if opt.action == 'train-centroids':
    # Trainable constellation with the centroid receiver in training mode.
    model_32apsk = model_centroids_trainable = Centroids(
        Constellation(
            constellation_type='custom',
            initial_value=const,
            num_bits_per_symbol=bits_per_sym,
            trainable=True,
        ),
        training=True,
    )
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    run_ml_training(model_32apsk)

if opt.action == 'train-ml-pred':
    # `pred` and `betas` are bound here as names used when pred=True.
    from iter_pred import pred
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas, 'sum:', sum(betas))
    model_32apsk = E2ESystemConventionalTraining(
        training=True,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        train_mapper=True,
        width=opt.width,
        kind=opt.kind,
        pred=True,
    )
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    run_ml_training(model_32apsk)
def do_run_ml():
    """Install the project logger, then evaluate the trained model with default options.

    Returns:
        The ``(esno_dbs, bler)`` pair returned by ``run_ml``.
    """
    setup_logger()
    return run_ml()
if opt.action == 'run-ml':
BLER = do_run_ml()
# Action 'run-ml-pred': run with pred=True, log the invocation, and pickle the
# results.
# NOTE(review): the file's indentation was lost; the logging/pickling steps
# below are assumed to belong to this branch -- confirm against the original.
if opt.action == 'run-ml-pred':
    # opt.pred_iter values spaced evenly on a log scale from opt.initial_beta
    # to opt.final_beta.
    # NOTE(review): `betas` and `pred` are not used again in this branch;
    # presumably other code reads them by name -- confirm.
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas)
    from iter_pred import pred
    BLER = run_ml(pred=True)
    # Record the invocation (truncates any existing log file).
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=opt,demapper=NN,w={opt.width},d={opt.n_layers},N={opt.N},kind={opt.kind}'),
    }
    # Use a context manager so the results file is flushed and closed here
    # rather than whenever the anonymous file object is garbage-collected.
    # Protocol -1 selects the highest pickle protocol available.
    with open(results_filename, 'wb') as results_file:
        dump(results, results_file, -1)
# Action 'run-opt-centroids': load weights into an end-to-end model, seed a
# Centroids model with that model's constellation points, run it over
# `esno_dbs`, then log the invocation and pickle the results.
if opt.action == 'run-opt-centroids':
    # NOTE(review): unlike the 'dump-const'/'plot-const' branches, `kind` is
    # not passed here; confirm the class default matches the loaded weights.
    model_conventional = E2ESystemConventionalTraining(
        training=False,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        width=opt.width,
    )
    load_weights(model_conventional, opt.load)
    model_centroids2 = Centroids(
        Constellation(
            constellation_type='custom',
            initial_value=model_conventional.constellation.points.numpy(),
            num_bits_per_symbol=bits_per_sym,
        )
    )
    BLER = run_centroids(model_centroids2, esno_dbs=esno_dbs)
    # Record the invocation (truncates any existing log file).
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=opt,demapper=centroid'),
    }
    # Use a context manager so the results file is flushed and closed here
    # rather than whenever the anonymous file object is garbage-collected.
    # Protocol -1 selects the highest pickle protocol available.
    with open(results_filename, 'wb') as results_file:
        dump(results, results_file, -1)
# Action 'run-centroid-opt-centroids': load weights directly into a Centroids
# model, run it over `esno_dbs`, then log the invocation and pickle the
# results.
if opt.action == 'run-centroid-opt-centroids':
    model_centroids = Centroids(
        Constellation(
            constellation_type='custom',
            initial_value=const,
            num_bits_per_symbol=bits_per_sym,
            trainable=True,
        )
    )
    # Author's note: trainable=True is believed to be required, otherwise
    # load_weights in the next step cannot load the weights.
    load_weights(model_centroids, opt.load)
    BLER = run_centroids(model_centroids, esno_dbs=esno_dbs)
    # Record the invocation (truncates any existing log file).
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=centroid-opt,demapper=centroid'),
    }
    # Use a context manager so the results file is flushed and closed here
    # rather than whenever the anonymous file object is garbage-collected.
    # Protocol -1 selects the highest pickle protocol available.
    with open(results_filename, 'wb') as results_file:
        dump(results, results_file, -1)
# Action 'run-opt-pred-centroids': load weights into an end-to-end model, seed
# a Centroids model (pred=True) with that model's constellation points, run it
# over `esno_dbs`, then log the invocation and pickle the results.
if opt.action == 'run-opt-pred-centroids':
    # NOTE(review): unlike the 'dump-const'/'plot-const' branches, `kind` is
    # not passed here; confirm the class default matches the loaded weights.
    model_conventional = E2ESystemConventionalTraining(
        training=False,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        width=opt.width,
    )
    load_weights(model_conventional, opt.load)
    from iter_pred import pred
    # opt.pred_iter values spaced evenly on a log scale from opt.initial_beta
    # to opt.final_beta.
    # NOTE(review): `betas` and `pred` are not used again in this branch;
    # presumably other code reads them by name -- confirm.
    betas = np.logspace(np.log10(opt.initial_beta), np.log10(opt.final_beta), opt.pred_iter)
    print('betas:', betas)
    model_centroids2 = Centroids(
        Constellation(
            constellation_type='custom',
            initial_value=model_conventional.constellation.points.numpy(),
            num_bits_per_symbol=bits_per_sym,
        ),
        pred=True,
    )
    BLER = run_centroids(model_centroids2, esno_dbs=esno_dbs)
    # Record the invocation (truncates any existing log file).
    with open(logname, 'w') as out:
        print(args, file=out)
        print(opt, file=out)
    from pickle import dump
    results = {
        'opt': opt,
        'args': args,
        'df': create_df(BLER, label=f'mapper=pred+opt,demapper=centroids'),
    }
    # Use a context manager so the results file is flushed and closed here
    # rather than whenever the anonymous file object is garbage-collected.
    # Protocol -1 selects the highest pickle protocol available.
    with open(results_filename, 'wb') as results_file:
        dump(results, results_file, -1)
# Action 'dump-const': load weights into an end-to-end model and write its
# constellation points twice: pickled to '<stem>.const' and printed as text
# to '<stem>.const.txt'.
if opt.action == 'dump-const':
    model_conventional = E2ESystemConventionalTraining(
        training=False,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        width=opt.width,
        kind=opt.kind,
    )
    load_weights(model_conventional, opt.load)
    c = model_conventional.constellation.points.numpy()
    # Output names are derived by replacing the weights suffix in opt.load.
    # NOTE(review): str.replace returns the path unchanged when the suffix is
    # absent, in which case both output paths equal opt.load -- confirm that
    # callers always pass a '.log,conventional.weights' path.
    const_filename = opt.load.replace('.log,conventional.weights', '.const')
    const_txt_filename = opt.load.replace('.log,conventional.weights', '.const.txt')
    from pickle import dump
    # Context managers ensure both outputs are flushed and closed here instead
    # of relying on garbage collection of anonymous file objects.
    # Protocol -1 selects the highest pickle protocol available.
    with open(const_filename, 'wb') as const_file:
        dump(c, const_file, -1)
    with open(const_txt_filename, 'w') as const_txt_file:
        print(c, file=const_txt_file)
# Action 'dump-orig-const': pickle the starting constellation `const` to
# '<stem>.orig.const', where the name is derived from opt.load.
if opt.action == 'dump-orig-const':
    const_filename = opt.load.replace('.log,conventional.weights', '.orig.const')
    from pickle import dump
    # Use a context manager so the file is flushed and closed here rather than
    # whenever the anonymous file object is garbage-collected.
    # Protocol -1 selects the highest pickle protocol available.
    with open(const_filename, 'wb') as const_file:
        dump(const, const_file, -1)
# Action 'plot-const': load weights into an end-to-end model and pass it to
# plot_const together with a '<stem>.const.pdf' name derived from opt.load.
if opt.action == 'plot-const':
    model_conventional = E2ESystemConventionalTraining(
        training=False,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        width=opt.width,
        kind=opt.kind,
    )
    load_weights(model_conventional, opt.load)
    const_filename = opt.load.replace('.log,conventional.weights', '.const.pdf')
    plot_const(
        PDF=opt.pdf,
        model=model_conventional,
        logname=const_filename,
    )
# Action 'plot-spec': feed the model's limiter output, row by row, into a
# spectral estimator and plot the result in dB.
if opt.action == 'plot-spec':
    from spectral_est import spectral_est
    S = spectral_est(4096)
    model_conventional = E2ESystemConventionalTraining(
        training=False,
        constellation=const,
        N=opt.N,
        n_layers=opt.n_layers,
        width=opt.width,
        kind=opt.kind,
    )
    # Weights are optional here; without --load the freshly built model is used.
    if opt.load is not None:
        load_weights(model_conventional, opt.load)
    # Set the limiter's ibo according to the selected limiter type.
    if opt.limit == 'sspa':
        ibo_db = find_ibodB(model_conventional.constellation, opt.obodB)
        # 10**(dB/20), stored as complex64.
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    elif opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    print('ibo(dB) set to:', 20*np.log10(limiter.ibo.numpy()))
    limit_out = model_conventional(
        batch_size=256,
        esno_db=tf.constant(200.),
        return_limit_out=True,
    )
    # Accumulate one row of the output at a time.
    n_rows = limit_out.shape[0]
    for row in range(n_rows):
        S += limit_out[row]
    # NOTE(review): indentation was lost in this file; flush() is assumed to
    # run once after the loop -- confirm.
    S.flush()
    import matplotlib.pyplot as plt
    from numpy.fft import fftshift
    # NOTE(review): bare `log10` must come from elsewhere in the file (other
    # lines here use np.log10) -- confirm it is in scope.
    spectrum_db = 10*log10(S.get())
    plt.plot(fftshift(spectrum_db))
    plt.grid()
    plt.show()
# Action 'plot-spec-pred': same spectrum plot as 'plot-spec', but the signal
# comes from a Pred model built on either the loaded or the starting
# constellation.
if opt.action == 'plot-spec-pred':
    from spectral_est import spectral_est
    S = spectral_est(4096)
    # opt.pred_iter values spaced evenly on a log scale from opt.initial_beta
    # to opt.final_beta.
    # NOTE(review): `betas` and `pred` are not used again in this branch;
    # presumably other code reads them by name -- confirm.
    log_beta_first = np.log10(opt.initial_beta)
    log_beta_last = np.log10(opt.final_beta)
    betas = np.logspace(log_beta_first, log_beta_last, opt.pred_iter)
    print('betas:', betas)
    from iter_pred import pred
    # Choose the constellation points: taken from loaded weights when --load
    # is given, otherwise the starting constellation `const`.
    if opt.load is not None:
        # NOTE(review): `kind` is not passed here, unlike 'plot-spec'; confirm
        # the class default matches the loaded weights.
        model_conventional = E2ESystemConventionalTraining(
            training=False,
            constellation=const,
            N=opt.N,
            n_layers=opt.n_layers,
            width=opt.width,
        )
        load_weights(model_conventional, opt.load)
        initial_points = model_conventional.constellation.points.numpy()
    else:
        initial_points = const
    model_pred = Pred(
        Constellation(
            constellation_type='custom',
            initial_value=initial_points,
            num_bits_per_symbol=bits_per_sym,
        )
    )
    # Set the limiter's ibo according to the selected limiter type.
    if opt.limit == 'sspa':
        ibo_db = find_ibodB(model_pred.constellation, opt.obodB)
        # 10**(dB/20), stored as complex64.
        limiter.ibo = tf.cast(10**(0.05 * ibo_db), tf.complex64)
    elif opt.limit == 'linear':
        limiter.ibo = tf.constant(1.0, tf.complex64)
    print('ibo(dB) set to:', 20*np.log10(limiter.ibo.numpy()))
    limit_out = model_pred(
        batch_size=256,
        esno_db=tf.constant(200.),
        return_limit_out=True,
    )
    # Accumulate one row of the output at a time.
    n_rows = limit_out.shape[0]
    for row in range(n_rows):
        S += limit_out[row]
    # NOTE(review): indentation was lost in this file; flush() is assumed to
    # run once after the loop -- confirm.
    S.flush()
    import matplotlib.pyplot as plt
    from numpy.fft import fftshift
    # NOTE(review): bare `log10` must come from elsewhere in the file (other
    # lines here use np.log10) -- confirm it is in scope.
    spectrum_db = 10*log10(S.get())
    plt.plot(fftshift(spectrum_db))
    plt.grid()
    plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment