Skip to content

Instantly share code, notes, and snippets.

@daniestevez
Created August 26, 2017 20:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniestevez/95a6af1a4fc874cd020e8b5e5d8416bd to your computer and use it in GitHub Desktop.
Save daniestevez/95a6af1a4fc874cd020e8b5e5d8416bd to your computer and use it in GitHub Desktop.
Eclipse waterfalls
#!/usr/bin/env python2
import numpy as np
from scipy.fftpack import fft, fftshift
from scipy.signal import blackman
import matplotlib.pyplot as plt
import sys
import digital_rf
# Parameters
# Good to crop a 1920x1080 image of whole contest
#N = 4096
#averaging = 1077
# Banner 1920x1080 (to crop)
#N = 1024
#averaging = 1077*4
# High resolution
#N = 16384
#averaging = 93 # 2s
# Small (1920x2048)
N = 2048
averaging = 5624 # 15s
#vmin = {'rx0' : -125, 'rx1' : -130, 'rx2': -135}
#vmax = {'rx0' : -70, 'rx1' : -80, 'rx2' : -80}
vmin = {'rx0' : -125, 'rx1' : -125, 'rx2': -125}
vmax = {'rx0' : -75, 'rx1' : -75, 'rx2' : -75}
def process_channel(channel):
# Digital RF reading
eclipse = digital_rf.DigitalRFReader(['/mnt/eclipse2017'])
start_index, end_index = eclipse.get_bounds(channel)
num_samples = end_index - start_index
total_transforms = num_samples//(N//2) - 1
lines = total_transforms//averaging
window = blackman(N)
npz_filename = 'data_{}.npz'.format(channel)
try:
data = np.load(npz_filename)
waterfall = data['waterfall']
lines_done = data['lines_done']
except IOError:
waterfall = np.zeros((lines, N), dtype=np.float32)
lines_done = 0
done = lines_done == lines
print('Loaded {}/{} lines already computed'.format(lines_done, lines))
try:
for line in range(lines_done, lines):
print('Computing line {}/{}'.format(line + 1, lines))
sum_transforms = np.zeros(N, dtype=np.float32)
for transform in range(averaging):
start = (line * averaging + transform)*N//2
x = eclipse.read_vector(start_index + start, N, channel).flatten()
f = fftshift(fft(x * window))
sum_transforms += np.real(f)**2 + np.imag(f)**2
waterfall[line,:] = 10*np.log10(sum_transforms/averaging) - 20*np.log10(N)
lines_done += 1
if not done:
np.savez(npz_filename, waterfall=waterfall, lines_done=lines_done)
except KeyboardInterrupt:
if not done:
print('Keyboard Interrupt. Saving work')
np.savez(npz_filename, waterfall=waterfall, lines_done=lines_done)
sys.exit()
print('Plotting PNG images')
chunk = 5000
q, r = divmod(waterfall.T.shape[1], chunk)
chunks = q + int(bool(r))
for i in range(chunks):
print('Plotting image {}/{}'.format(i+1, chunks))
plt.imsave('waterfall_{}_{}.png'.format(channel,i),
waterfall.T[:,i*chunk:(i+1)*chunk],
vmin=vmin[channel], vmax=vmax[channel],
cmap='viridis', origin='bottom')
if __name__ == "__main__":
for channel in [ 'rx0', 'rx1', 'rx2' ]:
print 'Processing channel {}'.format(channel)
process_channel(channel)
#!/usr/bin/env python2
import numpy as np
from scipy.fftpack import fft, fftshift
from scipy.signal import blackman
import matplotlib.pyplot as plt
import sys
import digital_rf
# Parameters
# Good to crop a 1920x1080 image of whole contest
#N = 4096
#averaging = 1077
# Banner 1920x1080 (to crop)
#N = 1024
#averaging = 1077*4
# High resolution
#N = 16384
#averaging = 93 # 2s
# Small (1920x2048)
#N = 2048
#averaging = 5624 # 15s
# WWV
N = 2**24
averaging = 2
#vmin = {'rx0' : -125, 'rx1' : -130, 'rx2': -135}
#vmax = {'rx0' : -70, 'rx1' : -80, 'rx2' : -80}
#vmin = {'rx0' : -125, 'rx1' : -125, 'rx2': -125}
#vmax = {'rx0' : -75, 'rx1' : -75, 'rx2' : -75}
vmin = {'rx1' : -170}
vmax = {'rx1': -100 }
def process_channel(channel):
# Digital RF reading
eclipse = digital_rf.DigitalRFReader(['/mnt/eclipse2017'])
start_index, end_index = eclipse.get_bounds(channel)
num_samples = end_index - start_index
total_transforms = num_samples//(N//2) - 1
lines = total_transforms//averaging
window = blackman(N)
npz_filename = 'data_{}.npz'.format(channel)
centre_freq = 10e6 - 9970e3 + 1e3
sample_rate = 384e3
hz_per_bin = float(sample_rate)/N
centre_bin = int(centre_freq/hz_per_bin) + N//2
span_hz = 10
span_bins = int(span_hz/hz_per_bin)
bins_to_keep = np.arange(centre_bin - span_bins, centre_bin + span_bins)
try:
data = np.load(npz_filename)
waterfall = data['waterfall']
lines_done = data['lines_done']
except IOError:
waterfall = np.zeros((lines, len(bins_to_keep)), dtype=np.float32)
lines_done = 0
done = lines_done == lines
print('Loaded {}/{} lines already computed'.format(lines_done, lines))
try:
for line in range(lines_done, lines):
print('Computing line {}/{}'.format(line + 1, lines))
sum_transforms = np.zeros(len(bins_to_keep), dtype=np.float32)
for transform in range(averaging):
start = (line * averaging + transform)*N//2
x = eclipse.read_vector(start_index + start, N, channel).flatten()
f = fftshift(fft(x * window))[bins_to_keep]
sum_transforms += np.real(f)**2 + np.imag(f)**2
waterfall[line,:] = 10*np.log10(sum_transforms/averaging) - 20*np.log10(N)
lines_done += 1
if not done:
np.savez(npz_filename, waterfall=waterfall, lines_done=lines_done)
except KeyboardInterrupt:
if not done:
print('Keyboard Interrupt. Saving work')
np.savez(npz_filename, waterfall=waterfall, lines_done=lines_done)
sys.exit()
print('Plotting PNG images')
chunk = 5000
q, r = divmod(waterfall.T.shape[1], chunk)
chunks = q + int(bool(r))
for i in range(chunks):
print('Plotting image {}/{}'.format(i+1, chunks))
plt.imsave('waterfall_{}_{}.png'.format(channel,i),
waterfall.T[:,i*chunk:(i+1)*chunk],
vmin=vmin[channel], vmax=vmax[channel],
cmap='viridis', origin='bottom')
if __name__ == "__main__":
for channel in ['rx1']:
print 'Processing channel {}'.format(channel)
process_channel(channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment