Last active
September 17, 2022 06:07
-
-
Save jh4xsy/c9edff891530ca59acefbe21bfad70a8 to your computer and use it in GitHub Desktop.
satnogsでlrptをデコードするスクリプト群
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import signal | |
import subprocess | |
import satnogsclient.settings as client_settings | |
# Module-level logger used by the flowgraph runner in this file.
LOGGER = logging.getLogger(__name__)

# Short script identifiers mapped to the flowgraph script file names.
SATNOGS_FLOWGRAPH_SCRIPTS = {
    'AFSK1K2': 'satnogs_afsk1200_ax25.py',
    'AMSAT_DUV': 'satnogs_amsat_fox_duv_decoder.py',
    'APT': 'satnogs_noaa_apt_decoder.py',
    'ARGOS_BPSK_PMT_A3': 'satnogs_argos_bpsk_ldr.py',
    'BPSK': 'satnogs_bpsk.py',
    'CW': 'satnogs_cw_decoder.py',
    'FM': 'satnogs_fm.py',
    'FSK': 'satnogs_fsk.py',
    'GFSK_RKTR': 'satnogs_reaktor_hello_world_fsk9600_decoder.py',
    'GFSK/BPSK': 'satnogs_qubik_telem.py',
    'SSTV': 'satnogs_sstv_pd120_demod.py',
    'LRPT': 'satnogs_lrpt_demod.py',
}


def _flowgraph_mode(script_key, has_baudrate, framing=None):
    """
    Build one entry of SATNOGS_FLOWGRAPH_MODES.

    :param script_key: Key into SATNOGS_FLOWGRAPH_SCRIPTS
    :type script_key: str
    :param has_baudrate: Whether the mode takes a baud rate / WPM value
    :type has_baudrate: bool
    :param framing: Framing name; when omitted the entry has no 'framing'
        key and 'has_framing' is False
    :type framing: str or None
    :return: Mode description
    :rtype: dict
    """
    spec = {
        'script_name': SATNOGS_FLOWGRAPH_SCRIPTS[script_key],
        'has_baudrate': has_baudrate,
        'has_framing': framing is not None,
    }
    if framing is not None:
        spec['framing'] = framing
    return spec


# Observation mode name mapped to the script to run and the optional
# parameters (baud rate, framing) that script accepts.
SATNOGS_FLOWGRAPH_MODES = {
    'AFSK': _flowgraph_mode('AFSK1K2', False),
    'APT': _flowgraph_mode('APT', False),
    'BPSK': _flowgraph_mode('BPSK', True, 'ax25'),
    'BPSK PMT-A3': _flowgraph_mode('ARGOS_BPSK_PMT_A3', True),
    'CW': _flowgraph_mode('CW', True),
    'DUV': _flowgraph_mode('AMSAT_DUV', False),
    'FM': _flowgraph_mode('FM', False),
    'FSK': _flowgraph_mode('FSK', True, 'ax25'),
    'FSK AX.100 Mode 5': _flowgraph_mode('FSK', True, 'ax100_mode5'),
    'FSK AX.100 Mode 6': _flowgraph_mode('FSK', True, 'ax100_mode6'),
    'GFSK': _flowgraph_mode('FSK', True, 'ax25'),
    'GFSK Rktr': _flowgraph_mode('GFSK_RKTR', True),
    'GFSK/BPSK': _flowgraph_mode('GFSK/BPSK', True),
    'GMSK': _flowgraph_mode('FSK', True, 'ax25'),
    'MSK': _flowgraph_mode('FSK', True, 'ax25'),
    'MSK AX.100 Mode 5': _flowgraph_mode('FSK', True, 'ax100_mode5'),
    'MSK AX.100 Mode 6': _flowgraph_mode('FSK', True, 'ax100_mode6'),
    'SSTV': _flowgraph_mode('SSTV', False),
    'LRPT': _flowgraph_mode('LRPT', False),
}

# Mode whose script is used when the requested mode is not in the table.
SATNOGS_FLOWGRAPH_MODE_DEFAULT = 'FM'
class Flowgraph():
    """
    Run a SatNOGS flowgraph script as a child process

    :param device: SoapySDR device
    :type device: str
    :param sampling_rate: Sampling rate
    :type sampling_rate: int
    :param frequency: RX frequency
    :type frequency: int
    :param mode: Mode of operation
    :type mode: str
    :param baud: Baud rate or WPM
    :type baud: int
    :param output_data: Dictionary of output data; the keys 'audio',
        'waterfall' and 'decoded' are read
    :type output_data: dict
    """

    def __init__(self, device, sampling_rate, frequency, mode, baud, output_data):
        """
        Collect the command-line parameters and select the script to run
        """
        settings = client_settings
        self.parameters = {
            'soapy-rx-device': device,
            'samp-rate-rx': str(sampling_rate),
            'rx-freq': str(frequency),
            'file-path': output_data['audio'],
            'waterfall-file-path': output_data['waterfall'],
            'decoded-data-file-path': output_data['decoded'],
            'doppler-correction-per-sec': settings.SATNOGS_DOPPLER_CORR_PER_SEC,
            'lo-offset': settings.SATNOGS_LO_OFFSET,
            'ppm': settings.SATNOGS_PPM_ERROR,
            'rigctl-port': str(settings.SATNOGS_RIG_PORT),
            'gain-mode': settings.SATNOGS_GAIN_MODE,
            'gain': settings.SATNOGS_RF_GAIN,
            'antenna': settings.SATNOGS_ANTENNA,
            'dev-args': settings.SATNOGS_DEV_ARGS,
            'stream-args': settings.SATNOGS_STREAM_ARGS,
            'tune-args': settings.SATNOGS_TUNE_ARGS,
            'other-settings': settings.SATNOGS_OTHER_SETTINGS,
            'dc-removal': settings.SATNOGS_DC_REMOVAL,
            'bb-freq': settings.SATNOGS_BB_FREQ,
            'bw': settings.SATNOGS_RX_BANDWIDTH,
            'enable-iq-dump': str(int(settings.ENABLE_IQ_DUMP is True)),
            'iq-file-path': settings.IQ_DUMP_FILENAME,
            'udp-dump-host': settings.UDP_DUMP_HOST,
            'udp-dump-port': settings.UDP_DUMP_PORT,
            # Filled in below only when the selected mode supports them;
            # parameters left as None are not passed to the script.
            'wpm': None,
            'baudrate': None,
            'framing': None,
        }

        mode_spec = SATNOGS_FLOWGRAPH_MODES.get(mode)
        if mode_spec is None:
            # Unknown mode: run the default mode's script with no extras.
            fallback = SATNOGS_FLOWGRAPH_MODES[SATNOGS_FLOWGRAPH_MODE_DEFAULT]
            self.script = fallback['script_name']
        else:
            self.script = mode_spec['script_name']
            if baud and mode_spec['has_baudrate']:
                # CW observations carry the rate as words per minute.
                rate_key = 'wpm' if mode == 'CW' else 'baudrate'
                self.parameters[rate_key] = str(int(baud))
            if mode_spec['has_framing']:
                self.parameters['framing'] = mode_spec['framing']

        self.process = None

    @property
    def enabled(self):
        """
        Tell whether the flowgraph process is currently running

        :return: True while a started process has not exited
        :rtype: bool
        """
        return bool(self.process) and self.process.poll() is None

    @enabled.setter
    def enabled(self, status):
        """
        Start the flowgraph (truthy status) or stop it (falsy status)

        Stopping sends SIGINT to the process and waits for it to exit.

        :param status: Running status
        :type status: bool
        """
        if not status:
            if self.process:
                self.process.send_signal(signal.SIGINT)
                self.process.communicate()
            return

        command = [self.script]
        command.extend(
            '--{}={}'.format(name, value)
            for name, value in self.parameters.items()
            if value is not None
        )
        try:
            self.process = subprocess.Popen(command)
        except OSError:
            LOGGER.exception('Could not start GNURadio python script')

    @property
    def info(self):
        """
        Report the flowgraph parameters and the gr-satnogs version

        The version is read from the JSON printed by
        ``python3 -m satnogs.satnogs_info``. It stays None when that command
        exits with a non-zero status, is 'invalid' when the output is not
        valid JSON and 'unknown' when the JSON has no 'version' entry.

        :return: Information about flowgraph and radio
        :rtype: dict
        """
        radio = {
            'name': 'gr-satnogs',
            'version': None,
            'parameters': self.parameters,
        }
        client_metadata = {'radio': radio}

        probe = subprocess.Popen(['python3', '-m', 'satnogs.satnogs_info'],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        raw_info, _ = probe.communicate()
        if probe.returncode != 0:
            return client_metadata

        try:
            parsed = json.loads(raw_info)
        except ValueError:
            radio['version'] = 'invalid'
            return client_metadata

        radio['version'] = parsed['version'] if 'version' in parsed else 'unknown'
        return client_metadata
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Post-observation hook: runs the METEOR soft-bit processing script and
# prints a marker line before and after it.

# Location of the processing script.
PROCESS_METEOR="/home/jh4xsy/Downloads/satnogs-extras/scripts/process_meteor.py"

echo "POST OBSERVATION SCRIPT RUNNING"

# Maybe you need to do something with the Bias T:
#/home/pi/rtl_biast/build/src/rtl_biast -b 0

# Forward the first four positional arguments unchanged.
python "$PROCESS_METEOR" "$1" "$2" "$3" "$4"

echo "POST OBSERVATION SCRIPT FINISHED"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Meteor Decoder Processor | |
# Mark Jessop <vk5qi@rfhead.net> 2017-09-01 | |
# | |
# This script processes soft-bit recordings from wherever satnogs_lrpt_demod puts them, | |
# then places them in the satnogs recorded data directory to be uploaded (eventually) | |
# | |
# It is suggested that this script is run with a post-observation script, | |
# with some kind of locking to avoid multiple instances running. i.e: | |
# flock -n /tmp/meteor_process.lock -c "python /path/to/process_meteor.py" | |
# | |
from glob import glob | |
import subprocess | |
import os | |
import shutil | |
import time | |
from time import sleep | |
# Glob pattern used to find new soft-bit files.
SOURCE_PATH = '/tmp/data*.s'

# Directory that receives the finished images.
DESTINATION_DIR = '/tmp/.satnogs/data/'

# Directory that receives the processed soft-bit files.
RAW_DESTINATION_DIR = '/tmp/.satnogs/data/complete/'

# Directory and file-name prefix for temporary files.
TEMP_DIR = '/tmp/'
TEMP_FILENAME = 'meteor_image_temp'

# Binaries we need. If they are not on $PATH, point these at the right place.
MEDET_PATH = '/home/jh4xsy/Downloads/medet/medet_arm'
CONVERT_PATH = 'convert'

# medet arguments for the composite image pass and for the thermal pass.
MEDET_ARGS_COMPOSITE = '-q -cd -r 65 -g 65 -b 64'.split()
MEDET_ARGS_THERMAL = '-q -d -r 68 -g 68 -b 68'.split()

# Seconds to wait before processing, to avoid clashing with waterfall
# processing and running out of RAM.
WAIT_TIME = 10

# Enable thermal IR output. This requires a second pass over the output file.
ENABLE_THERMAL = False
def cleanup_data(source_file=None):
    """
    Delete the temporary files we created and, optionally, a source file.

    Every path matching ``TEMP_DIR + TEMP_FILENAME + "*"`` is removed.

    :param source_file: Path of a soft-bit file to delete as well; nothing
        extra is removed when this is None.
    """
    # Remove everything written under the shared temporary prefix.
    for _temp_file in glob(TEMP_DIR + TEMP_FILENAME + "*"):
        os.remove(_temp_file)

    # Identity comparison is the correct test for the None sentinel.
    if source_file is not None:
        os.remove(source_file)
def convert_image(suffix=""):
    """
    Convert the temporary METEOR BMP for ``suffix`` to PNG with 'convert'.

    The "_ir" suffix is negated and contrast-stretched; any other suffix is
    treated as the visible image. Images are rotated by 180 degrees when the
    current hour is greater than 12.

    :param suffix: Suffix appended to the temporary file name.
    :return: Path of the PNG if it exists after conversion, otherwise None
        (also when the BMP is missing).
    """
    _stem = TEMP_DIR + TEMP_FILENAME + suffix
    raw_image_path = _stem + ".bmp"
    result_image = _stem + ".png"

    # Nothing to convert if the decoder produced no bitmap.
    if not os.path.isfile(raw_image_path):
        return None
    sleep(1)

    # NOTE(review): the original comments describe this as "after 12 UT",
    # but the hour is taken from local time - confirm which is intended.
    _late_pass = time.localtime(time.time()).tm_hour > 12
    _is_ir = suffix == "_ir"

    # Assemble the convert command; the option order matches the four
    # hand-written variants this replaces.
    _command = [CONVERT_PATH]
    if _is_ir:
        # Invert the colours of the IR image.
        _command.append("-negate")
    if _late_pass:
        _command.extend(["-rotate", "180"])
    if _is_ir:
        # Improve the contrast of the IR image.
        _command.extend(["-linear-stretch", "1x1%"])
    _command.extend([raw_image_path, result_image])
    subprocess.call(_command)

    # Report the PNG only if convert actually produced it.
    return result_image if os.path.isfile(result_image) else None
def run_medet(source_file, command_args, suffix=""):
    """
    Run the medet meteor decoder over a file.

    The command is ``MEDET_PATH source_file <temporary output name>``
    followed by ``command_args``.

    :param source_file: Input file handed to medet.
    :param command_args: Extra medet arguments, appended in order.
    :param suffix: Suffix appended to the temporary output name.
    :return: Exit status of the medet process.
    """
    _output_stem = TEMP_DIR + TEMP_FILENAME + suffix
    _invocation = [MEDET_PATH, source_file, _output_stem]
    _invocation.extend(command_args)
    return subprocess.call(_invocation)
if __name__ == "__main__":
    # Process every soft-bit file that matches the search pattern.
    for _file in glob(SOURCE_PATH):
        # Start from a clean set of temporary files.
        cleanup_data()

        # Sleep for a bit.
        print("Waiting for %d seconds before processing." % WAIT_TIME)
        sleep(WAIT_TIME)

        # First pass: decode the composite image.
        print("Attempting to process: %s" % _file)
        run_medet(_file, MEDET_ARGS_COMPOSITE, "_vis")
        result_vis = convert_image("_vis")

        # Optional second pass over the first pass's .dec output.
        result_ir = None
        if ENABLE_THERMAL:
            run_medet(TEMP_DIR + TEMP_FILENAME + "_vis.dec", MEDET_ARGS_THERMAL, "_ir")
            result_ir = convert_image("_ir")

        # Destination names use the input file name up to its first dot.
        _file_basename = os.path.basename(_file)
        _file_noext = _file_basename.split(".")[0]
        _dest_vis = DESTINATION_DIR + _file_noext + "_vis.png"
        _dest_ir = DESTINATION_DIR + _file_noext + "_ir.png"

        if result_vis is not None:
            print("VIS processing successful!")
            shutil.move(result_vis, _dest_vis)
        else:
            print("VIS Processing unsuccessful.")

        # Only report on the IR pass when it was actually attempted, so a
        # disabled thermal pass is not logged as a failure.
        if ENABLE_THERMAL:
            if result_ir is not None:
                print("IR processing successful!")
                shutil.move(result_ir, _dest_ir)
            else:
                print("IR Processing unsuccessful.")

        # Move the processed file into the complete directory.
        shutil.move(_file, RAW_DESTINATION_DIR + _file_basename)

        # Remove the temporary files left by this input.
        cleanup_data()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# | |
# SPDX-License-Identifier: GPL-3.0 | |
# | |
# GNU Radio Python Flow Graph | |
# Title: LRPT Demodulator | |
# Author: Mark Jessop (vk5qi@rfhead.net) with amendments by Mat Burnham | |
# Description: A METEOR LRPT demodulation block | |
# GNU Radio version: 3.8.2.0 | |
from datetime import datetime | |
from gnuradio import analog | |
from gnuradio import blocks | |
from gnuradio import digital | |
from gnuradio import filter | |
from gnuradio.filter import firdes | |
from gnuradio import gr | |
import sys | |
import signal | |
from argparse import ArgumentParser | |
from gnuradio.eng_arg import eng_float, intx | |
from gnuradio import eng_notation | |
import os.path | |
import satnogs | |
import soapy | |
import distutils | |
from distutils import util | |
class satnogs_lrpt_demod(gr.top_block):
    """
    GNU Radio top block that demodulates METEOR LRPT to a soft-bit file.

    Signal path, as wired in the Connections section: SoapySDR source ->
    doppler compensation -> AGC -> root-raised-cosine filter -> Costas loop ->
    M&M clock recovery -> soft constellation decoder -> rail (-1..1) ->
    float-to-short -> short-to-char -> file sink. The doppler-compensated
    stream also feeds the IQ sink and the waterfall sink.

    NOTE(review): indentation was reconstructed from a flattened listing;
    confirm it against the generated flowgraph.
    """

    def __init__(self, antenna="", baudrate=72.0e3, bb_freq=0.0, bw=0.0, dc_removal="False", decoded_data_file_path="/tmp/.satnogs/data/data", dev_args="", doppler_correction_per_sec=1000, enable_iq_dump=0, file_path="test.wav", flip_images=0, gain=0.0, gain_mode="Overall", iq_file_path="/tmp/iq.dat", lo_offset=100e3, other_settings="", ppm=0, rigctl_port=4532, rx_freq=100e6, samp_rate_rx=0.0, soapy_rx_device="driver=invalid", stream_args="", sync=1, tune_args="", waterfall_file_path="/tmp/waterfall.dat"):
        gr.top_block.__init__(self, "LRPT Demodulator")

        ##################################################
        # Parameters
        ##################################################
        # Every constructor argument is stored under the same name. Within
        # this class file_path, flip_images and sync are only stored and
        # exposed through their getters/setters.
        self.antenna = antenna
        self.baudrate = baudrate
        self.bb_freq = bb_freq
        self.bw = bw
        self.dc_removal = dc_removal
        self.decoded_data_file_path = decoded_data_file_path
        self.dev_args = dev_args
        self.doppler_correction_per_sec = doppler_correction_per_sec
        self.enable_iq_dump = enable_iq_dump
        self.file_path = file_path
        self.flip_images = flip_images
        self.gain = gain
        self.gain_mode = gain_mode
        self.iq_file_path = iq_file_path
        self.lo_offset = lo_offset
        self.other_settings = other_settings
        self.ppm = ppm
        self.rigctl_port = rigctl_port
        self.rx_freq = rx_freq
        self.samp_rate_rx = samp_rate_rx
        self.soapy_rx_device = soapy_rx_device
        self.stream_args = stream_args
        self.sync = sync
        self.tune_args = tune_args
        self.waterfall_file_path = waterfall_file_path

        ##################################################
        # Variables
        ##################################################
        # The processing rate is twice the baud rate, so sps evaluates to 2.
        self.samp_rate = samp_rate = baudrate*2
        self.symb_rate = symb_rate = baudrate
        self.quadrature_rate = quadrature_rate = samp_rate
        self.sps = sps = (quadrature_rate*1.0)/(symb_rate*1.0)
        self.pll_alpha = pll_alpha = 1e-3
        self.deviation = deviation = 17000
        self.clock_alpha = clock_alpha = 2e-3
        # Soft bits go to /tmp/<basename of decoded_data_file_path>_<UTC timestamp>.s
        self.bitstream_name = bitstream_name = "/tmp/" + os.path.basename(decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s"
        self.audio_samp_rate = audio_samp_rate = 48000

        ##################################################
        # Blocks
        ##################################################
        self.soapy_source_0_0 = None
        # Make sure that the gain mode is valid
        if(gain_mode not in ['Overall', 'Specific', 'Settings Field']):
            raise ValueError("Wrong gain mode on channel 0. Allowed gain modes: "
                             "['Overall', 'Specific', 'Settings Field']")

        dev = soapy_rx_device

        # Stream arguments for every activated stream
        tune_args = [tune_args]
        settings = [other_settings]

        # Setup the device arguments
        dev_args = dev_args

        self.soapy_source_0_0 = soapy.source(1, dev, dev_args, stream_args,
                                             tune_args, settings, samp_rate_rx, "fc32")

        # dc_removal arrives as a string and is converted to a bool here.
        self.soapy_source_0_0.set_dc_removal(0,bool(distutils.util.strtobool(dc_removal)))

        # Set up DC offset. If set to (0, 0) internally the source block
        # will handle the case if no DC offset correction is supported
        self.soapy_source_0_0.set_dc_offset(0,0)

        # Setup IQ Balance. If set to (0, 0) internally the source block
        # will handle the case if no IQ balance correction is supported
        self.soapy_source_0_0.set_iq_balance(0,0)

        self.soapy_source_0_0.set_agc(0,False)

        # generic frequency setting should be specified first
        # The device is tuned lo_offset below the wanted RX frequency.
        self.soapy_source_0_0.set_frequency(0, rx_freq - lo_offset)

        self.soapy_source_0_0.set_frequency(0,"BB",bb_freq)

        # Setup Frequency correction. If set to 0 internally the source block
        # will handle the case if no frequency correction is supported
        self.soapy_source_0_0.set_frequency_correction(0,ppm)

        self.soapy_source_0_0.set_antenna(0,antenna)

        self.soapy_source_0_0.set_bandwidth(0,bw)

        if(gain_mode != 'Settings Field'):
            # pass is needed, in case the template does not evaluate anything
            pass
            # NOTE(review): set_gain is assumed to sit inside this branch;
            # the flattened listing does not show its indentation - confirm.
            self.soapy_source_0_0.set_gain(0,gain)
        self.satnogs_waterfall_sink_0_0 = satnogs.waterfall_sink(samp_rate, rx_freq, 10, 1024, waterfall_file_path, 1)
        self.satnogs_tcp_rigctl_msg_source_0 = satnogs.tcp_rigctl_msg_source("127.0.0.1", rigctl_port, False, int(1000.0/doppler_correction_per_sec) + 1, 1500)
        self.satnogs_iq_sink_0_0 = satnogs.iq_sink(16768, iq_file_path, False, enable_iq_dump)
        self.satnogs_doppler_compensation_0 = satnogs.doppler_compensation(samp_rate_rx, rx_freq, lo_offset, samp_rate, 1, 0)
        self.root_raised_cosine_filter_0 = filter.fir_filter_ccf(
            1,
            firdes.root_raised_cosine(
                1,
                quadrature_rate,
                symb_rate,
                0.6,
                361))
        self.digital_costas_loop_cc_0 = digital.costas_loop_cc(pll_alpha, 4, False)
        self.digital_constellation_soft_decoder_cf_1 = digital.constellation_soft_decoder_cf(digital.constellation_calcdist(([-1-1j, -1+1j, 1+1j, 1-1j]), ([0, 1, 3, 2]), 4, 1).base())
        self.digital_clock_recovery_mm_xx_0 = digital.clock_recovery_mm_cc(sps, clock_alpha**2/4.0, 0.5, clock_alpha, 0.005)
        self.blocks_short_to_char_0 = blocks.short_to_char(1)
        self.blocks_float_to_short_0 = blocks.float_to_short(1, 32767)
        self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char*1, bitstream_name, False)
        self.blocks_file_sink_0.set_unbuffered(False)
        self.analog_rail_ff_0 = analog.rail_ff(-1, 1)
        self.analog_agc_xx_0 = analog.agc_cc(1000e-4, 0.5, 1.0)
        self.analog_agc_xx_0.set_max_gain(4000)

        ##################################################
        # Connections
        ##################################################
        # Frequency messages from the rigctl source drive the doppler block.
        self.msg_connect((self.satnogs_tcp_rigctl_msg_source_0, 'freq'), (self.satnogs_doppler_compensation_0, 'doppler'))
        self.connect((self.analog_agc_xx_0, 0), (self.root_raised_cosine_filter_0, 0))
        self.connect((self.analog_rail_ff_0, 0), (self.blocks_float_to_short_0, 0))
        self.connect((self.blocks_float_to_short_0, 0), (self.blocks_short_to_char_0, 0))
        self.connect((self.blocks_short_to_char_0, 0), (self.blocks_file_sink_0, 0))
        self.connect((self.digital_clock_recovery_mm_xx_0, 0), (self.digital_constellation_soft_decoder_cf_1, 0))
        self.connect((self.digital_constellation_soft_decoder_cf_1, 0), (self.analog_rail_ff_0, 0))
        self.connect((self.digital_costas_loop_cc_0, 0), (self.digital_clock_recovery_mm_xx_0, 0))
        self.connect((self.root_raised_cosine_filter_0, 0), (self.digital_costas_loop_cc_0, 0))
        self.connect((self.satnogs_doppler_compensation_0, 0), (self.analog_agc_xx_0, 0))
        self.connect((self.satnogs_doppler_compensation_0, 0), (self.satnogs_iq_sink_0_0, 0))
        self.connect((self.satnogs_doppler_compensation_0, 0), (self.satnogs_waterfall_sink_0_0, 0))
        self.connect((self.soapy_source_0_0, 0), (self.satnogs_doppler_compensation_0, 0))

    # Accessors follow. Each getter returns the stored value. A setter stores
    # the new value and, where a call follows the assignment, pushes it to
    # the affected block; setters without such a call only update the
    # attribute.

    def get_antenna(self):
        return self.antenna

    def set_antenna(self, antenna):
        self.antenna = antenna
        self.soapy_source_0_0.set_antenna(0,self.antenna)

    def get_baudrate(self):
        return self.baudrate

    def set_baudrate(self, baudrate):
        # Re-derives the sample rate and symbol rate from the new baud rate.
        self.baudrate = baudrate
        self.set_samp_rate(self.baudrate*2)
        self.set_symb_rate(self.baudrate)

    def get_bb_freq(self):
        return self.bb_freq

    def set_bb_freq(self, bb_freq):
        self.bb_freq = bb_freq
        self.soapy_source_0_0.set_frequency(0,"BB",self.bb_freq)

    def get_bw(self):
        return self.bw

    def set_bw(self, bw):
        self.bw = bw
        self.soapy_source_0_0.set_bandwidth(0,self.bw)

    def get_dc_removal(self):
        return self.dc_removal

    def set_dc_removal(self, dc_removal):
        self.dc_removal = dc_removal
        self.soapy_source_0_0.set_dc_removal(0,bool(distutils.util.strtobool(self.dc_removal)))

    def get_decoded_data_file_path(self):
        return self.decoded_data_file_path

    def set_decoded_data_file_path(self, decoded_data_file_path):
        # Builds a new timestamped bitstream name and hands it to
        # set_bitstream_name, which reopens the file sink.
        self.decoded_data_file_path = decoded_data_file_path
        self.set_bitstream_name("/tmp/" + os.path.basename(self.decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s")

    def get_dev_args(self):
        return self.dev_args

    def set_dev_args(self, dev_args):
        self.dev_args = dev_args

    def get_doppler_correction_per_sec(self):
        return self.doppler_correction_per_sec

    def set_doppler_correction_per_sec(self, doppler_correction_per_sec):
        self.doppler_correction_per_sec = doppler_correction_per_sec

    def get_enable_iq_dump(self):
        return self.enable_iq_dump

    def set_enable_iq_dump(self, enable_iq_dump):
        self.enable_iq_dump = enable_iq_dump

    def get_file_path(self):
        return self.file_path

    def set_file_path(self, file_path):
        self.file_path = file_path

    def get_flip_images(self):
        return self.flip_images

    def set_flip_images(self, flip_images):
        self.flip_images = flip_images

    def get_gain(self):
        return self.gain

    def set_gain(self, gain):
        self.gain = gain
        self.soapy_source_0_0.set_gain(0, self.gain)

    def get_gain_mode(self):
        return self.gain_mode

    def set_gain_mode(self, gain_mode):
        self.gain_mode = gain_mode

    def get_iq_file_path(self):
        return self.iq_file_path

    def set_iq_file_path(self, iq_file_path):
        self.iq_file_path = iq_file_path

    def get_lo_offset(self):
        return self.lo_offset

    def set_lo_offset(self, lo_offset):
        # Retunes the source to rx_freq - lo_offset.
        self.lo_offset = lo_offset
        self.soapy_source_0_0.set_frequency(0, self.rx_freq - self.lo_offset)

    def get_other_settings(self):
        return self.other_settings

    def set_other_settings(self, other_settings):
        self.other_settings = other_settings

    def get_ppm(self):
        return self.ppm

    def set_ppm(self, ppm):
        self.ppm = ppm
        self.soapy_source_0_0.set_frequency_correction(0,self.ppm)

    def get_rigctl_port(self):
        return self.rigctl_port

    def set_rigctl_port(self, rigctl_port):
        self.rigctl_port = rigctl_port

    def get_rx_freq(self):
        return self.rx_freq

    def set_rx_freq(self, rx_freq):
        # Retunes the source to rx_freq - lo_offset.
        self.rx_freq = rx_freq
        self.soapy_source_0_0.set_frequency(0, self.rx_freq - self.lo_offset)

    def get_samp_rate_rx(self):
        return self.samp_rate_rx

    def set_samp_rate_rx(self, samp_rate_rx):
        self.samp_rate_rx = samp_rate_rx

    def get_soapy_rx_device(self):
        return self.soapy_rx_device

    def set_soapy_rx_device(self, soapy_rx_device):
        self.soapy_rx_device = soapy_rx_device

    def get_stream_args(self):
        return self.stream_args

    def set_stream_args(self, stream_args):
        self.stream_args = stream_args

    def get_sync(self):
        return self.sync

    def set_sync(self, sync):
        self.sync = sync

    def get_tune_args(self):
        return self.tune_args

    def set_tune_args(self, tune_args):
        self.tune_args = tune_args

    def get_waterfall_file_path(self):
        return self.waterfall_file_path

    def set_waterfall_file_path(self, waterfall_file_path):
        self.waterfall_file_path = waterfall_file_path

    def get_samp_rate(self):
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        self.samp_rate = samp_rate
        self.set_quadrature_rate(self.samp_rate)

    def get_symb_rate(self):
        return self.symb_rate

    def set_symb_rate(self, symb_rate):
        # Updates samples-per-symbol and recomputes the RRC filter taps.
        self.symb_rate = symb_rate
        self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
        self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))

    def get_quadrature_rate(self):
        return self.quadrature_rate

    def set_quadrature_rate(self, quadrature_rate):
        # Updates samples-per-symbol and recomputes the RRC filter taps.
        self.quadrature_rate = quadrature_rate
        self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
        self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))

    def get_sps(self):
        return self.sps

    def set_sps(self, sps):
        self.sps = sps
        self.digital_clock_recovery_mm_xx_0.set_omega(self.sps)

    def get_pll_alpha(self):
        return self.pll_alpha

    def set_pll_alpha(self, pll_alpha):
        self.pll_alpha = pll_alpha
        self.digital_costas_loop_cc_0.set_loop_bandwidth(self.pll_alpha)

    def get_deviation(self):
        return self.deviation

    def set_deviation(self, deviation):
        self.deviation = deviation

    def get_clock_alpha(self):
        return self.clock_alpha

    def set_clock_alpha(self, clock_alpha):
        # Same gain expressions as used when the clock recovery block is built.
        self.clock_alpha = clock_alpha
        self.digital_clock_recovery_mm_xx_0.set_gain_omega(self.clock_alpha**2/4.0)
        self.digital_clock_recovery_mm_xx_0.set_gain_mu(self.clock_alpha)

    def get_bitstream_name(self):
        return self.bitstream_name

    def set_bitstream_name(self, bitstream_name):
        # Points the file sink at the new output file.
        self.bitstream_name = bitstream_name
        self.blocks_file_sink_0.open(self.bitstream_name)

    def get_audio_samp_rate(self):
        return self.audio_samp_rate

    def set_audio_samp_rate(self, audio_samp_rate):
        self.audio_samp_rate = audio_samp_rate
def argument_parser():
    """
    Build the command-line parser for the LRPT demodulator.

    Each option's ``dest`` is its flag with the leading dashes removed and
    the remaining dashes replaced by underscores. Every help text has the
    form ``Set <label> [default=...]``.

    :return: Parser with all options registered.
    """
    # (flag, type, default, label shown after "Set " in the help text)
    option_table = (
        ("--antenna", str, "", "antenna"),
        ("--baudrate", eng_float, "72.0k", "baudrate"),
        ("--bb-freq", eng_float, "0.0", "Baseband CORDIC frequency (if the device supports it)"),
        ("--bw", eng_float, "0.0", "Bandwidth"),
        ("--dc-removal", str, "False", "Remove automatically the DC offset (if the device support it)"),
        ("--decoded-data-file-path", str, "/tmp/.satnogs/data/data", "decoded_data_file_path"),
        ("--dev-args", str, "", "Device arguments"),
        ("--doppler-correction-per-sec", intx, 1000, "doppler_correction_per_sec"),
        ("--enable-iq-dump", intx, 0, "enable_iq_dump"),
        ("--file-path", str, "test.wav", "file_path"),
        ("--flip-images", intx, 0, "flip_images"),
        ("--gain", eng_float, "0.0", "gain"),
        ("--gain-mode", str, "Overall", "gain_mode"),
        ("--iq-file-path", str, "/tmp/iq.dat", "iq_file_path"),
        ("--lo-offset", eng_float, "100.0k", "lo_offset"),
        ("--other-settings", str, "", "Soapy Channel other settings"),
        ("--ppm", eng_float, "0.0", "ppm"),
        ("--rigctl-port", intx, 4532, "rigctl_port"),
        ("--rx-freq", eng_float, "100.0M", "rx_freq"),
        ("--samp-rate-rx", eng_float, "0.0", "Device Sampling rate"),
        ("--soapy-rx-device", str, "driver=invalid", "soapy_rx_device"),
        ("--stream-args", str, "", "Soapy Stream arguments"),
        ("--sync", intx, 1, "sync"),
        ("--tune-args", str, "", "Soapy Channel Tune arguments"),
        ("--waterfall-file-path", str, "/tmp/waterfall.dat", "waterfall_file_path"),
        # The udp options below are parsed here; main() in this file does
        # not forward them to the top block.
        ("--udp-IP", str, "127.0.0.1", "udp_IP"),
        ("--udp-dump-host", str, "", "udp_dump_host"),
        ("--udp-dump-port", intx, 57356, "udp_dump_port"),
        ("--udp-port", intx, 16887, "udp_port"),
    )

    parser = ArgumentParser(description='A METEOR LRPT demodulation block')
    for flag, value_type, default, label in option_table:
        parser.add_argument(
            flag,
            dest=flag[2:].replace("-", "_"),
            type=value_type,
            default=default,
            help="Set " + label + " [default=%(default)r]")
    return parser
def main(top_block_cls=satnogs_lrpt_demod, options=None):
    """
    Build the flowgraph from command-line options and run it until it ends.

    SIGINT and SIGTERM stop the flowgraph, wait for it to finish and then
    exit the process with status 0.

    :param top_block_cls: Top block class to instantiate.
    :param options: Parsed options; parsed from the command line when None.
    """
    if options is None:
        options = argument_parser().parse_args()

    # Constructor keywords share their names with the option attributes.
    constructor_fields = (
        "antenna", "baudrate", "bb_freq", "bw", "dc_removal",
        "decoded_data_file_path", "dev_args", "doppler_correction_per_sec",
        "enable_iq_dump", "file_path", "flip_images", "gain", "gain_mode",
        "iq_file_path", "lo_offset", "other_settings", "ppm", "rigctl_port",
        "rx_freq", "samp_rate_rx", "soapy_rx_device", "stream_args", "sync",
        "tune_args", "waterfall_file_path",
    )
    tb = top_block_cls(**{field: getattr(options, field) for field in constructor_fields})

    def sig_handler(sig=None, frame=None):
        tb.stop()
        tb.wait()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, sig_handler)

    tb.start()
    tb.wait()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment