-
-
Save benelsen/6f67540929b9ddae63f35c6953b54abb to your computer and use it in GitHub Desktop.
SatNOGS Station Meteor-M N2-2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Post-observation hook.
#
# Logs the finished observation, starts the Meteor post-processing script in
# the background when the LRPT flowgraph was used, and sends an ntfy
# notification for LRPT and APT observations.
#
# Positional arguments:
#   $1  observation id
#   $2  observation timestamp
#   $3  satellite id
#   $4  frequency
#   $5  baud rate
#   $6  name of the flowgraph script that ran

echo "Running postobs.sh"

obs_id=$1
datetime=$2
sat_id=$3
freq=$4
baud=$5
script=$6

echo "$datetime: obs: $obs_id, sat: $sat_id, freq: $freq, baud: $baud, script: $script" | tee -a /data/satnogs/postobs.log

# /home/pi/rtl_biast/build/src/rtl_biast -d 0 -b 0

if [[ $script == "satnogs_lrpt_demod.py" ]]; then
    echo "Post-processing Meteor"
    # Run detached at reduced CPU and I/O priority. The arguments are quoted
    # so an empty or space-containing value cannot shift or split the
    # positional parameters seen by process_meteor.py.
    ionice -c 2 -n 5 nice -n 5 /usr/bin/python /data/satnogs/process_meteor.py "$datetime" "$obs_id" "$sat_id" >> /data/satnogs/process_meteor.log 2>&1 &
fi

if [[ $script =~ "lrpt" ]] || [[ $script =~ "apt" ]]; then
    ntfy -c /etc/ntfy/ntfy.yml -t "SatNOGS" -o expire 60 send "$obs_id (Satellite: $sat_id) finished. $script" &
fi

echo "postobs.sh done"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from glob import glob | |
import os | |
import shutil | |
from time import sleep | |
import subprocess | |
import sys | |
import ntfy | |
from ntfy.config import load_config | |
# When True, '-q' is added to the meteor_demod command line.
QUIET = False

# Absolute paths of the external tools invoked by this script.
DEMOD_BIN = "/home/pi/meteor_demod/src/meteor_demod"
DECODE_BIN = "/home/pi/meteor_decode/src/meteor_decode"
MEDET_BIN = "/home/pi/medet/medet"
MOGRIFY_BIN = "/usr/bin/mogrify"
CONVERT_BIN = "/usr/bin/convert"

# ntfy configuration, loaded once at import time and reused for notifications.
ntfy_config = load_config("/data/satnogs/ntfy.yml")
def demodulate(iq_file, s_file, symbol_rate, modulation):
    """Run meteor_demod on an IQ recording and write soft symbols to s_file.

    The tool's stdout is written to ``demodulate.log`` in the directory of
    ``s_file`` (the current directory if ``s_file`` has no directory part).

    Args:
        iq_file: Path of the input IQ recording.
        s_file: Path of the soft-symbol output file.
        symbol_rate: Value for the ``-r`` option; converted to a string.
        modulation: Value for the ``-m`` option. ``'oqpsk'`` selects a PLL
            bandwidth of 250, anything else 75.

    Returns:
        The exit status of the meteor_demod process.
    """
    pll_bw = '250' if modulation == 'oqpsk' else '75'

    dem_cmd = [DEMOD_BIN]
    if QUIET:
        dem_cmd.append('-q')
    dem_cmd.extend([
        '-B',
        '-R', '1000',
        '-d', '1000',
        '-f', '24',
        '-b', pll_bw,
        # '-a', '0.6',
        '-s', '156250',
        # subprocess requires string arguments, so accept numbers as well.
        '-r', str(symbol_rate),
        '-m', modulation,
        '-o', s_file,
        iq_file,
    ])
    print(dem_cmd)

    # os.path.join keeps the log relative when dirname() is empty; plain
    # concatenation with "/" would point at the filesystem root instead.
    log_path = os.path.join(os.path.dirname(s_file), 'demodulate.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dem_cmd, stdout=f_out)
def decode(s_file, output_file, apids, diff, interleaved, split):
    """Run meteor_decode on a soft-symbol file.

    The tool's stdout is written to ``decode.log`` in the directory of the
    output file (the current directory if it has no directory part).

    Args:
        s_file: Path of the soft-symbol input file.
        output_file: Output path without extension. When ``split`` is false,
            ``.png`` is appended before it is passed to ``-o``.
        apids: Iterable of APID strings, joined with commas for ``-a``.
        diff: When true, pass ``-d``.
        interleaved: Accepted for signature parity with ``decode_medet``;
            not used by this function.
        split: When true, pass ``-S`` and leave ``output_file`` unchanged.

    Returns:
        The exit status of the meteor_decode process.
    """
    dec_cmd = [DECODE_BIN, '-a', ",".join(apids)]
    if diff:
        dec_cmd.append('-d')
    if split:
        dec_cmd.append('-S')
    else:
        output_file = output_file + ".png"
    dec_cmd.extend(['-q', '-s', '-o', output_file, s_file])
    print(dec_cmd)

    # os.path.join keeps the log relative when dirname() is empty; plain
    # concatenation with "/" would point at the filesystem root instead.
    log_path = os.path.join(os.path.dirname(output_file), 'decode.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dec_cmd, stdout=f_out)
def decode_medet(s_file, output_file, apids, diff, interleaved, split):
    """Run medet on a soft-symbol file.

    The tool's stdout is written to ``medet.log`` in the directory of
    ``output_file`` (the current directory if it has no directory part).

    Args:
        s_file: Path of the soft-symbol input file.
        output_file: Output path passed to medet as its second argument.
        apids: Three APID strings, passed to ``-r``, ``-g`` and ``-b`` in
            that order. Any iterable is accepted.
        diff: When true, pass ``-diff``.
        interleaved: When true, pass ``-int``.
        split: When true, pass ``-S``.

    Returns:
        The exit status of the medet process.
    """
    # Materialise first so iterators (e.g. a Python 3 map object) can be indexed.
    apids = list(apids)

    dec_cmd = [
        MEDET_BIN,
        s_file,
        output_file,
        '-r', apids[0],
        '-g', apids[1],
        '-b', apids[2],
    ]
    if interleaved:
        dec_cmd.append('-int')
    if diff:
        dec_cmd.append('-diff')
    if split:
        dec_cmd.append('-S')
    dec_cmd.extend(['-t', '-q'])
    print(dec_cmd)

    # os.path.join keeps the log relative when dirname() is empty; plain
    # concatenation with "/" would point at the filesystem root instead.
    log_path = os.path.join(os.path.dirname(output_file), 'medet.log')
    with open(log_path, 'w') as f_out:
        return subprocess.call(dec_cmd, stdout=f_out)
def _is_nontrivial_file(path, min_size=1000):
    """Return True if path is a regular file larger than min_size bytes."""
    return os.path.isfile(path) and os.path.getsize(path) > min_size


def dem_dec(iq_file, s_file, output_file, sat_id = "00000"):
    """Demodulate (if needed), decode and convert one observation to PNG.

    Args:
        iq_file: Path of the IQ recording, or None when none is available.
        s_file: Path of the soft-symbol file (input and/or demodulator output).
        output_file: Output path prefix; medet writes ``<prefix>.bmp`` and the
            PNG files are created as ``<prefix>_<suffix>.png``.
        sat_id: Satellite id. Only "44387" and "40069" are handled
            (presumably the NORAD ids of Meteor-M N2-2 and Meteor-M N2).

    Returns:
        True when the PNG output was produced and verified, otherwise False.
    """
    sat = str(sat_id)

    if sat == "44387":
        # A real list: it is indexed and compared with list literals below,
        # which a Python 3 map object would not support.
        apids = ['68', '65', '64']
        if iq_file is None:
            return False
        # Always demodulate from the IQ recording as 72k OQPSK.
        demodulate(iq_file, s_file, '72000', 'oqpsk')
        # ret_dec = decode(s_file, output_file, True, ",".join(apids), False)
        ret_dec = decode_medet(s_file, output_file, apids, True, False, False)
    elif sat == "40069":
        apids = ['68', '65', '64']
        # The soft-symbol file should already exist; fall back to 72k QPSK
        # demodulation of the IQ recording only when it is missing or tiny.
        if not os.path.isfile(s_file) or os.path.getsize(s_file) < 1000:
            if iq_file is None:
                return False
            demodulate(iq_file, s_file, '72000', 'qpsk')
        # ret_dec = decode(s_file, output_file, False, ",".join(apids), False)
        ret_dec = decode_medet(s_file, output_file, apids, False, False, False)
    else:
        return False

    if ret_dec != 0:
        return False

    bmp_path = output_file + ".bmp"
    log_path = os.path.join(os.path.dirname(output_file), 'convert.log')

    if apids == ['66', '65', '64']:
        # Not selected by any satellite branch above; kept for that APID set.
        if _is_nontrivial_file(bmp_path):
            # The target must be the same .png that is verified afterwards.
            png_path = output_file + "_666564.png"
            with open(log_path, 'w') as log_out:
                convert_return_code = subprocess.call(
                    [CONVERT_BIN, bmp_path, png_path], stdout=log_out)
            if convert_return_code == 0 and _is_nontrivial_file(png_path):
                os.remove(bmp_path)
                return True
    elif apids == ['68', '65', '64']:
        if _is_nontrivial_file(bmp_path):
            rgb_png = output_file + "_656564.png"
            ir_png = output_file + "_68.png"

            # Colour composite built from the G, G and B planes of the bitmap,
            # i.e. APIDs 65/65/64 given the -r/-g/-b assignment above.
            convert_cmd_1 = [CONVERT_BIN]
            for channel in ("G", "G", "B"):
                convert_cmd_1.extend([bmp_path, "-channel", channel, "-separate"])
            convert_cmd_1.extend(["+channel", "-combine", rgb_png])

            # R plane (APID 68) as a negated, auto-levelled greyscale image.
            convert_cmd_2 = [
                CONVERT_BIN, bmp_path,
                "-channel", "R", "-separate", "-colorspace", "Gray",
                "+channel", "-negate", "-auto-level",
                ir_png,
            ]

            with open(log_path, 'w') as log_out:
                convert_return_code_1 = subprocess.call(convert_cmd_1, stdout=log_out)
                convert_return_code_2 = subprocess.call(convert_cmd_2, stdout=log_out)

            if (convert_return_code_1 == 0 and convert_return_code_2 == 0
                    and _is_nontrivial_file(rgb_png)
                    and _is_nontrivial_file(ir_png)):
                os.remove(bmp_path)
                return True
        elif os.path.isfile(bmp_path):
            # The bitmap exists but is too small to hold an image; discard it.
            os.remove(bmp_path)

    return False
# # currently sending 80k oqpsk | |
# ret_dem = demodulate(iq_file, s_file, '80000', 'oqpsk') | |
# ret_dec = decode_medet(s_file, output_file, apids, True, True, True) | |
# if os.path.isfile(output_file + ".bmp"): | |
# shutil.move(output_file + ".bmp", output_file + "_" + "_".join(apids) + ".bmp") | |
# for (i, apid) in zip(range(0, len(apids)), apids): | |
# shutil.move(output_file + "_" + str(i) + ".bmp", output_file + "_apid_" + apid + ".bmp") | |
# if ret_dec == 0 and (os.path.isfile(output_file + "_" + "_".join(apids) + ".bmp") and os.path.getsize(output_file + "_" + "_".join(apids) + ".bmp") > 1000): | |
# convert_cmd = [MOGRIFY_BIN] | |
# convert_cmd.extend(['-format', 'png']) | |
# convert_cmd.append(output_file + "*.bmp") | |
# with open(os.path.dirname(output_file) + '/' + 'convert.log', 'w') as log_out: | |
# convert_return_code = subprocess.call(convert_cmd, stdout = log_out) | |
# if convert_return_code == 0 and os.path.isfile(output_file + "_" + "_".join(apids) + ".png") and os.path.getsize(output_file + "_" + "_".join(apids) + ".png") > 1000: | |
# for f in glob(output_file + "*.bmp"): | |
# os.remove(f) | |
# return True | |
# # try 72k qpsk | |
# # s file should already be demodulated | |
# if not os.path.isfile(s_file) or os.path.getsize(s_file) < 1000: | |
# ret_dem = demodulate(iq_file, s_file, '72000', 'qpsk') | |
# ret_dec = decode(s_file, output_file, False, '66,65,64', False) | |
# | |
# # try 72k oqpsk | |
# ret_dem = demodulate(iq_file, s_file, '72000', 'oqpsk') | |
# ret_dec = decode(s_file, output_file, False, '69,68,67', True) | |
# | |
# # try 80k qpsk interleaved | |
# ret_dem = demodulate(iq_file, s_file, '80000', 'qpsk') | |
# ret_dec = decode(s_file, output_file, True, '66,65,64', False) | |
def main():
    """Collect the recordings of one observation, decode them and notify.

    Command line: ``<timestamp> <obs_id> <sat_id>``.

    The IQ recording and soft-symbol file are moved from /data/satnogs/tmp
    into DEST_DIR under observation-specific names, ``dem_dec`` is run on
    them, resulting PNG files are copied to the SatNOGS data directory, and
    an ntfy notification reports success or failure.

    Exits with status 0 on success and 1 on failure or bad usage.
    """
    if len(sys.argv) < 4:
        # Fail with a clear message instead of an IndexError traceback.
        sys.exit("usage: process_meteor.py <timestamp> <obs_id> <sat_id>")
    timestamp, obs_id, sat_id = sys.argv[1:4]

    print(timestamp + " | obs: " + obs_id + " sat_id: " + sat_id)

    DEST_DIR="/data/satnogs/complete/"
    SATNOGS_DATA_DIR="/tmp/.satnogs/data/"
    SRC_IQ_PATH = "/data/satnogs/tmp/iq.dat"

    basename = "data_" + obs_id + "_" + timestamp

    # --- IQ recording -----------------------------------------------------
    iq_path = DEST_DIR + basename + ".dat"
    if os.path.isfile(iq_path) and os.path.getsize(iq_path) > 0:
        # Already in place from an earlier run; don't copy.
        # todo: handle edge cases
        pass
    elif os.path.isfile(SRC_IQ_PATH) and os.path.getsize(SRC_IQ_PATH) > 0:
        shutil.move(SRC_IQ_PATH, iq_path)
    else:
        # No IQ recording available; dem_dec decides whether it can proceed.
        iq_path = None

    # --- soft-symbol file -------------------------------------------------
    s_file = basename + ".s"
    s_path = DEST_DIR + s_file
    src_s_path = "/data/satnogs/tmp/" + s_file
    if not (os.path.isfile(s_path) and os.path.getsize(s_path) > 0):
        if not os.path.isfile(src_s_path):
            # The exact name was not found; look for any .s file of this
            # observation (presumably the writer used a different timestamp).
            src_s_paths = glob("/data/satnogs/tmp/" + "data_" + obs_id + "*.s")
            if len(src_s_paths) == 1:
                src_s_path = src_s_paths[0]
            elif len(src_s_paths) > 1:
                print("More than 1 file matched the glob. Not sure what to do.")
        if os.path.isfile(src_s_path) and os.path.getsize(src_s_path) > 0:
            shutil.move(src_s_path, s_path)
        else:
            print("no .s file, will demodulate .dat")

    output_file = DEST_DIR + basename
    output_glob = DEST_DIR + basename + "*.png"

    print(iq_path, s_path, src_s_path)
    result = dem_dec(iq_path, s_path, output_file, sat_id)

    message = timestamp + " obs: " + obs_id + " sat_id: " + sat_id
    obs_url = "https://network.satnogs.org/observations/" + obs_id + "/"

    if result:
        for f in glob(output_glob):
            print("output file:", f)
            shutil.copy(f, SATNOGS_DATA_DIR)
        # The soft symbols are no longer needed once the images exist.
        for f in glob(DEST_DIR + basename + "*.s"):
            os.remove(f)
        ntfy.notify(message, "SatNOGS obs successful", ntfy_config, url = obs_url)
        sys.exit()
    else:
        # On failure nothing is cleaned up; leftover files stay in DEST_DIR.
        print("Demod or decode unsuccessful")
        ntfy.notify(message, "SatNOGS obs failed", ntfy_config, url = obs_url)
        sys.exit(1)


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# -*- coding: utf-8 -*- | |
################################################## | |
# GNU Radio Python Flow Graph | |
# Title: LRPT Demodulator | |
# Author: Mark Jessop (vk5qi@rfhead.net) | |
# Description: A METEOR LRPT demodulation block | |
# Generated: Sat Aug 10 18:17:13 2019 | |
################################################## | |
from datetime import datetime | |
from gnuradio import analog | |
from gnuradio import blocks | |
from gnuradio import digital | |
from gnuradio import eng_notation | |
from gnuradio import filter | |
from gnuradio import gr | |
from gnuradio.eng_option import eng_option | |
from gnuradio.filter import firdes | |
from optparse import OptionParser | |
import os.path | |
import osmosdr | |
import satnogs | |
import time | |
class satnogs_lrpt_demod(gr.top_block):
    """GNU Radio top block "LRPT Demodulator".

    Signal path, as wired in the Connections section of ``__init__``:

        osmosdr source -> coarse doppler correction -> frequency-translating
        FIR filter -> rational resampler (10/16)
            -> IQ sink
            -> waterfall sink
            -> AGC -> root-raised-cosine filter -> Costas loop
               -> M&M clock recovery -> constellation soft decoder
               -> rail (-1..1) -> float_to_char (scale 127) -> file sink

    The file sink writes to ``bitstream_name``, a ``.s`` file under
    /data/satnogs/tmp named after the basename of ``decoded_data_file_path``
    plus the UTC time at construction.
    """

    def __init__(self, antenna=satnogs.not_set_antenna, bb_gain=satnogs.not_set_rx_bb_gain,
                 decoded_data_file_path='/tmp/.satnogs/data/data',
                 dev_args=satnogs.not_set_dev_args, doppler_correction_per_sec=1000,
                 enable_iq_dump=0, file_path='test.wav', if_gain=satnogs.not_set_rx_if_gain,
                 iq_file_path='/data/satnogs/tmp/iq.dat', lo_offset=100e3, ppm=0,
                 rf_gain=satnogs.not_set_rx_rf_gain, rigctl_port=4532, rx_freq=100e6,
                 rx_sdr_device='rtlsdr', waterfall_file_path='/tmp/waterfall.dat'):
        gr.top_block.__init__(self, "LRPT Demodulator")

        ##################################################
        # Parameters
        ##################################################
        # The IQ dump is forced on here; the constructor argument is ignored.
        enable_iq_dump = 1

        self.antenna = antenna
        self.bb_gain = bb_gain
        self.decoded_data_file_path = decoded_data_file_path
        self.dev_args = dev_args
        self.doppler_correction_per_sec = doppler_correction_per_sec
        self.enable_iq_dump = enable_iq_dump
        self.file_path = file_path
        self.if_gain = if_gain
        self.iq_file_path = iq_file_path
        self.lo_offset = lo_offset
        self.ppm = ppm
        self.rf_gain = rf_gain
        self.rigctl_port = rigctl_port
        self.rx_freq = rx_freq
        self.rx_sdr_device = rx_sdr_device
        self.waterfall_file_path = waterfall_file_path

        ##################################################
        # Variables
        ##################################################
        symb_rate = 72000
        samp_rate_rx = satnogs.hw_rx_settings[rx_sdr_device]['samp_rate']
        quadrature_rate = 156250
        xlate_filter_taps = firdes.low_pass(
            1, samp_rate_rx, 125000, 25000, firdes.WIN_HAMMING, 6.76)
        taps = firdes.low_pass(
            12.0, samp_rate_rx, 100e3, 60000, firdes.WIN_HAMMING, 6.76)
        # Samples per symbol at the demodulator input.
        sps = (quadrature_rate*1.0)/(symb_rate*1.0)
        pll_alpha = 1e-3
        filter_rate = 250000
        deviation = 17000
        clock_alpha = 2e-3
        bitstream_name = (
            "/data/satnogs/tmp/"
            + os.path.basename(decoded_data_file_path)
            + "_"
            + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S")
            + ".s"
        )

        self.symb_rate = symb_rate
        self.samp_rate_rx = samp_rate_rx
        self.quadrature_rate = quadrature_rate
        self.xlate_filter_taps = xlate_filter_taps
        self.taps = taps
        self.sps = sps
        self.pll_alpha = pll_alpha
        self.filter_rate = filter_rate
        self.deviation = deviation
        self.clock_alpha = clock_alpha
        self.bitstream_name = bitstream_name

        ##################################################
        # Blocks
        ##################################################
        self.satnogs_waterfall_sink_0 = satnogs.waterfall_sink(
            quadrature_rate, rx_freq, 8, 1024, waterfall_file_path, 1)
        self.satnogs_tcp_rigctl_msg_source_0 = satnogs.tcp_rigctl_msg_source(
            "127.0.0.1", rigctl_port, False, 1000, 1500)
        self.satnogs_iq_sink_0 = satnogs.iq_sink(
            16768, iq_file_path, False, enable_iq_dump)
        self.satnogs_coarse_doppler_correction_cc_0 = satnogs.coarse_doppler_correction_cc(
            rx_freq, samp_rate_rx)
        self.root_raised_cosine_filter_0 = filter.fir_filter_ccf(
            1, firdes.root_raised_cosine(1, quadrature_rate, symb_rate, 0.6, 361))

        sdr = osmosdr.source(
            args="numchan=" + str(1) + " " + satnogs.handle_rx_dev_args(rx_sdr_device, dev_args))
        self.osmosdr_source_0 = sdr
        sdr.set_sample_rate(samp_rate_rx)
        # Tune below the wanted frequency by lo_offset; the translating
        # filter further down shifts by the same lo_offset.
        sdr.set_center_freq(rx_freq - lo_offset, 0)
        sdr.set_freq_corr(ppm, 0)
        sdr.set_dc_offset_mode(2, 0)
        sdr.set_iq_balance_mode(0, 0)
        sdr.set_gain_mode(False, 0)
        sdr.set_gain(satnogs.handle_rx_rf_gain(rx_sdr_device, rf_gain), 0)
        sdr.set_if_gain(satnogs.handle_rx_if_gain(rx_sdr_device, if_gain), 0)
        sdr.set_bb_gain(satnogs.handle_rx_bb_gain(rx_sdr_device, bb_gain), 0)
        sdr.set_antenna(satnogs.handle_rx_antenna(rx_sdr_device, antenna), 0)
        sdr.set_bandwidth(samp_rate_rx, 0)

        self.freq_xlating_fir_filter_xxx_0 = filter.freq_xlating_fir_filter_ccc(
            int(samp_rate_rx/filter_rate), (xlate_filter_taps), lo_offset, samp_rate_rx)
        self.digital_costas_loop_cc_0 = digital.costas_loop_cc(pll_alpha, 4, False)
        self.digital_constellation_soft_decoder_cf_1 = digital.constellation_soft_decoder_cf(
            digital.constellation_calcdist(
                ([-1-1j, -1+1j, 1+1j, 1-1j]), ([0, 1, 3, 2]), 4, 1).base())
        self.digital_clock_recovery_mm_xx_0 = digital.clock_recovery_mm_cc(
            sps, clock_alpha**2/4.0, 0.5, clock_alpha, 0.005)
        self.blocks_float_to_char_0 = blocks.float_to_char(1, 127)
        self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char*1, bitstream_name, False)
        self.blocks_file_sink_0.set_unbuffered(False)
        self.blks2_rational_resampler_xxx_1 = filter.rational_resampler_ccc(
            interpolation=10,
            decimation=16,
            taps=None,
            fractional_bw=None,
        )
        self.analog_rail_ff_0 = analog.rail_ff(-1, 1)
        self.analog_agc_xx_0 = analog.agc_cc(1000e-4, 0.5, 1.0)
        self.analog_agc_xx_0.set_max_gain(4000)

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.satnogs_tcp_rigctl_msg_source_0, 'freq'),
                         (self.satnogs_coarse_doppler_correction_cc_0, 'freq'))

        # Every stream connection goes from output port 0 to input port 0.
        stream_links = (
            (self.analog_agc_xx_0, self.root_raised_cosine_filter_0),
            (self.analog_rail_ff_0, self.blocks_float_to_char_0),
            (self.blks2_rational_resampler_xxx_1, self.analog_agc_xx_0),
            (self.blks2_rational_resampler_xxx_1, self.satnogs_iq_sink_0),
            (self.blks2_rational_resampler_xxx_1, self.satnogs_waterfall_sink_0),
            (self.blocks_float_to_char_0, self.blocks_file_sink_0),
            (self.digital_clock_recovery_mm_xx_0, self.digital_constellation_soft_decoder_cf_1),
            (self.digital_constellation_soft_decoder_cf_1, self.analog_rail_ff_0),
            (self.digital_costas_loop_cc_0, self.digital_clock_recovery_mm_xx_0),
            (self.freq_xlating_fir_filter_xxx_0, self.blks2_rational_resampler_xxx_1),
            (self.osmosdr_source_0, self.satnogs_coarse_doppler_correction_cc_0),
            (self.root_raised_cosine_filter_0, self.digital_costas_loop_cc_0),
            (self.satnogs_coarse_doppler_correction_cc_0, self.freq_xlating_fir_filter_xxx_0),
        )
        for upstream, downstream in stream_links:
            self.connect((upstream, 0), (downstream, 0))

    # ------------------------------------------------------------------
    # Accessors for the constructor parameters. A setter stores the value
    # and, where shown, forwards it to the blocks that depend on it.
    # ------------------------------------------------------------------

    def get_antenna(self):
        return self.antenna

    def set_antenna(self, antenna):
        self.antenna = antenna
        self.osmosdr_source_0.set_antenna(
            satnogs.handle_rx_antenna(self.rx_sdr_device, self.antenna), 0)

    def get_bb_gain(self):
        return self.bb_gain

    def set_bb_gain(self, bb_gain):
        self.bb_gain = bb_gain
        self.osmosdr_source_0.set_bb_gain(
            satnogs.handle_rx_bb_gain(self.rx_sdr_device, self.bb_gain), 0)

    def get_decoded_data_file_path(self):
        return self.decoded_data_file_path

    def set_decoded_data_file_path(self, decoded_data_file_path):
        self.decoded_data_file_path = decoded_data_file_path
        # A new bitstream name is derived with the current UTC time, which
        # makes the file sink open that new file.
        stamp = datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S")
        self.set_bitstream_name(
            "/data/satnogs/tmp/" + os.path.basename(self.decoded_data_file_path) + "_" + stamp + ".s")

    def get_dev_args(self):
        return self.dev_args

    def set_dev_args(self, dev_args):
        self.dev_args = dev_args

    def get_doppler_correction_per_sec(self):
        return self.doppler_correction_per_sec

    def set_doppler_correction_per_sec(self, doppler_correction_per_sec):
        self.doppler_correction_per_sec = doppler_correction_per_sec

    def get_enable_iq_dump(self):
        return self.enable_iq_dump

    def set_enable_iq_dump(self, enable_iq_dump):
        self.enable_iq_dump = enable_iq_dump

    def get_file_path(self):
        return self.file_path

    def set_file_path(self, file_path):
        self.file_path = file_path

    def get_if_gain(self):
        return self.if_gain

    def set_if_gain(self, if_gain):
        self.if_gain = if_gain
        self.osmosdr_source_0.set_if_gain(
            satnogs.handle_rx_if_gain(self.rx_sdr_device, self.if_gain), 0)

    def get_iq_file_path(self):
        return self.iq_file_path

    def set_iq_file_path(self, iq_file_path):
        self.iq_file_path = iq_file_path

    def get_lo_offset(self):
        return self.lo_offset

    def set_lo_offset(self, lo_offset):
        self.lo_offset = lo_offset
        self.osmosdr_source_0.set_center_freq(self.rx_freq - self.lo_offset, 0)
        self.freq_xlating_fir_filter_xxx_0.set_center_freq(self.lo_offset)

    def get_ppm(self):
        return self.ppm

    def set_ppm(self, ppm):
        self.ppm = ppm
        self.osmosdr_source_0.set_freq_corr(self.ppm, 0)

    def get_rf_gain(self):
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        self.rf_gain = rf_gain
        self.osmosdr_source_0.set_gain(
            satnogs.handle_rx_rf_gain(self.rx_sdr_device, self.rf_gain), 0)

    def get_rigctl_port(self):
        return self.rigctl_port

    def set_rigctl_port(self, rigctl_port):
        self.rigctl_port = rigctl_port

    def get_rx_freq(self):
        return self.rx_freq

    def set_rx_freq(self, rx_freq):
        self.rx_freq = rx_freq
        self.satnogs_coarse_doppler_correction_cc_0.set_new_freq_locked(self.rx_freq)
        self.osmosdr_source_0.set_center_freq(self.rx_freq - self.lo_offset, 0)

    def get_rx_sdr_device(self):
        return self.rx_sdr_device

    def set_rx_sdr_device(self, rx_sdr_device):
        self.rx_sdr_device = rx_sdr_device
        self.set_samp_rate_rx(satnogs.hw_rx_settings[self.rx_sdr_device]['samp_rate'])
        source = self.osmosdr_source_0
        source.set_gain(satnogs.handle_rx_rf_gain(self.rx_sdr_device, self.rf_gain), 0)
        source.set_if_gain(satnogs.handle_rx_if_gain(self.rx_sdr_device, self.if_gain), 0)
        source.set_bb_gain(satnogs.handle_rx_bb_gain(self.rx_sdr_device, self.bb_gain), 0)
        source.set_antenna(satnogs.handle_rx_antenna(self.rx_sdr_device, self.antenna), 0)

    def get_waterfall_file_path(self):
        return self.waterfall_file_path

    def set_waterfall_file_path(self, waterfall_file_path):
        self.waterfall_file_path = waterfall_file_path

    # ------------------------------------------------------------------
    # Accessors for the derived flowgraph variables.
    # ------------------------------------------------------------------

    def get_symb_rate(self):
        return self.symb_rate

    def set_symb_rate(self, symb_rate):
        self.symb_rate = symb_rate
        self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
        self.root_raised_cosine_filter_0.set_taps(
            firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))

    def get_samp_rate_rx(self):
        return self.samp_rate_rx

    def set_samp_rate_rx(self, samp_rate_rx):
        self.samp_rate_rx = samp_rate_rx
        self.set_xlate_filter_taps(
            firdes.low_pass(1, self.samp_rate_rx, 125000, 25000, firdes.WIN_HAMMING, 6.76))
        self.osmosdr_source_0.set_sample_rate(self.samp_rate_rx)
        self.osmosdr_source_0.set_bandwidth(self.samp_rate_rx, 0)

    def get_quadrature_rate(self):
        return self.quadrature_rate

    def set_quadrature_rate(self, quadrature_rate):
        self.quadrature_rate = quadrature_rate
        self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
        self.root_raised_cosine_filter_0.set_taps(
            firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))

    def get_xlate_filter_taps(self):
        return self.xlate_filter_taps

    def set_xlate_filter_taps(self, xlate_filter_taps):
        self.xlate_filter_taps = xlate_filter_taps
        self.freq_xlating_fir_filter_xxx_0.set_taps((self.xlate_filter_taps))

    def get_taps(self):
        return self.taps

    def set_taps(self, taps):
        self.taps = taps

    def get_sps(self):
        return self.sps

    def set_sps(self, sps):
        self.sps = sps
        self.digital_clock_recovery_mm_xx_0.set_omega(self.sps)

    def get_pll_alpha(self):
        return self.pll_alpha

    def set_pll_alpha(self, pll_alpha):
        self.pll_alpha = pll_alpha
        self.digital_costas_loop_cc_0.set_loop_bandwidth(self.pll_alpha)

    def get_filter_rate(self):
        return self.filter_rate

    def set_filter_rate(self, filter_rate):
        self.filter_rate = filter_rate

    def get_deviation(self):
        return self.deviation

    def set_deviation(self, deviation):
        self.deviation = deviation

    def get_clock_alpha(self):
        return self.clock_alpha

    def set_clock_alpha(self, clock_alpha):
        self.clock_alpha = clock_alpha
        self.digital_clock_recovery_mm_xx_0.set_gain_omega(self.clock_alpha**2/4.0)
        self.digital_clock_recovery_mm_xx_0.set_gain_mu(self.clock_alpha)

    def get_bitstream_name(self):
        return self.bitstream_name

    def set_bitstream_name(self, bitstream_name):
        self.bitstream_name = bitstream_name
        self.blocks_file_sink_0.open(self.bitstream_name)
def argument_parser():
    """Build the OptionParser for the flowgraph's command-line options.

    One long option is registered per constructor parameter of
    ``satnogs_lrpt_demod``; each help text reads
    ``Set <dest> [default=%default]``.

    Returns:
        The configured ``OptionParser`` instance.
    """
    description = 'A METEOR LRPT demodulation block'
    parser = OptionParser(usage="%prog: [options]", option_class=eng_option, description=description)

    # (long flag, dest, optparse type, default) in registration order.
    option_table = (
        ("--antenna", "antenna", "string", satnogs.not_set_antenna),
        ("--bb-gain", "bb_gain", "eng_float", eng_notation.num_to_str(satnogs.not_set_rx_bb_gain)),
        ("--decoded-data-file-path", "decoded_data_file_path", "string", '/tmp/.satnogs/data/data'),
        ("--dev-args", "dev_args", "string", satnogs.not_set_dev_args),
        ("--doppler-correction-per-sec", "doppler_correction_per_sec", "intx", 1000),
        ("--enable-iq-dump", "enable_iq_dump", "intx", 0),
        ("--file-path", "file_path", "string", 'test.wav'),
        ("--if-gain", "if_gain", "eng_float", eng_notation.num_to_str(satnogs.not_set_rx_if_gain)),
        ("--iq-file-path", "iq_file_path", "string", '/data/satnogs/tmp/iq.dat'),
        ("--lo-offset", "lo_offset", "eng_float", eng_notation.num_to_str(100e3)),
        ("--ppm", "ppm", "intx", 0),
        ("--rf-gain", "rf_gain", "eng_float", eng_notation.num_to_str(satnogs.not_set_rx_rf_gain)),
        ("--rigctl-port", "rigctl_port", "intx", 4532),
        ("--rx-freq", "rx_freq", "eng_float", eng_notation.num_to_str(100e6)),
        ("--rx-sdr-device", "rx_sdr_device", "string", 'rtlsdr'),
        ("--waterfall-file-path", "waterfall_file_path", "string", '/tmp/waterfall.dat'),
    )
    for long_flag, dest, opt_type, default in option_table:
        parser.add_option(
            "", long_flag, dest=dest, type=opt_type, default=default,
            help="Set " + dest + " [default=%default]")
    return parser
def main(top_block_cls=satnogs_lrpt_demod, options=None):
    """Instantiate the flowgraph from parsed options and run it to completion.

    Args:
        top_block_cls: Class to instantiate; called with one keyword argument
            per command-line option.
        options: Parsed options object. When None, the command line is parsed
            with ``argument_parser()``.
    """
    if options is None:
        options, _ = argument_parser().parse_args()

    # Each option attribute is forwarded under the same keyword name.
    param_names = (
        "antenna", "bb_gain", "decoded_data_file_path", "dev_args",
        "doppler_correction_per_sec", "enable_iq_dump", "file_path", "if_gain",
        "iq_file_path", "lo_offset", "ppm", "rf_gain", "rigctl_port", "rx_freq",
        "rx_sdr_device", "waterfall_file_path",
    )
    kwargs = dict((name, getattr(options, name)) for name in param_names)

    tb = top_block_cls(**kwargs)
    tb.start()
    tb.wait()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment