Skip to content

Instantly share code, notes, and snippets.

@jh4xsy
Last active September 17, 2022 06:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jh4xsy/c9edff891530ca59acefbe21bfad70a8 to your computer and use it in GitHub Desktop.
Save jh4xsy/c9edff891530ca59acefbe21bfad70a8 to your computer and use it in GitHub Desktop.
satnogsでlrptをデコードするスクリプト群
import json
import logging
import signal
import subprocess
import satnogsclient.settings as client_settings
LOGGER = logging.getLogger(__name__)
SATNOGS_FLOWGRAPH_SCRIPTS = {
'AFSK1K2': 'satnogs_afsk1200_ax25.py',
'AMSAT_DUV': 'satnogs_amsat_fox_duv_decoder.py',
'APT': 'satnogs_noaa_apt_decoder.py',
'ARGOS_BPSK_PMT_A3': 'satnogs_argos_bpsk_ldr.py',
'BPSK': 'satnogs_bpsk.py',
'CW': 'satnogs_cw_decoder.py',
'FM': 'satnogs_fm.py',
'FSK': 'satnogs_fsk.py',
'GFSK_RKTR': 'satnogs_reaktor_hello_world_fsk9600_decoder.py',
'GFSK/BPSK': 'satnogs_qubik_telem.py',
'SSTV': 'satnogs_sstv_pd120_demod.py',
'LRPT': 'satnogs_lrpt_demod.py'
}
SATNOGS_FLOWGRAPH_MODES = {
'AFSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['AFSK1K2'],
'has_baudrate': False,
'has_framing': False
},
'APT': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['APT'],
'has_baudrate': False,
'has_framing': False,
},
'BPSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['BPSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax25'
},
'BPSK PMT-A3': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['ARGOS_BPSK_PMT_A3'],
'has_baudrate': True,
'has_framing': False
},
'CW': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['CW'],
'has_baudrate': True,
'has_framing': False
},
'DUV': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['AMSAT_DUV'],
'has_baudrate': False,
'has_framing': False
},
'FM': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FM'],
'has_baudrate': False,
'has_framing': False
},
'FSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax25'
},
'FSK AX.100 Mode 5': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax100_mode5'
},
'FSK AX.100 Mode 6': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax100_mode6'
},
'GFSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax25'
},
'GFSK Rktr': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['GFSK_RKTR'],
'has_baudrate': True,
'has_framing': False
},
'GFSK/BPSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['GFSK/BPSK'],
'has_baudrate': True,
'has_framing': False,
},
'GMSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax25'
},
'MSK': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax25'
},
'MSK AX.100 Mode 5': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax100_mode5'
},
'MSK AX.100 Mode 6': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['FSK'],
'has_baudrate': True,
'has_framing': True,
'framing': 'ax100_mode6'
},
'SSTV': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['SSTV'],
'has_baudrate': False,
'has_framing': False
},
'LRPT': {
'script_name': SATNOGS_FLOWGRAPH_SCRIPTS['LRPT'],
'has_baudrate': False,
'has_framing': False
}
}
SATNOGS_FLOWGRAPH_MODE_DEFAULT = 'FM'
class Flowgraph():
"""
Execute SatNOGS Flowgraphs
:param device: SoapySDR device
:type device: str
:param sampling_rate: Sampling rate
:type sampling_rate: int
:param frequency: RX frequency
:type frequency: int
:param mode: Mode of operation
:type mode: str
:param baud: Baud rate or WPM
:type baud: int
:param output_data: Dictionary of output data
:type output_data: dict
"""
def __init__(self, device, sampling_rate, frequency, mode, baud, output_data):
"""
Class constructor
"""
self.parameters = {
'soapy-rx-device': device,
'samp-rate-rx': str(sampling_rate),
'rx-freq': str(frequency),
'file-path': output_data['audio'],
'waterfall-file-path': output_data['waterfall'],
'decoded-data-file-path': output_data['decoded'],
'doppler-correction-per-sec': client_settings.SATNOGS_DOPPLER_CORR_PER_SEC,
'lo-offset': client_settings.SATNOGS_LO_OFFSET,
'ppm': client_settings.SATNOGS_PPM_ERROR,
'rigctl-port': str(client_settings.SATNOGS_RIG_PORT),
'gain-mode': client_settings.SATNOGS_GAIN_MODE,
'gain': client_settings.SATNOGS_RF_GAIN,
'antenna': client_settings.SATNOGS_ANTENNA,
'dev-args': client_settings.SATNOGS_DEV_ARGS,
'stream-args': client_settings.SATNOGS_STREAM_ARGS,
'tune-args': client_settings.SATNOGS_TUNE_ARGS,
'other-settings': client_settings.SATNOGS_OTHER_SETTINGS,
'dc-removal': client_settings.SATNOGS_DC_REMOVAL,
'bb-freq': client_settings.SATNOGS_BB_FREQ,
'bw': client_settings.SATNOGS_RX_BANDWIDTH,
'enable-iq-dump': str(int(client_settings.ENABLE_IQ_DUMP is True)),
'iq-file-path': client_settings.IQ_DUMP_FILENAME,
'udp-dump-host': client_settings.UDP_DUMP_HOST,
'udp-dump-port': client_settings.UDP_DUMP_PORT,
'wpm': None,
'baudrate': None,
'framing': None
}
if mode in SATNOGS_FLOWGRAPH_MODES:
self.script = SATNOGS_FLOWGRAPH_MODES[mode]['script_name']
if baud and SATNOGS_FLOWGRAPH_MODES[mode]['has_baudrate']:
# If this is a CW observation pass the WPM parameter
if mode == 'CW':
self.parameters['wpm'] = str(int(baud))
else:
self.parameters['baudrate'] = str(int(baud))
# Apply framing mode
if SATNOGS_FLOWGRAPH_MODES[mode]['has_framing']:
self.parameters['framing'] = SATNOGS_FLOWGRAPH_MODES[mode]['framing']
else:
self.script = SATNOGS_FLOWGRAPH_MODES[SATNOGS_FLOWGRAPH_MODE_DEFAULT]['script_name']
self.process = None
@property
def enabled(self):
"""
Get flowgraph running status
:return: Flowgraph running status
:rtype: bool
"""
if self.process and self.process.poll() is None:
return True
return False
@enabled.setter
def enabled(self, status):
"""
Start or stop running of flowgraph
:param status: Running status
:type status: bool
"""
if status:
args = [self.script]
for parameter, value in self.parameters.items():
if value is not None:
args.append('--{}={}'.format(parameter, value))
try:
self.process = subprocess.Popen(args)
except OSError:
LOGGER.exception('Could not start GNURadio python script')
else:
if self.process:
self.process.send_signal(signal.SIGINT)
_, _ = self.process.communicate()
@property
def info(self):
"""
Get information and parameters of flowgraph and radio
:return: Information about flowgraph and radio
:rtype: dict
"""
client_metadata = {
'radio': {
'name': 'gr-satnogs',
'version': None,
'parameters': self.parameters
}
}
process = subprocess.Popen(['python3', '-m', 'satnogs.satnogs_info'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
gr_satnogs_info, _ = process.communicate() # pylint: disable=W0612
if process.returncode == 0:
try:
gr_satnogs_info = json.loads(gr_satnogs_info)
except ValueError:
client_metadata['radio']['version'] = 'invalid'
else:
if 'version' in gr_satnogs_info:
client_metadata['radio']['version'] = gr_satnogs_info['version']
else:
client_metadata['radio']['version'] = 'unknown'
return client_metadata
#!/bin/bash
echo "POST OBSERVATION SCRIPT RUNNING"
#Maybe you need to do something with the Bias T:
#/home/pi/rtl_biast/build/src/rtl_biast -b 0
python /home/jh4xsy/Downloads/satnogs-extras/scripts/process_meteor.py "$1" "$2" "$3" "$4"
echo "POST OBSERVATION SCRIPT FINISHED"
#!/usr/bin/env python
#
# Meteor Decoder Processor
# Mark Jessop <vk5qi@rfhead.net> 2017-09-01
#
# This script processes soft-bit recordings from wherever satnogs_lrpt_demod puts them,
# then places them in the satnogs recorded data directory to be uploaded (eventually)
#
# It is suggested that this script is run with a post-observation script,
# with some kind of locking to avoid multiple instances running. i.e:
# flock -n /tmp/meteor_process.lock -c "python /path/to/process_meteor.py"
#
from glob import glob
import subprocess
import os
import shutil
import time
from time import sleep
# What wildcard string to use when searching for new soft-bit files.
SOURCE_PATH = "/tmp/data*.s"
# Where to place the complete images.
DESTINATION_DIR = "/tmp/.satnogs/data/"
# Where to put the soft-bit files.
RAW_DESTINATION_DIR = "/tmp/.satnogs/data/complete/"
# Locations for temporary files
TEMP_DIR = "/tmp/"
TEMP_FILENAME = "meteor_image_temp"
# Paths to binaries we need. If these binaries are not on $PATH, change the paths below to point to the appropriate place.
MEDET_PATH = "/home/jh4xsy/Downloads/medet/medet_arm"
CONVERT_PATH = "convert"
# medet arguments to produce a composite image, and also each individual channel.
MEDET_ARGS_COMPOSITE = ['-q', '-cd', '-r', '65', '-g', '65', '-b', '64']
MEDET_ARGS_THERMAL = ['-q', '-d', '-r', '68', '-g', '68', '-b', '68']
# Wait for a bit before processing, to avoid clashing with waterfall processing and running out of RAM.
WAIT_TIME = 10
# Enable Thermal IR output. This requires a second pass over the output file
ENABLE_THERMAL = False
def cleanup_data(source_file = None):
"""
Cleanup any temporary files we have created, and optionally the source file.
"""
# Find temporary files.
_temp_files = glob(TEMP_DIR + TEMP_FILENAME + "*")
# Delete them.
for _file in _temp_files:
os.remove(_file)
# Delete the source soft-bit file if we have been passed it.
if source_file != None:
os.remove(source_file)
def convert_image(suffix = ""):
"""
Use the 'convert' utility (from imagemagick) to convert
a set of resultant METEOR images.
"""
raw_image_path = TEMP_DIR + TEMP_FILENAME + suffix + ".bmp"
result_image = TEMP_DIR + TEMP_FILENAME + suffix + ".png"
if os.path.isfile(raw_image_path):
sleep(1)
else:
return None
# get the current time.
hour = time.localtime(time.time()).tm_hour
# Call convert to convert the image
#subprocess.call([CONVERT_PATH, raw_image_path, result_image])
if suffix == "_ir":
# suffix is ir, need to -negate to invert colors, and -linear-stretch to improve contrast
# Check time
if (hour>12):
# After 12 UT, so evening pass, need to rotate
subprocess.call([CONVERT_PATH, "-negate", "-rotate", "180", "-linear-stretch", "1x1%", raw_image_path, result_image])
else:
# Before 12 UT, morning pass no need to rotate
subprocess.call([CONVERT_PATH, "-negate", "-linear-stretch", "1x1%", raw_image_path, result_image])
else:
# suffix is vis.
if (hour>12):
# After 12 UT, so evening pass, need to rotate
subprocess.call([CONVERT_PATH, "-rotate", "180", raw_image_path, result_image])
else:
# Before 12 UT, morning pass no need to rotate
subprocess.call([CONVERT_PATH, raw_image_path, result_image])
# See if a resultant image was produced.
if os.path.isfile(result_image):
return result_image
else:
return None
def run_medet(source_file, command_args, suffix = ""):
"""
Attempt to run the medet meteor decoder over a file.
"""
_medet_command = [MEDET_PATH, source_file, TEMP_DIR + TEMP_FILENAME + suffix]
for _arg in command_args:
_medet_command.append(_arg)
ret_code = subprocess.call(_medet_command)
return ret_code
if __name__ == "__main__":
# Search for files.
_input_files = glob(SOURCE_PATH)
for _file in _input_files:
# Cleanup any temporary files.
cleanup_data()
# Sleep for a bit.
print("Waiting for %d seconds before processing." % WAIT_TIME)
sleep(WAIT_TIME)
# Process file
print("Attempting to process: %s" % _file)
run_medet(_file, MEDET_ARGS_COMPOSITE, "_vis")
result_vis = convert_image("_vis")
result_ir = None
if ENABLE_THERMAL:
run_medet(TEMP_DIR + TEMP_FILENAME + "_vis.dec", MEDET_ARGS_THERMAL, "_ir")
result_ir = convert_image("_ir")
_file_basename = os.path.basename(_file)
_file_noext = _file_basename.split(".")[0]
_dest_vis = DESTINATION_DIR + _file_noext + "_vis.png"
_dest_ir = DESTINATION_DIR + _file_noext + "_ir.png"
if result_vis != None:
print("VIS processing successful!")
shutil.move(result_vis, _dest_vis)
else:
print("VIS Processing unsuccessful.")
if result_ir != None:
print("IR processing successful!")
shutil.move(result_ir, _dest_ir)
else:
print("IR Processing unsuccessful.")
# Move file processed file into complete directory
shutil.move(_file, RAW_DESTINATION_DIR + os.path.basename(_file))
cleanup_data()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LRPT Demodulator
# Author: Mark Jessop (vk5qi@rfhead.net) with amendments by Mat Burnham
# Description: A METEOR LRPT demodulation block
# GNU Radio version: 3.8.2.0
from datetime import datetime
from gnuradio import analog
from gnuradio import blocks
from gnuradio import digital
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
import os.path
import satnogs
import soapy
import distutils
from distutils import util
class satnogs_lrpt_demod(gr.top_block):
def __init__(self, antenna="", baudrate=72.0e3, bb_freq=0.0, bw=0.0, dc_removal="False", decoded_data_file_path="/tmp/.satnogs/data/data", dev_args="", doppler_correction_per_sec=1000, enable_iq_dump=0, file_path="test.wav", flip_images=0, gain=0.0, gain_mode="Overall", iq_file_path="/tmp/iq.dat", lo_offset=100e3, other_settings="", ppm=0, rigctl_port=4532, rx_freq=100e6, samp_rate_rx=0.0, soapy_rx_device="driver=invalid", stream_args="", sync=1, tune_args="", waterfall_file_path="/tmp/waterfall.dat"):
gr.top_block.__init__(self, "LRPT Demodulator")
##################################################
# Parameters
##################################################
self.antenna = antenna
self.baudrate = baudrate
self.bb_freq = bb_freq
self.bw = bw
self.dc_removal = dc_removal
self.decoded_data_file_path = decoded_data_file_path
self.dev_args = dev_args
self.doppler_correction_per_sec = doppler_correction_per_sec
self.enable_iq_dump = enable_iq_dump
self.file_path = file_path
self.flip_images = flip_images
self.gain = gain
self.gain_mode = gain_mode
self.iq_file_path = iq_file_path
self.lo_offset = lo_offset
self.other_settings = other_settings
self.ppm = ppm
self.rigctl_port = rigctl_port
self.rx_freq = rx_freq
self.samp_rate_rx = samp_rate_rx
self.soapy_rx_device = soapy_rx_device
self.stream_args = stream_args
self.sync = sync
self.tune_args = tune_args
self.waterfall_file_path = waterfall_file_path
##################################################
# Variables
##################################################
self.samp_rate = samp_rate = baudrate*2
self.symb_rate = symb_rate = baudrate
self.quadrature_rate = quadrature_rate = samp_rate
self.sps = sps = (quadrature_rate*1.0)/(symb_rate*1.0)
self.pll_alpha = pll_alpha = 1e-3
self.deviation = deviation = 17000
self.clock_alpha = clock_alpha = 2e-3
self.bitstream_name = bitstream_name = "/tmp/" + os.path.basename(decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s"
self.audio_samp_rate = audio_samp_rate = 48000
##################################################
# Blocks
##################################################
self.soapy_source_0_0 = None
# Make sure that the gain mode is valid
if(gain_mode not in ['Overall', 'Specific', 'Settings Field']):
raise ValueError("Wrong gain mode on channel 0. Allowed gain modes: "
"['Overall', 'Specific', 'Settings Field']")
dev = soapy_rx_device
# Stream arguments for every activated stream
tune_args = [tune_args]
settings = [other_settings]
# Setup the device arguments
dev_args = dev_args
self.soapy_source_0_0 = soapy.source(1, dev, dev_args, stream_args,
tune_args, settings, samp_rate_rx, "fc32")
self.soapy_source_0_0.set_dc_removal(0,bool(distutils.util.strtobool(dc_removal)))
# Set up DC offset. If set to (0, 0) internally the source block
# will handle the case if no DC offset correction is supported
self.soapy_source_0_0.set_dc_offset(0,0)
# Setup IQ Balance. If set to (0, 0) internally the source block
# will handle the case if no IQ balance correction is supported
self.soapy_source_0_0.set_iq_balance(0,0)
self.soapy_source_0_0.set_agc(0,False)
# generic frequency setting should be specified first
self.soapy_source_0_0.set_frequency(0, rx_freq - lo_offset)
self.soapy_source_0_0.set_frequency(0,"BB",bb_freq)
# Setup Frequency correction. If set to 0 internally the source block
# will handle the case if no frequency correction is supported
self.soapy_source_0_0.set_frequency_correction(0,ppm)
self.soapy_source_0_0.set_antenna(0,antenna)
self.soapy_source_0_0.set_bandwidth(0,bw)
if(gain_mode != 'Settings Field'):
# pass is needed, in case the template does not evaluare anything
pass
self.soapy_source_0_0.set_gain(0,gain)
self.satnogs_waterfall_sink_0_0 = satnogs.waterfall_sink(samp_rate, rx_freq, 10, 1024, waterfall_file_path, 1)
self.satnogs_tcp_rigctl_msg_source_0 = satnogs.tcp_rigctl_msg_source("127.0.0.1", rigctl_port, False, int(1000.0/doppler_correction_per_sec) + 1, 1500)
self.satnogs_iq_sink_0_0 = satnogs.iq_sink(16768, iq_file_path, False, enable_iq_dump)
self.satnogs_doppler_compensation_0 = satnogs.doppler_compensation(samp_rate_rx, rx_freq, lo_offset, samp_rate, 1, 0)
self.root_raised_cosine_filter_0 = filter.fir_filter_ccf(
1,
firdes.root_raised_cosine(
1,
quadrature_rate,
symb_rate,
0.6,
361))
self.digital_costas_loop_cc_0 = digital.costas_loop_cc(pll_alpha, 4, False)
self.digital_constellation_soft_decoder_cf_1 = digital.constellation_soft_decoder_cf(digital.constellation_calcdist(([-1-1j, -1+1j, 1+1j, 1-1j]), ([0, 1, 3, 2]), 4, 1).base())
self.digital_clock_recovery_mm_xx_0 = digital.clock_recovery_mm_cc(sps, clock_alpha**2/4.0, 0.5, clock_alpha, 0.005)
self.blocks_short_to_char_0 = blocks.short_to_char(1)
self.blocks_float_to_short_0 = blocks.float_to_short(1, 32767)
self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_char*1, bitstream_name, False)
self.blocks_file_sink_0.set_unbuffered(False)
self.analog_rail_ff_0 = analog.rail_ff(-1, 1)
self.analog_agc_xx_0 = analog.agc_cc(1000e-4, 0.5, 1.0)
self.analog_agc_xx_0.set_max_gain(4000)
##################################################
# Connections
##################################################
self.msg_connect((self.satnogs_tcp_rigctl_msg_source_0, 'freq'), (self.satnogs_doppler_compensation_0, 'doppler'))
self.connect((self.analog_agc_xx_0, 0), (self.root_raised_cosine_filter_0, 0))
self.connect((self.analog_rail_ff_0, 0), (self.blocks_float_to_short_0, 0))
self.connect((self.blocks_float_to_short_0, 0), (self.blocks_short_to_char_0, 0))
self.connect((self.blocks_short_to_char_0, 0), (self.blocks_file_sink_0, 0))
self.connect((self.digital_clock_recovery_mm_xx_0, 0), (self.digital_constellation_soft_decoder_cf_1, 0))
self.connect((self.digital_constellation_soft_decoder_cf_1, 0), (self.analog_rail_ff_0, 0))
self.connect((self.digital_costas_loop_cc_0, 0), (self.digital_clock_recovery_mm_xx_0, 0))
self.connect((self.root_raised_cosine_filter_0, 0), (self.digital_costas_loop_cc_0, 0))
self.connect((self.satnogs_doppler_compensation_0, 0), (self.analog_agc_xx_0, 0))
self.connect((self.satnogs_doppler_compensation_0, 0), (self.satnogs_iq_sink_0_0, 0))
self.connect((self.satnogs_doppler_compensation_0, 0), (self.satnogs_waterfall_sink_0_0, 0))
self.connect((self.soapy_source_0_0, 0), (self.satnogs_doppler_compensation_0, 0))
def get_antenna(self):
return self.antenna
def set_antenna(self, antenna):
self.antenna = antenna
self.soapy_source_0_0.set_antenna(0,self.antenna)
def get_baudrate(self):
return self.baudrate
def set_baudrate(self, baudrate):
self.baudrate = baudrate
self.set_samp_rate(self.baudrate*2)
self.set_symb_rate(self.baudrate)
def get_bb_freq(self):
return self.bb_freq
def set_bb_freq(self, bb_freq):
self.bb_freq = bb_freq
self.soapy_source_0_0.set_frequency(0,"BB",self.bb_freq)
def get_bw(self):
return self.bw
def set_bw(self, bw):
self.bw = bw
self.soapy_source_0_0.set_bandwidth(0,self.bw)
def get_dc_removal(self):
return self.dc_removal
def set_dc_removal(self, dc_removal):
self.dc_removal = dc_removal
self.soapy_source_0_0.set_dc_removal(0,bool(distutils.util.strtobool(self.dc_removal)))
def get_decoded_data_file_path(self):
return self.decoded_data_file_path
def set_decoded_data_file_path(self, decoded_data_file_path):
self.decoded_data_file_path = decoded_data_file_path
self.set_bitstream_name("/tmp/" + os.path.basename(self.decoded_data_file_path) + "_" + datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S") + ".s")
def get_dev_args(self):
return self.dev_args
def set_dev_args(self, dev_args):
self.dev_args = dev_args
def get_doppler_correction_per_sec(self):
return self.doppler_correction_per_sec
def set_doppler_correction_per_sec(self, doppler_correction_per_sec):
self.doppler_correction_per_sec = doppler_correction_per_sec
def get_enable_iq_dump(self):
return self.enable_iq_dump
def set_enable_iq_dump(self, enable_iq_dump):
self.enable_iq_dump = enable_iq_dump
def get_file_path(self):
return self.file_path
def set_file_path(self, file_path):
self.file_path = file_path
def get_flip_images(self):
return self.flip_images
def set_flip_images(self, flip_images):
self.flip_images = flip_images
def get_gain(self):
return self.gain
def set_gain(self, gain):
self.gain = gain
self.soapy_source_0_0.set_gain(0, self.gain)
def get_gain_mode(self):
return self.gain_mode
def set_gain_mode(self, gain_mode):
self.gain_mode = gain_mode
def get_iq_file_path(self):
return self.iq_file_path
def set_iq_file_path(self, iq_file_path):
self.iq_file_path = iq_file_path
def get_lo_offset(self):
return self.lo_offset
def set_lo_offset(self, lo_offset):
self.lo_offset = lo_offset
self.soapy_source_0_0.set_frequency(0, self.rx_freq - self.lo_offset)
def get_other_settings(self):
return self.other_settings
def set_other_settings(self, other_settings):
self.other_settings = other_settings
def get_ppm(self):
return self.ppm
def set_ppm(self, ppm):
self.ppm = ppm
self.soapy_source_0_0.set_frequency_correction(0,self.ppm)
def get_rigctl_port(self):
return self.rigctl_port
def set_rigctl_port(self, rigctl_port):
self.rigctl_port = rigctl_port
def get_rx_freq(self):
return self.rx_freq
def set_rx_freq(self, rx_freq):
self.rx_freq = rx_freq
self.soapy_source_0_0.set_frequency(0, self.rx_freq - self.lo_offset)
def get_samp_rate_rx(self):
return self.samp_rate_rx
def set_samp_rate_rx(self, samp_rate_rx):
self.samp_rate_rx = samp_rate_rx
def get_soapy_rx_device(self):
return self.soapy_rx_device
def set_soapy_rx_device(self, soapy_rx_device):
self.soapy_rx_device = soapy_rx_device
def get_stream_args(self):
return self.stream_args
def set_stream_args(self, stream_args):
self.stream_args = stream_args
def get_sync(self):
return self.sync
def set_sync(self, sync):
self.sync = sync
def get_tune_args(self):
return self.tune_args
def set_tune_args(self, tune_args):
self.tune_args = tune_args
def get_waterfall_file_path(self):
return self.waterfall_file_path
def set_waterfall_file_path(self, waterfall_file_path):
self.waterfall_file_path = waterfall_file_path
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.set_quadrature_rate(self.samp_rate)
def get_symb_rate(self):
return self.symb_rate
def set_symb_rate(self, symb_rate):
self.symb_rate = symb_rate
self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))
def get_quadrature_rate(self):
return self.quadrature_rate
def set_quadrature_rate(self, quadrature_rate):
self.quadrature_rate = quadrature_rate
self.set_sps((self.quadrature_rate*1.0)/(self.symb_rate*1.0))
self.root_raised_cosine_filter_0.set_taps(firdes.root_raised_cosine(1, self.quadrature_rate, self.symb_rate, 0.6, 361))
def get_sps(self):
return self.sps
def set_sps(self, sps):
self.sps = sps
self.digital_clock_recovery_mm_xx_0.set_omega(self.sps)
def get_pll_alpha(self):
return self.pll_alpha
def set_pll_alpha(self, pll_alpha):
self.pll_alpha = pll_alpha
self.digital_costas_loop_cc_0.set_loop_bandwidth(self.pll_alpha)
def get_deviation(self):
return self.deviation
def set_deviation(self, deviation):
self.deviation = deviation
def get_clock_alpha(self):
return self.clock_alpha
def set_clock_alpha(self, clock_alpha):
self.clock_alpha = clock_alpha
self.digital_clock_recovery_mm_xx_0.set_gain_omega(self.clock_alpha**2/4.0)
self.digital_clock_recovery_mm_xx_0.set_gain_mu(self.clock_alpha)
def get_bitstream_name(self):
return self.bitstream_name
def set_bitstream_name(self, bitstream_name):
self.bitstream_name = bitstream_name
self.blocks_file_sink_0.open(self.bitstream_name)
def get_audio_samp_rate(self):
return self.audio_samp_rate
def set_audio_samp_rate(self, audio_samp_rate):
self.audio_samp_rate = audio_samp_rate
def argument_parser():
description = 'A METEOR LRPT demodulation block'
parser = ArgumentParser(description=description)
parser.add_argument(
"--antenna", dest="antenna", type=str, default="",
help="Set antenna [default=%(default)r]")
parser.add_argument(
"--baudrate", dest="baudrate", type=eng_float, default="72.0k",
help="Set baudrate [default=%(default)r]")
parser.add_argument(
"--bb-freq", dest="bb_freq", type=eng_float, default="0.0",
help="Set Baseband CORDIC frequency (if the device supports it) [default=%(default)r]")
parser.add_argument(
"--bw", dest="bw", type=eng_float, default="0.0",
help="Set Bandwidth [default=%(default)r]")
parser.add_argument(
"--dc-removal", dest="dc_removal", type=str, default="False",
help="Set Remove automatically the DC offset (if the device support it) [default=%(default)r]")
parser.add_argument(
"--decoded-data-file-path", dest="decoded_data_file_path", type=str, default="/tmp/.satnogs/data/data",
help="Set decoded_data_file_path [default=%(default)r]")
parser.add_argument(
"--dev-args", dest="dev_args", type=str, default="",
help="Set Device arguments [default=%(default)r]")
parser.add_argument(
"--doppler-correction-per-sec", dest="doppler_correction_per_sec", type=intx, default=1000,
help="Set doppler_correction_per_sec [default=%(default)r]")
parser.add_argument(
"--enable-iq-dump", dest="enable_iq_dump", type=intx, default=0,
help="Set enable_iq_dump [default=%(default)r]")
parser.add_argument(
"--file-path", dest="file_path", type=str, default="test.wav",
help="Set file_path [default=%(default)r]")
parser.add_argument(
"--flip-images", dest="flip_images", type=intx, default=0,
help="Set flip_images [default=%(default)r]")
parser.add_argument(
"--gain", dest="gain", type=eng_float, default="0.0",
help="Set gain [default=%(default)r]")
parser.add_argument(
"--gain-mode", dest="gain_mode", type=str, default="Overall",
help="Set gain_mode [default=%(default)r]")
parser.add_argument(
"--iq-file-path", dest="iq_file_path", type=str, default="/tmp/iq.dat",
help="Set iq_file_path [default=%(default)r]")
parser.add_argument(
"--lo-offset", dest="lo_offset", type=eng_float, default="100.0k",
help="Set lo_offset [default=%(default)r]")
parser.add_argument(
"--other-settings", dest="other_settings", type=str, default="",
help="Set Soapy Channel other settings [default=%(default)r]")
parser.add_argument(
"--ppm", dest="ppm", type=eng_float, default="0.0",
help="Set ppm [default=%(default)r]")
parser.add_argument(
"--rigctl-port", dest="rigctl_port", type=intx, default=4532,
help="Set rigctl_port [default=%(default)r]")
parser.add_argument(
"--rx-freq", dest="rx_freq", type=eng_float, default="100.0M",
help="Set rx_freq [default=%(default)r]")
parser.add_argument(
"--samp-rate-rx", dest="samp_rate_rx", type=eng_float, default="0.0",
help="Set Device Sampling rate [default=%(default)r]")
parser.add_argument(
"--soapy-rx-device", dest="soapy_rx_device", type=str, default="driver=invalid",
help="Set soapy_rx_device [default=%(default)r]")
parser.add_argument(
"--stream-args", dest="stream_args", type=str, default="",
help="Set Soapy Stream arguments [default=%(default)r]")
parser.add_argument(
"--sync", dest="sync", type=intx, default=1,
help="Set sync [default=%(default)r]")
parser.add_argument(
"--tune-args", dest="tune_args", type=str, default="",
help="Set Soapy Channel Tune arguments [default=%(default)r]")
parser.add_argument(
"--waterfall-file-path", dest="waterfall_file_path", type=str, default="/tmp/waterfall.dat",
help="Set waterfall_file_path [default=%(default)r]")
parser.add_argument(
"--udp-IP", dest="udp_IP", type=str, default="127.0.0.1",
help="Set udp_IP [default=%(default)r]")
parser.add_argument(
"--udp-dump-host", dest="udp_dump_host", type=str, default="",
help="Set udp_dump_host [default=%(default)r]")
parser.add_argument(
"--udp-dump-port", dest="udp_dump_port", type=intx, default=57356,
help="Set udp_dump_port [default=%(default)r]")
parser.add_argument(
"--udp-port", dest="udp_port", type=intx, default=16887,
help="Set udp_port [default=%(default)r]")
return parser
def main(top_block_cls=satnogs_lrpt_demod, options=None):
if options is None:
options = argument_parser().parse_args()
tb = top_block_cls(antenna=options.antenna, baudrate=options.baudrate, bb_freq=options.bb_freq, bw=options.bw, dc_removal=options.dc_removal, decoded_data_file_path=options.decoded_data_file_path, dev_args=options.dev_args, doppler_correction_per_sec=options.doppler_correction_per_sec, enable_iq_dump=options.enable_iq_dump, file_path=options.file_path, flip_images=options.flip_images, gain=options.gain, gain_mode=options.gain_mode, iq_file_path=options.iq_file_path, lo_offset=options.lo_offset, other_settings=options.other_settings, ppm=options.ppm, rigctl_port=options.rigctl_port, rx_freq=options.rx_freq, samp_rate_rx=options.samp_rate_rx, soapy_rx_device=options.soapy_rx_device, stream_args=options.stream_args, sync=options.sync, tune_args=options.tune_args, waterfall_file_path=options.waterfall_file_path)
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
tb.start()
tb.wait()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment