Skip to content

Instantly share code, notes, and snippets.

@benelsen benelsen/postobs.sh Secret

Last active Dec 3, 2019
Embed
What would you like to do?
SatNOGS Station Meteor-M N2-2
#!/bin/bash
# SatNOGS post-observation hook.
#
# Positional arguments: observation id, timestamp, satellite id, frequency,
# baud rate and the name of the flowgraph script that ran the observation.
#
# Appends a summary line to /data/satnogs/postobs.log, starts the Meteor
# post-processing script in the background when the LRPT flowgraph was used,
# and sends an ntfy notification for LRPT and APT observations.
echo "Running postobs.sh"

obs_id=$1
datetime=$2
sat_id=$3
freq=$4
baud=$5
script=$6

echo "$datetime: obs: $obs_id, sat: $sat_id, freq: $freq, baud: $baud, script: $script" | tee -a /data/satnogs/postobs.log

# /home/pi/rtl_biast/build/src/rtl_biast -d 0 -b 0

if [[ $script == "satnogs_lrpt_demod.py" ]]; then
    echo "Post-processing Meteor"
    # The arguments are quoted so that an empty or whitespace-containing value
    # still reaches process_meteor.py as exactly one positional argument
    # instead of silently shifting the ones after it.
    # Runs in the background at reduced I/O and CPU priority.
    ionice -c 2 -n 5 nice -n 5 /usr/bin/python /data/satnogs/process_meteor.py "$datetime" "$obs_id" "$sat_id" >> /data/satnogs/process_meteor.log 2>&1 &
fi

if [[ $script =~ "lrpt" ]] || [[ $script =~ "apt" ]]; then
    ntfy -c /etc/ntfy/ntfy.yml -t "SatNOGS" -o expire 60 send "$obs_id (Satellite: $sat_id) finished. $script" &
fi

echo "postobs.sh done"
#!/usr/bin/env python
from glob import glob
import os
import shutil
from time import sleep
import subprocess
import sys
import ntfy
from ntfy.config import load_config
# When True, demodulate() adds '-q' to the meteor_demod command line.
QUIET = False

# Absolute paths of the external programs run through subprocess.
DEMOD_BIN = "/home/pi/meteor_demod/src/meteor_demod"
DECODE_BIN = "/home/pi/meteor_decode/src/meteor_decode"
MEDET_BIN = "/home/pi/medet/medet"
MOGRIFY_BIN = "/usr/bin/mogrify"
CONVERT_BIN = "/usr/bin/convert"

# ntfy configuration, loaded once at import time and passed to ntfy.notify().
ntfy_config = load_config("/data/satnogs/ntfy.yml")
def demodulate(iq_file, s_file, symbol_rate, modulation):
    """Run the external demodulator (DEMOD_BIN) on an IQ recording.

    The demodulator's stdout is written to ``demodulate.log`` in the same
    directory as ``s_file``.

    Args:
        iq_file: Path of the IQ recording, passed as the positional input.
        s_file: Output path, passed to ``-o``.
        symbol_rate: Symbol rate passed to ``-r`` (converted with ``str``).
        modulation: Modulation name passed to ``-m``. ``'oqpsk'`` selects a
            ``-b`` value of 250, anything else 75.

    Returns:
        The exit status of the demodulator process.
    """
    pll_bw = '250' if modulation == 'oqpsk' else '75'

    dem_cmd = [DEMOD_BIN]
    if QUIET:
        dem_cmd.append('-q')
    dem_cmd.extend([
        '-B',
        '-R', '1000',
        '-d', '1000',
        '-f', '24',
        '-b', pll_bw,
        # '-a', '0.6',
        # NOTE(review): 156250 presumably is the sample rate of the IQ dump
        # written by the flowgraph - confirm before changing either side.
        '-s', '156250',
        '-r', str(symbol_rate),
        '-m', modulation,
        '-o', s_file,
        iq_file,
    ])
    print(dem_cmd)

    # os.path.join keeps the log next to s_file even when s_file has no
    # directory component; plain "+" concatenation produced
    # "/demodulate.log" in that case.
    log_path = os.path.join(os.path.dirname(s_file), 'demodulate.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dem_cmd, stdout=f_out)
def decode(s_file, output_file, apids, diff, interleaved, split):
    """Run the external decoder (DECODE_BIN) on a demodulated symbol file.

    The decoder's stdout is written to ``decode.log`` in the directory of the
    output file.

    Args:
        s_file: Path of the symbol file, passed as the positional input.
        output_file: Output path passed to ``-o``. ``".png"`` is appended
            unless ``split`` is true.
        apids: Iterable of APID strings, joined with commas for ``-a``.
        diff: When true, ``-d`` is added.
        interleaved: Accepted so the signature matches ``decode_medet``;
            this function does not use it.
        split: When true, ``-S`` is added and ``output_file`` is passed
            through unchanged.

    Returns:
        The exit status of the decoder process.
    """
    dec_cmd = [DECODE_BIN, '-a', ",".join(apids)]
    if diff:
        dec_cmd.append('-d')
    if split:
        dec_cmd.append('-S')
    else:
        output_file = output_file + ".png"
    dec_cmd.extend(['-q', '-s', '-o', output_file, s_file])
    print(dec_cmd)

    # os.path.join avoids writing to "/decode.log" when output_file has no
    # directory component.
    log_path = os.path.join(os.path.dirname(output_file), 'decode.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dec_cmd, stdout=f_out)
def decode_medet(s_file, output_file, apids, diff, interleaved, split):
    """Run the external medet decoder (MEDET_BIN) on a symbol file.

    The decoder's stdout is written to ``medet.log`` in the directory of
    ``output_file``.

    Args:
        s_file: Path of the symbol file (first positional argument).
        output_file: Output base name (second positional argument).
        apids: Iterable of at least three APID strings, passed in order to
            ``-r``, ``-g`` and ``-b``.
        diff: When true, ``-diff`` is added.
        interleaved: When true, ``-int`` is added.
        split: When true, ``-S`` is added.

    Returns:
        The exit status of the decoder process.
    """
    # Materialise first: on Python 3 a map() object cannot be indexed.
    apids = list(apids)

    dec_cmd = [
        MEDET_BIN,
        s_file,
        output_file,
        '-r', apids[0],
        '-g', apids[1],
        '-b', apids[2],
    ]
    if interleaved:
        dec_cmd.append('-int')
    if diff:
        dec_cmd.append('-diff')
    if split:
        dec_cmd.append('-S')
    dec_cmd.extend(['-t', '-q'])
    print(dec_cmd)

    # os.path.join avoids writing to "/medet.log" when output_file has no
    # directory component.
    log_path = os.path.join(os.path.dirname(output_file), 'medet.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dec_cmd, stdout=f_out)
def _is_file_larger_than(path, min_size=1000):
    """Return True if *path* is a regular file bigger than *min_size* bytes."""
    return os.path.isfile(path) and os.path.getsize(path) > min_size


def _run_convert(commands, log_path):
    """Run every command in order, logging stdout; True if all exited 0."""
    with open(log_path, 'w') as log_out:
        # Every command is run even if an earlier one failed.
        codes = [subprocess.call(cmd, stdout=log_out) for cmd in commands]
    return all(code == 0 for code in codes)


def dem_dec(iq_file, s_file, output_file, sat_id = "00000"):
    """Demodulate (if needed), decode and convert one Meteor observation.

    Args:
        iq_file: Path of the IQ recording, or None when none is available.
        s_file: Path of the symbol file (input to the decoder, output of the
            demodulator).
        output_file: Base path for the decoder output; ".bmp" and the
            "_<apids>.png" suffixes are appended to it.
        sat_id: Satellite id. Only "44387" and "40069" are handled.

    Returns:
        True when decoding and the PNG conversion succeeded and produced
        files larger than 1000 bytes, otherwise False.
    """
    sat_id = str(sat_id)
    # A real list: the value is indexed by decode_medet() and compared to
    # list literals below, neither of which works with a Python 3 map object.
    apids = [str(apid) for apid in (68, 65, 64)]

    if sat_id == "44387":
        if iq_file is None:
            return False
        # demodulating
        demodulate(iq_file, s_file, '72000', 'oqpsk')
        # decoding
        ret_dec = decode_medet(s_file, output_file, apids, True, False, False)
    elif sat_id == "40069":
        # try 72k qpsk
        # s file should already be demodulated
        if not os.path.isfile(s_file) or os.path.getsize(s_file) < 1000:
            if iq_file is None:
                return False
            demodulate(iq_file, s_file, '72000', 'qpsk')
        # decoding
        ret_dec = decode_medet(s_file, output_file, apids, False, False, False)
    else:
        return False

    if ret_dec != 0:
        return False

    bmp_file = output_file + ".bmp"
    # os.path.join avoids writing to "/convert.log" when output_file has no
    # directory component.
    log_path = os.path.join(os.path.dirname(output_file), 'convert.log')

    if apids == ['66', '65', '64']:
        # Not reachable with the APIDs selected above; kept for when they
        # are switched back.
        if _is_file_larger_than(bmp_file):
            # The original wrote "_666564.pmg" but then checked for
            # "_666564.png", so this branch could never succeed.
            png_file = output_file + "_666564.png"
            converted = _run_convert([[CONVERT_BIN, bmp_file, png_file]], log_path)
            if converted and _is_file_larger_than(png_file):
                os.remove(bmp_file)
                return True
    elif apids == ['68', '65', '64']:
        if _is_file_larger_than(bmp_file):
            colour_png = output_file + "_656564.png"
            apid68_png = output_file + "_68.png"
            # G is used for both the red and the green plane on purpose:
            # decode_medet() maps the APIDs to -r/-g/-b, so G,G,B is
            # 65,65,64, matching the "_656564" file name.
            colour_cmd = [
                CONVERT_BIN,
                bmp_file, "-channel", "G", "-separate",
                bmp_file, "-channel", "G", "-separate",
                bmp_file, "-channel", "B", "-separate",
                "+channel", "-combine", colour_png,
            ]
            # The R plane (APID 68) is exported on its own as a negated,
            # auto-levelled grayscale image.
            apid68_cmd = [
                CONVERT_BIN,
                bmp_file,
                "-channel", "R", "-separate", "-colorspace", "Gray",
                "+channel", "-negate", "-auto-level",
                apid68_png,
            ]
            converted = _run_convert([colour_cmd, apid68_cmd], log_path)
            if converted and _is_file_larger_than(colour_png) \
                    and _is_file_larger_than(apid68_png):
                os.remove(bmp_file)
                return True
        elif os.path.isfile(bmp_file):
            # NOTE(review): the original indentation was lost; this assumes
            # the cleanup pairs with the size check above, i.e. a decoder
            # output of 1000 bytes or less is discarded.
            os.remove(bmp_file)
    return False
# # currently sending 80k oqpsk
# ret_dem = demodulate(iq_file, s_file, '80000', 'oqpsk')
# ret_dec = decode_medet(s_file, output_file, apids, True, True, True)
# if os.path.isfile(output_file + ".bmp"):
# shutil.move(output_file + ".bmp", output_file + "_" + "_".join(apids) + ".bmp")
# for (i, apid) in zip(range(0, len(apids)), apids):
# shutil.move(output_file + "_" + str(i) + ".bmp", output_file + "_apid_" + apid + ".bmp")
# if ret_dec == 0 and (os.path.isfile(output_file + "_" + "_".join(apids) + ".bmp") and os.path.getsize(output_file + "_" + "_".join(apids) + ".bmp") > 1000):
# convert_cmd = [MOGRIFY_BIN]
# convert_cmd.extend(['-format', 'png'])
# convert_cmd.append(output_file + "*.bmp")
# with open(os.path.dirname(output_file) + '/' + 'convert.log', 'w') as log_out:
# convert_return_code = subprocess.call(convert_cmd, stdout = log_out)
# if convert_return_code == 0 and os.path.isfile(output_file + "_" + "_".join(apids) + ".png") and os.path.getsize(output_file + "_" + "_".join(apids) + ".png") > 1000:
# for f in glob(output_file + "*.bmp"):
# os.remove(f)
# return True
# # try 72k qpsk
# # s file should already be demodulated
# if not os.path.isfile(s_file) or os.path.getsize(s_file) < 1000:
# ret_dem = demodulate(iq_file, s_file, '72000', 'qpsk')
# ret_dec = decode(s_file, output_file, False, '66,65,64', False)
#
# # try 72k oqpsk
# ret_dem = demodulate(iq_file, s_file, '72000', 'oqpsk')
# ret_dec = decode(s_file, output_file, False, '69,68,67', True)
#
# # try 80k qpsk interleaved
# ret_dem = demodulate(iq_file, s_file, '80000', 'qpsk')
# ret_dec = decode(s_file, output_file, True, '66,65,64', False)
def main():
    """Collect the files of one observation, decode them and notify.

    Command line: ``<timestamp> <obs_id> <sat_id>``.

    Moves the IQ dump and the symbol file into /data/satnogs/complete/, runs
    dem_dec() on them, copies resulting PNGs to the SatNOGS data directory,
    sends an ntfy notification and exits with status 0 on success or 1 on
    failure.
    """
    if len(sys.argv) < 4:
        # Fail with a readable message instead of an IndexError traceback.
        sys.exit("usage: %s <timestamp> <obs_id> <sat_id>" % sys.argv[0])
    timestamp, obs_id, sat_id = sys.argv[1:4]
    print(timestamp + " | obs: " + obs_id + " sat_id: " + sat_id)

    DEST_DIR = "/data/satnogs/complete/"
    SATNOGS_DATA_DIR = "/tmp/.satnogs/data/"
    SRC_IQ_PATH = "/data/satnogs/tmp/iq.dat"

    basename = "data_" + obs_id + "_" + timestamp

    # IQ recording: keep an existing copy, otherwise move in the fresh dump.
    iq_path = DEST_DIR + basename + ".dat"
    if os.path.isfile(iq_path) and os.path.getsize(iq_path) > 0:
        # don't copy
        # todo: handle edge cases
        pass
    elif os.path.isfile(SRC_IQ_PATH) and os.path.getsize(SRC_IQ_PATH) > 0:
        shutil.move(SRC_IQ_PATH, iq_path)
    else:
        # No IQ data available; dem_dec() copes with None.
        iq_path = None

    # Symbol file: keep an existing copy, otherwise look for it in tmp.
    s_file = basename + ".s"
    s_path = DEST_DIR + s_file
    src_s_path = "/data/satnogs/tmp/" + s_file
    if not (os.path.isfile(s_path) and os.path.getsize(s_path) > 0):
        if not os.path.isfile(src_s_path):
            # Fall back to any symbol file of this observation, whatever
            # timestamp it carries in its name.
            src_s_paths = glob("/data/satnogs/tmp/" + "data_" + obs_id + "*.s")
            if len(src_s_paths) == 1:
                src_s_path = src_s_paths[0]
            elif len(src_s_paths) > 1:
                print("More than 1 file matched the glob. Not sure what to do.")
        if os.path.isfile(src_s_path) and os.path.getsize(src_s_path) > 0:
            shutil.move(src_s_path, s_path)
        else:
            print("no .s file, will demodulate .dat")

    output_file = DEST_DIR + basename
    output_glob = DEST_DIR + basename + "*.png"
    print(iq_path, s_path, src_s_path)

    result = dem_dec(iq_path, s_path, output_file, sat_id)

    summary = timestamp + " obs: " + obs_id + " sat_id: " + sat_id
    obs_url = "https://network.satnogs.org/observations/" + obs_id + "/"

    if result:
        for f in glob(output_glob):
            print("output file:", f)
            shutil.copy(f, SATNOGS_DATA_DIR)
        # The symbol files are only removed once decoding has succeeded.
        for f in glob(DEST_DIR + basename + "*.s"):
            os.remove(f)
        ntfy.notify(summary, "SatNOGS obs successful", ntfy_config, url = obs_url)
        sys.exit()

    # Partial outputs (*.png, *.stat, *.bmp) are deliberately left in place.
    print("Demod or decode unsuccessful")
    ntfy.notify(summary, "SatNOGS obs failed", ntfy_config, url = obs_url)
    sys.exit(1)


if __name__ == "__main__":
    main()
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
##################################################
# GNU Radio Python Flow Graph
# Title: LRPT Demodulator
# Author: Mark Jessop (vk5qi@rfhead.net)
# Description: A METEOR LRPT demodulation block
# Generated: Sat Aug 10 18:17:13 2019
##################################################
from datetime import datetime
from gnuradio import analog
from gnuradio import blocks
from gnuradio import digital
from gnuradio import eng_notation
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import os.path
import osmosdr
import satnogs
import time
# GNU Radio top block for receiving METEOR LRPT.
#
# Signal path, as wired in the "Connections" section below:
#   osmosdr source -> coarse doppler correction -> frequency-translating FIR
#   -> rational resampler (10/16)
# The resampler output feeds three consumers:
#   * the IQ sink (file at iq_file_path),
#   * the waterfall sink (file at waterfall_file_path),
#   * the demodulator chain: AGC -> root-raised-cosine filter -> Costas loop
#     -> M&M clock recovery -> constellation soft decoder -> rail (-1..1)
#     -> float_to_char (scale 127) -> file sink (bitstream_name).
# The rigctl message source drives the doppler correction block through its
# 'freq' message port.
#
# The get_*/set_* methods are generated accessors. A setter only affects the
# running flowgraph when it has a line after the attribute assignment that
# pushes the value into a block; the others just update the stored attribute.
class satnogs_lrpt_demod(gr.top_block):
def __init__(self, antenna=satnogs.not_set_antenna, bb_gain=satnogs.not_set_rx_bb_gain, decoded_data_file_path='/tmp/.satnogs/data/data', dev_args=satnogs.not_set_dev_args, doppler_correction_per_sec=1000, enable_iq_dump=0, file_path='test.wav', if_gain=satnogs.not_set_rx_if_gain, iq_file_path='/data/satnogs/tmp/iq.dat', lo_offset=100e3, ppm=0, rf_gain=satnogs.not_set_rx_rf_gain, rigctl_port=4532, rx_freq=100e6, rx_sdr_device='rtlsdr', waterfall_file_path='/tmp/waterfall.dat'):
gr.top_block.__init__(self, "LRPT Demodulator")
##################################################
# Parameters
##################################################
self.antenna = antenna
self.bb_gain = bb_gain
self.decoded_data_file_path = decoded_data_file_path
self.dev_args = dev_args
self.doppler_correction_per_sec = doppler_correction_per_sec
# NOTE: the enable_iq_dump argument is overridden with 1 here, so the
# constructor argument (and the --enable-iq-dump option feeding it) has no
# effect on the IQ sink created below.
self.enable_iq_dump = enable_iq_dump = 1
self.file_path = file_path
self.if_gain = if_gain
self.iq_file_path = iq_file_path
self.lo_offset = lo_offset
self.ppm = ppm
self.rf_gain = rf_gain
self.rigctl_port = rigctl_port
self.rx_freq = rx_freq
self.rx_sdr_device = rx_sdr_device
self.waterfall_file_path = waterfall_file_path
##################################################
# Variables
##################################################
self.symb_rate = symb_rate = 72000
# Hardware sample rate is looked up per SDR device type.
self.samp_rate_rx = samp_rate_rx = satnogs.hw_rx_settings[rx_sdr_device]['samp_rate']
self.quadrature_rate = quadrature_rate = 156250
self.xlate_filter_taps = xlate_filter_taps = firdes.low_pass(1, samp_rate_rx, 125000, 25000, firdes.WIN_HAMMING, 6.76)
# 'taps' is stored but not passed to any block in this class.
self.taps = taps = firdes.low_pass(12.0, samp_rate_rx, 100e3, 60000, firdes.WIN_HAMMING, 6.76)
# Samples per symbol for the clock recovery block.
self.sps = sps = (quadrature_rate*1.0)/(symb_rate*1.0)
self.pll_alpha = pll_alpha = 1e-3
self.filter_rate = filter_rate = 250000
# 'deviation' is stored but not passed to any block in this class.
self.deviation = deviation = 17000
self.clock_alpha = clock_alpha = 2e-3
# Soft-symbol output file: /data/satnogs/tmp/<basename of decoded_data_file_path>_<UTC time>.s
self.bitstream_name = bitstream_name = "/data/satnogs/tmp/" + os.path.basename(decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s"
##################################################
# Blocks
##################################################
self.satnogs_waterfall_sink_0 = satnogs.waterfall_sink(quadrature_rate, rx_freq, 8, 1024, waterfall_file_path, 1)
self.satnogs_tcp_rigctl_msg_source_0 = satnogs.tcp_rigctl_msg_source("127.0.0.1", rigctl_port, False, 1000, 1500)
self.satnogs_iq_sink_0 = satnogs.iq_sink(16768, iq_file_path, False, enable_iq_dump)
self.satnogs_coarse_doppler_correction_cc_0 = satnogs.coarse_doppler_correction_cc(rx_freq, samp_rate_rx)
self.root_raised_cosine_filter_0 = filter.fir_filter_ccf(1, firdes.root_raised_cosine(
1, quadrature_rate, symb_rate, 0.6, 361))
self.osmosdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + satnogs.handle_rx_dev_args(rx_sdr_device, dev_args) )
self.osmosdr_source_0.set_sample_rate(samp_rate_rx)
# The SDR is tuned lo_offset below rx_freq; the frequency-translating
# filter further down is given the same lo_offset as its center frequency.
self.osmosdr_source_0.set_center_freq(rx_freq - lo_offset, 0)
self.osmosdr_source_0.set_freq_corr(ppm, 0)
self.osmosdr_source_0.set_dc_offset_mode(2, 0)
self.osmosdr_source_0.set_iq_balance_mode(0, 0)
self.osmosdr_source_0.set_gain_mode(False, 0)
self.osmosdr_source_0.set_gain(satnogs.handle_rx_rf_gain(rx_sdr_device, rf_gain), 0)
self.osmosdr_source_0.set_if_gain(satnogs.handle_rx_if_gain(rx_sdr_device, if_gain), 0)
self.osmosdr_source_0.set_bb_gain(satnogs.handle_rx_bb_gain(rx_sdr_device, bb_gain), 0)
self.osmosdr_source_0.set_antenna(satnogs.handle_rx_antenna(rx_sdr_device, antenna), 0)
self.osmosdr_source_0.set_bandwidth(samp_rate_rx, 0)
# Decimates by int(samp_rate_rx/filter_rate).
self.freq_xlating_fir_filter_xxx_0 = filter.freq_xlating_fir_filter_ccc(int(samp_rate_rx/filter_rate), (xlate_filter_taps), lo_offset, samp_rate_rx)
self.digital_costas_loop_cc_0 = digital.costas_loop_cc(pll_alpha, 4, False)
self.digital_constellation_soft_decoder_cf_1 = digital.constellation_soft_decoder_cf(digital.constellation_calcdist(([-1-1j, -1+1j, 1+1j, 1-1j]), ([0, 1, 3, 2]), 4, 1).base())
self.digital_clock_recovery_mm_xx_0 = digital.clock_recovery_mm_cc(sps, clock_alpha**2/4.0, 0.5, clock_alpha, 0.005)
self.blocks_float_to_char_0 = blocks.float_to_char(1, 127)
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char*1, bitstream_name, False)
self.blocks_file_sink_0.set_unbuffered(False)
# 10/16 resampling: 250000 * 10 / 16 = 156250, i.e. filter_rate -> quadrature_rate.
# NOTE(review): this assumes the translating filter above outputs exactly
# filter_rate, which requires samp_rate_rx to be a multiple of it - confirm.
self.blks2_rational_resampler_xxx_1 = filter.rational_resampler_ccc(
interpolation=10,
decimation=16,
taps=None,
fractional_bw=None,
)
self.analog_rail_ff_0 = analog.rail_ff(-1, 1)
self.analog_agc_xx_0 = analog.agc_cc(1000e-4, 0.5, 1.0)
self.analog_agc_xx_0.set_max_gain(4000)
##################################################
# Connections
##################################################
self.msg_connect((self.satnogs_tcp_rigctl_msg_source_0, 'freq'), (self.satnogs_coarse_doppler_correction_cc_0, 'freq'))
self.connect((self.analog_agc_xx_0, 0), (self.root_raised_cosine_filter_0, 0))
self.connect((self.analog_rail_ff_0, 0), (self.blocks_float_to_char_0, 0))
self.connect((self.blks2_rational_resampler_xxx_1, 0), (self.analog_agc_xx_0, 0))
self.connect((self.blks2_rational_resampler_xxx_1, 0), (self.satnogs_iq_sink_0, 0))
self.connect((self.blks2_rational_resampler_xxx_1, 0), (self.satnogs_waterfall_sink_0, 0))
self.connect((self.blocks_float_to_char_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.digital_clock_recovery_mm_xx_0, 0), (self.digital_constellation_soft_decoder_cf_1, 0))
self.connect((self.digital_constellation_soft_decoder_cf_1, 0), (self.analog_rail_ff_0, 0))
self.connect((self.digital_costas_loop_cc_0, 0), (self.digital_clock_recovery_mm_xx_0, 0))
self.connect((self.freq_xlating_fir_filter_xxx_0, 0), (self.blks2_rational_resampler_xxx_1, 0))
self.connect((self.osmosdr_source_0, 0), (self.satnogs_coarse_doppler_correction_cc_0, 0))
self.connect((self.root_raised_cosine_filter_0, 0), (self.digital_costas_loop_cc_0, 0))
self.connect((self.satnogs_coarse_doppler_correction_cc_0, 0), (self.freq_xlating_fir_filter_xxx_0, 0))
def get_antenna(self):
return self.antenna
def set_antenna(self, antenna):
self.antenna = antenna
self.osmosdr_source_0.set_antenna(satnogs.handle_rx_antenna(self.rx_sdr_device, self.antenna), 0)
def get_bb_gain(self):
return self.bb_gain
def set_bb_gain(self, bb_gain):
self.bb_gain = bb_gain
self.osmosdr_source_0.set_bb_gain(satnogs.handle_rx_bb_gain(self.rx_sdr_device, self.bb_gain), 0)
def get_decoded_data_file_path(self):
return self.decoded_data_file_path
# Changing the decoded-data path also derives a new timestamped bitstream
# name, which makes set_bitstream_name() open that file on the file sink.
def set_decoded_data_file_path(self, decoded_data_file_path):
self.decoded_data_file_path = decoded_data_file_path
self.set_bitstream_name("/data/satnogs/tmp/" + os.path.basename(self.decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s")
def get_dev_args(self):
return self.dev_args
def set_dev_args(self, dev_args):
self.dev_args = dev_args
def get_doppler_correction_per_sec(self):
return self.doppler_correction_per_sec
def set_doppler_correction_per_sec(self, doppler_correction_per_sec):
self.doppler_correction_per_sec = doppler_correction_per_sec
def get_enable_iq_dump(self):
return self.enable_iq_dump
# Only updates the attribute; the IQ sink block is not touched here.
def set_enable_iq_dump(self, enable_iq_dump):
self.enable_iq_dump = enable_iq_dump
def get_file_path(self):
return self.file_path
def set_file_path(self, file_path):
self.file_path = file_path
def get_if_gain(self):
return self.if_gain
def set_if_gain(self, if_gain):
self.if_gain = if_gain
self.osmosdr_source_0.set_if_gain(satnogs.handle_rx_if_gain(self.rx_sdr_device, self.if_gain), 0)
def get_iq_file_path(self):
return self.iq_file_path
def set_iq_file_path(self, iq_file_path):
self.iq_file_path = iq_file_path
def get_lo_offset(self):
return self.lo_offset
# Retunes both the SDR and the translating filter so they stay consistent.
def set_lo_offset(self, lo_offset):
self.lo_offset = lo_offset
self.osmosdr_source_0.set_center_freq(self.rx_freq - self.lo_offset, 0)
self.freq_xlating_fir_filter_xxx_0.set_center_freq(self.lo_offset)
def get_ppm(self):
return self.ppm
def set_ppm(self, ppm):
self.ppm = ppm
self.osmosdr_source_0.set_freq_corr(self.ppm, 0)
def get_rf_gain(self):
return self.rf_gain
def set_rf_gain(self, rf_gain):
self.rf_gain = rf_gain
self.osmosdr_source_0.set_gain(satnogs.handle_rx_rf_gain(self.rx_sdr_device, self.rf_gain), 0)
def get_rigctl_port(self):
return self.rigctl_port
def set_rigctl_port(self, rigctl_port):
self.rigctl_port = rigctl_port
def get_rx_freq(self):
return self.rx_freq
def set_rx_freq(self, rx_freq):
self.rx_freq = rx_freq
self.satnogs_coarse_doppler_correction_cc_0.set_new_freq_locked(self.rx_freq)
self.osmosdr_source_0.set_center_freq(self.rx_freq - self.lo_offset, 0)
def get_rx_sdr_device(self):
return self.rx_sdr_device
# Switching device type re-derives the sample rate and re-applies all gains
# and the antenna selection through the device-specific satnogs helpers.
def set_rx_sdr_device(self, rx_sdr_device):
self.rx_sdr_device = rx_sdr_device
self.set_samp_rate_rx(satnogs.hw_rx_settings[self.rx_sdr_device]['samp_rate'])
self.osmosdr_source_0.set_gain(satnogs.handle_rx_rf_gain(self.rx_sdr_device, self.rf_gain), 0)
self.osmosdr_source_0.set_if_gain(satnogs.handle_rx_if_gain(self.rx_sdr_device, self.if_gain), 0)
self.osmosdr_source_0.set_bb_gain(satnogs.handle_rx_bb_gain(self.rx_sdr_device, self.bb_gain), 0)
self.osmosdr_source_0.set_antenna(satnogs.handle_rx_antenna(self.rx_sdr_device, self.antenna), 0)
def get_waterfall_file_path(self):
return self.waterfall_file_path
def set_waterfall_file_path(self, waterfall_file_path):
self.waterfall_file_path = waterfall_file_path
def get_symb_rate(self):
return self.symb_rate
# Symbol rate changes update samples-per-symbol and the RRC filter taps.
def set_symb_rate(self, symb_rate):
self.symb_rate = symb_rate
self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))
def get_samp_rate_rx(self):
return self.samp_rate_rx
def set_samp_rate_rx(self, samp_rate_rx):
self.samp_rate_rx = samp_rate_rx
self.set_xlate_filter_taps(firdes.low_pass(1, self.samp_rate_rx, 125000, 25000, firdes.WIN_HAMMING, 6.76))
self.osmosdr_source_0.set_sample_rate(self.samp_rate_rx)
self.osmosdr_source_0.set_bandwidth(self.samp_rate_rx, 0)
def get_quadrature_rate(self):
return self.quadrature_rate
def set_quadrature_rate(self, quadrature_rate):
self.quadrature_rate = quadrature_rate
self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))
def get_xlate_filter_taps(self):
return self.xlate_filter_taps
def set_xlate_filter_taps(self, xlate_filter_taps):
self.xlate_filter_taps = xlate_filter_taps
self.freq_xlating_fir_filter_xxx_0.set_taps((self.xlate_filter_taps))
def get_taps(self):
return self.taps
def set_taps(self, taps):
self.taps = taps
def get_sps(self):
return self.sps
def set_sps(self, sps):
self.sps = sps
self.digital_clock_recovery_mm_xx_0.set_omega(self.sps)
def get_pll_alpha(self):
return self.pll_alpha
def set_pll_alpha(self, pll_alpha):
self.pll_alpha = pll_alpha
self.digital_costas_loop_cc_0.set_loop_bandwidth(self.pll_alpha)
def get_filter_rate(self):
return self.filter_rate
# Only updates the attribute; the translating filter's decimation is fixed
# at construction time.
def set_filter_rate(self, filter_rate):
self.filter_rate = filter_rate
def get_deviation(self):
return self.deviation
def set_deviation(self, deviation):
self.deviation = deviation
def get_clock_alpha(self):
return self.clock_alpha
# Mirrors the constructor: gain_omega = clock_alpha**2/4, gain_mu = clock_alpha.
def set_clock_alpha(self, clock_alpha):
self.clock_alpha = clock_alpha
self.digital_clock_recovery_mm_xx_0.set_gain_omega(self.clock_alpha**2/4.0)
self.digital_clock_recovery_mm_xx_0.set_gain_mu(self.clock_alpha)
def get_bitstream_name(self):
return self.bitstream_name
# Points the file sink at the new path by calling its open().
def set_bitstream_name(self, bitstream_name):
self.bitstream_name = bitstream_name
self.blocks_file_sink_0.open(self.bitstream_name)
def argument_parser():
    """Build the command-line parser for the LRPT demodulator flowgraph.

    One long option is registered per flowgraph parameter. The option name is
    the parameter name with underscores replaced by dashes, and the parsed
    value is stored under the parameter name.

    Returns:
        The configured OptionParser.
    """
    description = 'A METEOR LRPT demodulation block'
    parser = OptionParser(usage="%prog: [options]", option_class=eng_option, description=description)

    to_eng = eng_notation.num_to_str

    # (destination, optparse type, default) in registration order.
    option_table = (
        ("antenna", "string", satnogs.not_set_antenna),
        ("bb_gain", "eng_float", to_eng(satnogs.not_set_rx_bb_gain)),
        ("decoded_data_file_path", "string", '/tmp/.satnogs/data/data'),
        ("dev_args", "string", satnogs.not_set_dev_args),
        ("doppler_correction_per_sec", "intx", 1000),
        ("enable_iq_dump", "intx", 0),
        ("file_path", "string", 'test.wav'),
        ("if_gain", "eng_float", to_eng(satnogs.not_set_rx_if_gain)),
        ("iq_file_path", "string", '/data/satnogs/tmp/iq.dat'),
        ("lo_offset", "eng_float", to_eng(100e3)),
        ("ppm", "intx", 0),
        ("rf_gain", "eng_float", to_eng(satnogs.not_set_rx_rf_gain)),
        ("rigctl_port", "intx", 4532),
        ("rx_freq", "eng_float", to_eng(100e6)),
        ("rx_sdr_device", "string", 'rtlsdr'),
        ("waterfall_file_path", "string", '/tmp/waterfall.dat'),
    )

    for dest, value_type, default in option_table:
        parser.add_option(
            "", "--" + dest.replace("_", "-"),
            dest=dest, type=value_type, default=default,
            help="Set " + dest + " [default=%default]")

    return parser
def main(top_block_cls=satnogs_lrpt_demod, options=None):
    """Instantiate the flowgraph from parsed options and run it to completion.

    Args:
        top_block_cls: Class to instantiate; called with one keyword argument
            per flowgraph parameter.
        options: Object carrying the parameters as attributes. When None,
            the command line is parsed with argument_parser().
    """
    if options is None:
        options = argument_parser().parse_args()[0]

    # Every flowgraph parameter is forwarded by keyword under its own name.
    parameter_names = (
        "antenna",
        "bb_gain",
        "decoded_data_file_path",
        "dev_args",
        "doppler_correction_per_sec",
        "enable_iq_dump",
        "file_path",
        "if_gain",
        "iq_file_path",
        "lo_offset",
        "ppm",
        "rf_gain",
        "rigctl_port",
        "rx_freq",
        "rx_sdr_device",
        "waterfall_file_path",
    )
    parameters = dict((name, getattr(options, name)) for name in parameter_names)

    tb = top_block_cls(**parameters)
    tb.start()
    tb.wait()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.