Skip to content

Instantly share code, notes, and snippets.

@daniestevez
Created August 19, 2017 09:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniestevez/dbe7d05ee1dfec00e7d6c7517634dc4e to your computer and use it in GitHub Desktop.
Save daniestevez/dbe7d05ee1dfec00e7d6c7517634dc4e to your computer and use it in GitHub Desktop.
JT9A acquisition and wipeoff
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy.signal import hilbert
from scipy.signal import blackman
import sys
import subprocess
import os
from multiprocessing import Pool
WSJTX_PATH = '/home/daniel/wsjt/branches/wsjtx/build/'
if len(sys.argv) == 2:
print('Decoding signal with jt9')
jt9 = subprocess.Popen([WSJTX_PATH + 'jt9'] + ['-9', '-d', '3'] + [sys.argv[1]],
stdout = subprocess.PIPE)
out = str(jt9.communicate()[0], encoding='utf-8').split('\n')[0]
print('jt9 returned:\n' + out)
if out.split()[0] == '<DecodeFinished>':
print('jt9 could not decode signal. Exiting.')
sys.exit(0)
f_signal = int(out.split()[3])
message = out.split('@')[1]
else:
f_signal = int(sys.argv[2])
message = sys.argv[3]
print('Generating replica with jt9sim')
subprocess.call([WSJTX_PATH + 'jt9sim', message, '200', '1', '1', '99', '1'],
stdout = subprocess.DEVNULL)
SIGNAL = sys.argv[1]
REPLICA = '000000_0000.wav'
f_replica = 1400
samp_rate = 12000
_, signal = wavfile.read(SIGNAL)
_, replica = wavfile.read(REPLICA)
signal = signal[:len(replica)]
signal = hilbert(signal)
replica = hilbert(replica)
freq_bin = samp_rate/len(signal)
jt9a_width = 15.6
def plot_fft(x, f, title, N=2**16):
total_transforms = len(x)//(N//2) - 1
window = blackman(N)
sweep = 150 # 150Hz freq offset max
freq_bin = samp_rate/N
max_bin = int(sweep/freq_bin)
centre_bin = int(f/freq_bin)
bins = np.arange(-max_bin, max_bin + 1) + centre_bin
frequencies = bins*freq_bin
transforms = np.zeros((total_transforms, len(bins)), dtype=np.float32)
for transform in range(total_transforms):
start = transform*(N//2)
xx = x[start:start+N]
fft = np.fft.fft(xx * window)[bins]
transforms[transform,:] = np.real(fft)**2 + np.imag(fft)**2
plt.plot(frequencies, 10*np.log10(np.average(transforms, axis=0)) - 20*np.log10(N))
plt.title(title)
plt.xlabel('Frequency (Hz)')
plt.ylabel('Signal (dB)')
plt.show()
def plot_waterfall(x, f, N=2**15, averaging=6, dynrange=20, slices=False):
total_transforms = len(x)//(N//2) - 1
lines = total_transforms//averaging
window = blackman(N)
sweep = 150
freq_bin = samp_rate/N
max_bin = int(sweep/freq_bin)
centre_bin = int(f/freq_bin)
bins = np.arange(-max_bin, max_bin + 1) + centre_bin
frequencies = bins*freq_bin
waterfall = np.empty((lines, len(bins)), dtype=np.float32)
for line in range(lines):
transforms = np.zeros((averaging, len(bins)), dtype=np.float32)
for transform in range(averaging):
start = (line * averaging + transform)*(N//2)
xx = x[start:start+N]
f = np.fft.fft(xx * window)[bins]
transforms[transform,:] = np.real(f)**2 + np.imag(f)**2
waterfall[line,:] = 10*np.log10(np.average(transforms, axis=0)) - 20*np.log10(N)
if slices:
for line in range(lines):
plt.plot(frequencies, waterfall[line,:])
else:
plt.imshow(waterfall.T,
extent=(0, len(x)/samp_rate, -sweep, sweep), origin='bottom',
cmap='viridis', vmin=np.max(waterfall)-dynrange, vmax=np.max(waterfall))
plt.colorbar()
plt.show()
def wipeoff(signal, replica, f_signal = f_replica):
signal_fft = np.fft.fft(signal)
replica_fft_conj = np.conj(np.fft.fft(replica))
sweep = 5 # 5Hz freq offset max
max_bin = int(sweep/freq_bin)
centre_bin = int((f_signal - f_replica)/freq_bin)
bins = np.arange(-max_bin, max_bin + 1) + centre_bin
frequencies = bins*freq_bin + f_replica
samples_range = samp_rate # 1s time offset max
samples_offset = len(signal)//2
samples = np.arange(-samples_range, samples_range)
times = samples/samp_rate
samples += samples_offset
max_corr = np.empty(len(bins))
corr = np.empty((len(bins), len(samples)))
print('Computing {} correlations...'.format(len(bins)))
for j in range(len(bins)):
corr[j,:] = np.absolute(np.fft.fftshift(np.fft.ifft(signal_fft * np.roll(replica_fft_conj, bins[j])))[samples])
max_corr[j] = np.max(corr[j,:])
print('Correlations computed. Plotting...')
plt.plot(frequencies, 20*np.log10(max_corr))
plt.title('Maximum correlation depending on frequency offset')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Maximum correlation (dB)')
plt.show()
best_bin = np.argmax(max_corr)
print('Maximum correlation at bin {}'.format(best_bin))
best_corr = corr[best_bin, :]
plt.plot(times, best_corr**2)
plt.title('Correlation for best frequency offset (linear)')
plt.xlabel('Time (s)')
plt.ylabel('Correlation')
plt.show()
plt.plot(times, 20*np.log10(best_corr))
plt.title('Correlation for best frequency offset (dB)')
plt.xlabel('Time (s)')
plt.ylabel('Correlation (dB)')
plt.show()
plt.imshow(20*np.log10(np.flipud(corr)),
extent=(times[0], times[-1], frequencies[0], frequencies[-1]),
vmin=20*np.log10(np.percentile(corr,75)), vmax=20*np.log10(np.max(corr)),
aspect='auto', cmap='viridis')
plt.title('Time and frequency correlation (dB)')
plt.colorbar()
plt.show()
samples_delta = samples[np.argmax(best_corr)] - samples_offset
print('samples_delta {}'.format(samples_delta))
wipe = np.roll(signal, samples_delta) * np.conj(replica)
return wipe
#plot_fft(replica, f_replica + jt9a_width/2, 'Replica FFT')
plot_fft(signal, f_signal + jt9a_width/2, 'Signal FFT')
#wiped_replica = wipeoff(replica, replica)
#plot_fft(wiped_replica, 0, 'Wiped replica FFT')
wiped_signal = wipeoff(signal, replica, f_signal=f_signal)
plot_fft(wiped_signal, f_signal-f_replica, 'Wiped signal FFT')
plot_waterfall(signal, f_signal)
plot_waterfall(wiped_signal, f_signal-f_replica)
plot_waterfall(wiped_signal, f_signal-f_replica, N=2**14, averaging=16)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment