Influx2 Plugin Example
import ait.core
from ait.core.server.plugins import Plugin
from ait.core.message_types import MessageType
from ait.core import log
import socket
from bitstring import BitArray
from enum import Enum
import json
import xmltodict
from functools import reduce
import ast
import traceback as tb
from colorama import Fore


def deep_get(dictionary, keys, default=None):
    # Walk a nested dict with a dotted key path, e.g. deep_get(d, 'a.b.c')
    return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default,
                  keys.split("."), dictionary)
# Load the monitor channel dictionary and build an Enum for each enum definition.
with open('.//SunRISE-AIT/sunrise/monitor_channel.xml', 'rb') as f:
    q = xmltodict.parse(f)

enum_table = {}
for i in deep_get(q, 'telemetry_dictionary.enum_definitions.enum_table'):
    d = {}
    for j in deep_get(i, 'values.enum'):
        if isinstance(j, str):
            j = {'@symbol': 'None', '@numeric': '0'}
        d[j['@symbol']] = int(j['@numeric'])
    e = Enum(i['@name'], d)
    enum_table[e.__name__] = e
class Monitor_Channel:
    def __init__(self, abbreviation, name, data_type, source,
                 byte_length, measurement_id,
                 title, description, categories):
        self.value = None
        self.abbreviation = abbreviation
        self.name = name
        self.data_type = data_type
        self.source = source
        self.byte_length = byte_length
        self.measurement_id = measurement_id
        self.title = title
        self.description = description
        self.categories = categories

    def __repr__(self):
        return str(self.__dict__)
    def decode(self, val_bytes):
        # Decode a raw channel value according to the channel's data type.
        if self.data_type == 'string':
            if isinstance(val_bytes, int):
                self.value = ''
            else:
                val_bytes = val_bytes.bytes
                self.value = val_bytes.decode('utf-8')
        elif self.data_type == 'enum':
            if not isinstance(val_bytes, int):
                val_bytes = val_bytes.bytes
                i = int.from_bytes(val_bytes, 'big', signed=False)
            else:
                i = val_bytes
            enum_type = enum_table[f'Enum_{self.abbreviation}']
            self.value = enum_type(i).name
        elif self.data_type == 'float':
            if isinstance(val_bytes, int):
                self.value = float(val_bytes)
            else:
                self.value = val_bytes.f
        elif self.data_type == 'integer':
            self.value = val_bytes.int
        elif self.data_type == 'unsigned':
            if isinstance(val_bytes, int):
                self.value = val_bytes
            else:
                self.value = int(val_bytes.uint)
        else:
            log.error(f"Unknown type: {self.data_type}")
        return self.value
    def canonical_map(self):
        try:
            d = {}
            d['abbreviation'] = self.abbreviation
            d['measurement_id'] = self.measurement_id
            d['title'] = self.title
            d['name'] = self.name
            d['description'] = self.description
            d['source'] = self.source
            d['categories'] = self.categories
            d['byte_length'] = self.byte_length
            d['data_type'] = self.data_type
            d['value'] = self.value
        except Exception as e:
            log.error(e)
            log.error(self.name)
            return {}
        return d
# Build a Monitor_Channel for every telemetry definition that has a measurement ID.
channel_types = {}
for i in deep_get(q, 'telemetry_dictionary.telemetry_definitions.telemetry'):
    m_id = i.get('measurement_id', None)
    if not m_id:
        continue
    measurement_id = int(m_id)
    byte_length = int(i['@byte_length'])
    categories = {p['@name']: p['@value']
                  for p in deep_get(i, 'categories.category')}
    channel_types[measurement_id] = Monitor_Channel(i['@abbreviation'],
                                                    i['@name'],
                                                    i['@type'],
                                                    i['@source'],
                                                    byte_length,
                                                    measurement_id,
                                                    i['title'],
                                                    i['description'].strip("'"),
                                                    categories)
"""
A plugin which creates an RAF connection with the DSN.
Frames received via the RAF connection are sent to the output stream
"""
class DSN_Monitor():
def __init__(self):
pass
    def process_sdb(self, sdb):
        # Split the SDB into header, SFDU, and trailer, then process the SFDU.
        parts = {'DDD_HEADER': sdb[0:20],
                 'SFDU': sdb[20:-2],
                 'DDD_TRAILER': sdb[-2:]}
        return self.process_sfdu(parts['SFDU'])
    def process_sfdu(self, SFDU):
        # https://jaguar.jpl.nasa.gov/SW_detail.php?modid=2403
        # We're working in 16 bits here
        res = {}
        SFDU_LABEL = SFDU[0:20]
        res['SFDU_LABEL'] = self.process_sfdu_label(SFDU_LABEL)
        AGGREGATION_CHDO = BitArray(SFDU[20:24])
        res['AGGREGATION_CHDO'] = self.process_chdo_01(AGGREGATION_CHDO)
        PRIMARY_CHDO = BitArray(SFDU[24:32])
        res['PRIMARY_CHDO'] = self.process_chdo_02(PRIMARY_CHDO)
        SECONDARY_CHDO = BitArray(SFDU[32:54])
        res['SECONDARY_CHDO'] = self.process_chdo_73(SECONDARY_CHDO)
        TERTIARY_CHDO = BitArray(SFDU[54:58])
        res['TERTIARY_CHDO'] = self.process_chdo_000(TERTIARY_CHDO)
        QUATERNARY_CHDO = BitArray(SFDU[58:68])
        res['QUATERNARY_CHDO'] = self.process_chdo_27(QUATERNARY_CHDO)
        chdo_index = res['AGGREGATION_CHDO']['CHDO_LENGTH'] + 24  # 68
        DATA_CHDO = BitArray(SFDU[chdo_index:])
        res['DATA_CHDO'] = self.process_chdo_28(DATA_CHDO,
                                                res['QUATERNARY_CHDO']['NUMBER_CHANNELS'])
        final = []
        for channel in res['DATA_CHDO']['data']:
            if channel['LC_VALUE']:
                val = channel['LC_VALUE']
            else:
                val = channel['LENGTH_VALUE']
            c = channel_types.get(channel['CHANNEL_NUMBER'], None)
            if c:
                try:
                    c.decode(val)
                    final.append(c.canonical_map())
                except Exception as e:
                    log.error(e)
                    log.error(c)
                    tb.print_exc()
            else:
                # Channel is not in the dictionary; log and skip it.
                log.error(f"M-{channel['CHANNEL_NUMBER']} is not in the dictionary.")
        return final
    def process_chdo_28(self, dat, num_channels):
        # Channelized Data Area
        # https://jaguar.jpl.nasa.gov/SW_detail.php
        data_qual = {
            0: "No error, not filler.",
            1: "The decom map tried to make a channel, but the record had no data at that location.",
            2: "Filler data was decommutated.",
            3: "Stale",
        }
        res = {}
        dat_bytes = dat.bytes
        res["CHDO_TYPE"] = int.from_bytes(dat_bytes[0:2], 'big', signed=False)
        res["CHDO_LENGTH"] = int.from_bytes(dat_bytes[2:4], 'big', signed=False)
        decode = []
        dat = BitArray(dat_bytes[4:])
        # Decode one entry per channel reported in the quaternary CHDO.
        for _ in range(num_channels):
            res2 = {}
            # 5-bit source code, mapped to a letter (1 -> 'A', 2 -> 'B', ...)
            res2["SOURCE"] = chr(ord('@') + dat[0:5].uint)
            res2["LV_FLAG"] = dat[5]  # single-bit flag; indexing yields a bool
            res2["DATA_QUAL"] = data_qual[dat[6:7].uint]
            res2['LENGTH_VALUE'] = dat[7:16].uint
            res2['FILLER_LENGTH'] = dat[16:20].uint
            res2['CHANNEL_NUMBER'] = dat[20:32].uint
            if not res2['LV_FLAG']:
                filler = res2['FILLER_LENGTH']
                res2['LC_VALUE'] = dat[filler + 32:32 + res2['LENGTH_VALUE'] * 2 * 8]
                consume = res2['LENGTH_VALUE'] * 2 * 8
            else:
                res2['LC_VALUE'] = None
                consume = 0
            decode.append(res2)
            dat = BitArray(dat[consume + 32:])
        res['data'] = decode
        return res
    def process_chdo_27(self, dat):
        # Quaternary CHDO
        # 10 bytes
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = dat[0:2]
        res["CHDO_LENGTH"] = dat[2:4]
        res["DECOM_FLAGS"] = dat[4:5]
        res["FILLER_LENGTH"] = dat[5:6]
        res["NUMBER_CHANNELS"] = dat[6:8]
        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        res["MAP_ID"] = dat[8:10].hex()
        res["DECOM_FLAGS"] = None  # Per spec
        return res
    def process_chdo_000(self, dat):
        # Tertiary CHDO
        # 4 bytes
        dat = dat.bytes
        res = {}
        res['CHDO_TYPE'] = dat[0:2]
        res['CHDO_LENGTH'] = dat[2:4]
        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        return res
    def process_chdo_73(self, dat):
        # Secondary CHDO
        # 22 bytes
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = dat[0:2]
        res["CHDO_LENGTH"] = dat[2:4]
        res["ORIGINATOR"] = dat[4:5]
        res["LAST_MODIFIER"] = dat[5:6]
        res["8B_SCFT_ID"] = dat[6:7]
        res["DATA_SOURCE"] = dat[7:8]
        res["SCFT_ID"] = dat[8:10]
        res["MST"] = dat[10:16]
        res["SPARE"] = dat[17:18]
        # NOTE: this assignment overwrites the SCFT_ID parsed from bytes 8:10.
        res["SCFT_ID"] = dat[18:20]
        res["SPARE_2"] = dat[20:22]
        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        return res
    def process_chdo_02(self, dat):
        # Primary CHDO
        # 8 bytes
        dat = dat.bytes
        res = {}
        res['CHDO_TYPE'] = int.from_bytes(dat[0:2], 'big', signed=False)
        res['CHDO_LENGTH'] = int.from_bytes(dat[2:4], 'big', signed=False)
        RECORD_ID = dat[4:8]
        res['MAJOR'] = RECORD_ID[0]
        res['MINOR'] = RECORD_ID[1]
        res['MISSION'] = RECORD_ID[2]
        res['FORMAT'] = RECORD_ID[3]
        return res
    def process_chdo_01(self, dat):
        # Aggregation CHDO
        # 4 bytes
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = int.from_bytes(dat[0:2], 'big', signed=False)
        res["CHDO_LENGTH"] = int.from_bytes(dat[2:4], 'big', signed=False)
        return res
    def process_sfdu_label(self, dat):
        # https://jaguar.jpl.nasa.gov/SW_detail.php?modid=2137
        res = {}
        res["CONTROL_AUTHORITY"] = dat[0:4].decode("ASCII")
        res["VERSION_ID"] = dat[4:5].decode("ASCII")
        res["CLASS_ID"] = dat[5:6].decode("ASCII")
        res["SPARE"] = dat[6:8].decode("ASCII")
        res["DATA_DESCRIPTION_ID"] = dat[8:12].decode("ASCII")
        res['LENGTH_ATTRIBUTE'] = int.from_bytes(dat[12:20], 'big', signed=False)
        return res
class DSN_Monitor_Plugin(Plugin):
    """
    Receives station monitor SFDUs over UDP, decodes them,
    and publishes the result as STATION_MONITOR_DATA.
    """
    def __init__(self, inputs=None, outputs=None,
                 zmq_args=None, report_time_s=0, host='0.0.0.0', port=8001, **kwargs):
        super().__init__(inputs, outputs, zmq_args)
        self.receive_counter = 0
        self.host = host
        self.port = port
        self.processor = DSN_Monitor()
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        log.info(f'Starting station monitor server on: {self.host}:{self.port}')
        s.bind((self.host, self.port))
        # TODO: Pack these up in pytest
        # with open('/home/jpluser/SunRISE-AIT/dss-54.mon0158.sfdus', 'rb') as f:
        #     x = f.read()
        #     self.process(x)
        # with open('/home/vicious/AIT-Deploy/SunRISE-AIT/dump.ndjson', 'r') as f:
        #     while x := f.readline():
        #         x = json.loads(x)
        #         x = bytearray(ast.literal_eval(x))
        #         self.process(x)
        while True:
            data, _ = s.recvfrom(60000)
            self.process(data)

    def process(self, data, topic=None):
        res = self.processor.process_sdb(bytearray(data))
        self.publish(res, 'STATION_MONITOR_DATA')
# docker-compose service for the Influx2 deployment
version: "3.3"
services:
  influx:
    image: influxdb:latest
    container_name: influx
    networks: ["bifrost-network"]
    environment:
      # Be sure to match the equivalent values in the bifrost.env file
      # TIP to recover tokens: docker exec influx influx auth list | awk '/$DOCKER_INFLUXDB_INIT_USERNAME/ {print $4 " "}'
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=rootroot
      - DOCKER_INFLUXDB_INIT_PASSWORD=rootroot
      - DOCKER_INFLUXDB_INIT_ORG=Mithril
      - DOCKER_INFLUXDB_INIT_BUCKET=Tuatha_de_Danaan
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=AtoTallYLegiTKey
    ports:
      - 8086:8086
    volumes:
      - /gds/influx/data:/var/lib/influxdb2:z
      - /gds/influx/config:/etc/influxdb2:z
    healthcheck:
      test: "curl -f influx:8086/"
      interval: 2s
      timeout: 5s
      retries: 5
# Influx2 plugin: writes decoded station monitor data into InfluxDB.
import ait.core  # noqa
from ait.core import log, tlm
from ait.core.server.plugin import Plugin
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

bucket = "LunarTrailBlazer"
sv_name = bucket
pass_number = 1


class Influx(Plugin):
    def __init__(self, inputs, outputs, host_url, api_token, org, **kwargs):
        inputs.append("STATION_MONITOR_DATA")
        super().__init__(inputs, outputs, **kwargs)
        try:
            self.client = InfluxDBClient(url=host_url,
                                         token=api_token,
                                         org=org)
        except Exception as e:
            log.error(f"Could not create InfluxDB client: {e}")

    def process(self, message, topic=None, **kwargs):
        def process_station_monitor(message):
            d = []
            for i in message:
                m = {}
                m['measurement'] = i['abbreviation']
                m['fields'] = {i['title']: i['value'],
                               'description': i['description']}
                m['tags'] = {'sv_name': str(sv_name),
                             'pass_number': str(pass_number),
                             'user': 'DSN',
                             'ops_category': i['categories']['ops category'],
                             'subsystem': i['categories']['subsystem'],
                             'title': i['title']}
                d.append(m)
            # LTB may gain performance from an async client instead,
            # since Python does not use real threads.
            with self.client.write_api(write_options=SYNCHRONOUS) as f:
                f.write(bucket=f'{bucket}_DSN', record=d)

        if topic == 'STATION_MONITOR_DATA':
            process_station_monitor(message)
        else:
            # Telemetry handling is not shown in this gist.
            process_telemetry(message)

Finalize Influx setup

You’ll need to deploy Influx2 and create an API token to pass into the Influx2 plugin. This is simple to do using the Influx2 web GUI.
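
As a quick check that the token works before handing it to the plugin, a minimal sketch along these lines should do (the URL, token, and org below are placeholders matching the compose file above):

from influxdb_client import InfluxDBClient

# Placeholder connection details; substitute your deployment's values.
client = InfluxDBClient(url="http://localhost:8086",
                        token="AtoTallYLegiTKey",
                        org="Mithril")
print(client.ping())  # True when the server is reachable with this configuration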

Missing dictionary

The monitor_channel.xml file is the channel dictionary; we’ll send it to you through LFT.

GDS Side Testing

Check the UDP connection to the SFGI at the DSN using netcat; you’ll see data come over at 5 Hz. The DSN will need to open up firewalls, and will also need to flow the data through the SFGI. They should have a station monitor data set they can flow for testing purposes; it’s not mission specific at all.
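
If netcat isn’t handy, a minimal stdlib listener like the following can confirm packets are arriving at the expected rate (host and port assumed to match the plugin defaults above):

import socket
import time

# Bind to the same host/port the DSN_Monitor_Plugin uses by default.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(('0.0.0.0', 8001))
last = time.monotonic()
while True:
    data, addr = s.recvfrom(60000)
    now = time.monotonic()
    print(f'{len(data)} bytes from {addr}; {now - last:.2f} s since last packet')
    last = now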

Using the 0158 Data

You should be able to run historical queries and build notebooks, dashboards, etc. using the Influx2 web GUI. You can also point other visualization software (e.g. Grafana) at Influx to pull the data, but SunRISE didn’t need (or try) to do this.
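
For scripted historical queries outside the GUI, a sketch along these lines should work (connection details are placeholders, and the bucket name assumes the f'{bucket}_DSN' convention used by the plugin above):

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086",
                        token="AtoTallYLegiTKey",
                        org="Mithril")
query = '''
from(bucket: "LunarTrailBlazer_DSN")
  |> range(start: -24h)
  |> filter(fn: (r) => r["user"] == "DSN")
'''
for table in client.query_api().query(query):
    for record in table.records:
        print(record.get_measurement(), record.get_field(), record.get_value())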
