|
import ait.core |
|
from ait.core.server.plugins import Plugin |
|
from ait.core.message_types import MessageType |
|
from ait.core import log |
|
import socket |
|
from bitstring import BitArray |
|
from enum import Enum |
|
import json |
|
import xmltodict |
|
from functools import reduce |
|
import ast |
|
import traceback as tb |
|
from colorama import Fore |
|
|
|
|
|
def deep_get(dictionary, keys, default=None):
    """Look up a nested value by a dot-separated key path.

    :param dictionary: mapping to walk.
    :param keys: dot-separated path, e.g. ``'a.b.c'``.
    :param default: value substituted whenever a path element is missing
                    or the current node is not a dict.
    :return: the value at the path, or *default*.
    """
    node = dictionary
    for part in keys.split("."):
        if isinstance(node, dict):
            node = node.get(part, default)
        else:
            node = default
    return node
|
|
|
|
|
# Load the SunRISE monitor-channel dictionary.  The path is relative to the
# process working directory -- TODO confirm it holds in deployment.
with open('.//SunRISE-AIT/sunrise/monitor_channel.xml', 'rb') as f:
    q = xmltodict.parse(f)

# Build one Enum class per <enum_table> entry, keyed by the Enum's
# generated __name__, for later lookup in Monitor_Channel.decode().
enum_table = {}
for i in deep_get(q, 'telemetry_dictionary.enum_definitions.enum_table'):
    d = {}
    for j in deep_get(i, 'values.enum'):
        # NOTE(review): when xmltodict parses a single <enum> child it yields
        # a dict rather than a list, so iterating produces attribute-name
        # strings; those are collapsed to a {'None': 0} placeholder here --
        # confirm this matches the dictionary's intent.
        if isinstance(j, str):
            j = {'@symbol': 'None', '@numeric': '0'}
        d[j['@symbol']] = int(j['@numeric'])
    e = Enum(i['@name'], d)
    enum_table[e.__name__] = e
|
|
|
|
|
class Monitor_Channel:
    """One DSN monitor telemetry channel definition plus its latest value.

    Instances are built from the monitor-channel XML dictionary; ``decode``
    interprets a raw channel value according to ``data_type`` and caches it
    on ``self.value``.
    """

    def __init__(self, abbreviation, name, data_type, source,
                 byte_length, measurement_id,
                 title, description, categories):
        # Most recent decoded value; None until decode() has been called.
        self.value = None
        self.abbreviation = abbreviation
        self.name = name
        # Expected values: 'string' | 'enum' | 'float' | 'integer' | 'unsigned'
        self.data_type = data_type
        self.source = source
        self.byte_length = byte_length
        self.measurement_id = measurement_id
        self.title = title
        self.description = description
        self.categories = categories

    def __repr__(self):
        return str(self.__dict__)

    def decode(self, val_bytes):
        """Decode a raw channel value and store it on ``self.value``.

        :param val_bytes: either a plain ``int`` (the channel's inline
            LENGTH_VALUE) or a bitstring ``BitArray`` holding the value
            bytes -- presumably from CHDO-28 decoding; TODO confirm.
        :return: the decoded value (also cached on ``self.value``).
        """
        if self.data_type == 'string':
            if isinstance(val_bytes, int):
                # An int here means there was no byte payload to decode.
                self.value = ''
            else:
                val_bytes = val_bytes.bytes
                self.value = val_bytes.decode('utf-8')
        elif self.data_type == 'enum':
            if not isinstance(val_bytes, int):
                val_bytes = val_bytes.bytes
                i = int.from_bytes(val_bytes, 'big', signed=False)
            else:
                i = val_bytes
            # Enum classes are generated from the XML dictionary at import
            # time and keyed by 'Enum_<abbreviation>'.
            enum_type = enum_table[f'Enum_{self.abbreviation}']
            self.value = enum_type(i).name
        elif self.data_type == 'float':
            if isinstance(val_bytes, int):
                self.value = float(val_bytes)
            else:
                self.value = val_bytes.f
        elif self.data_type == 'integer':
            # BUG FIX: previously an int input crashed with AttributeError
            # (int has no .int attribute); handle the inline-int case the
            # same way the other branches do.
            if isinstance(val_bytes, int):
                self.value = val_bytes
            else:
                self.value = val_bytes.int
        elif self.data_type == 'unsigned':
            if isinstance(val_bytes, int):
                self.value = val_bytes
            else:
                self.value = int(val_bytes.uint)
        else:
            log.error(f"Unknown type: {self.data_type}")

        return self.value

    def canonical_map(self):
        """Return a plain-dict snapshot of this channel's metadata and value.

        :return: dict of channel fields, or ``{}`` if building it failed
                 (the error and channel name are logged).
        """
        try:
            d = {}
            d['abbreviation'] = self.abbreviation
            d['measurement_id'] = self.measurement_id
            d['title'] = self.title
            d['name'] = self.name
            d['description'] = self.description
            d['source'] = self.source
            d['categories'] = self.categories
            d['byte_length'] = self.byte_length
            d['data_type'] = self.data_type
            d['value'] = self.value
        except Exception as e:
            log.error(e)
            log.error(self.name)
            return {}

        return d
|
|
|
|
|
# Build the measurement-id -> Monitor_Channel lookup from the telemetry
# definitions in the parsed XML dictionary.  Entries without a
# measurement_id are skipped.
channel_types = {}
for entry in deep_get(q, 'telemetry_dictionary.telemetry_definitions.telemetry'):
    raw_id = entry.get('measurement_id', None)
    if not raw_id:
        continue
    measurement_id = int(raw_id)
    byte_length = int(entry['@byte_length'])
    # Collect the channel's category name/value pairs into a plain dict.
    categories = {cat['@name']: cat['@value']
                  for cat in deep_get(entry, 'categories.category')}
    channel_types[measurement_id] = Monitor_Channel(entry['@abbreviation'],
                                                    entry['@name'],
                                                    entry['@type'],
                                                    entry['@source'],
                                                    byte_length,
                                                    measurement_id,
                                                    entry['title'],
                                                    entry['description'].strip("'"),
                                                    categories)
|
|
|
""" |
|
A plugin which creates an RAF connection with the DSN. |
|
Frames received via the RAF connection are sent to the output stream |
|
""" |
|
|
|
|
|
class DSN_Monitor():
    """Decoder for DSN station monitor SFDU records.

    A Standard DSN Block (SDB) wraps an SFDU, which is a chain of CHDOs
    (Compressed Header Data Objects).  ``process_sdb`` is the entry point;
    the ``process_chdo_*`` helpers each decode one fixed-layout header.
    CHDO layouts: https://jaguar.jpl.nasa.gov/SW_detail.php
    """

    def __init__(self):
        pass

    def process_sdb(self, sdb):
        """Strip the 20-byte DDD header and 2-byte trailer, decode the SFDU.

        :param sdb: bytes-like Standard DSN Block.
        :return: list of decoded channel dicts (see process_sfdu).
        """
        res = {'DDD_HEADER': sdb[0:20],
               'SFDU': sdb[20:-2],
               'DDD_TRAILER': sdb[-2:None]}
        res = self.process_sfdu(res['SFDU'])
        return res

    def process_sfdu(self, SFDU):
        """Decode one SFDU: its label, CHDO chain, and channel data area.

        https://jaguar.jpl.nasa.gov/SW_detail.php?modid=2403

        :param SFDU: bytes-like SFDU (DDD framing already removed).
        :return: list of canonical channel dicts; channels missing from the
                 dictionary are recorded as placeholder entries.
        """
        res = {}
        SFDU_LABEL = SFDU[0:20]
        res['SFDU_LABEL'] = self.process_sfdu_label(SFDU_LABEL)

        AGGREGATION_CHDO = BitArray(SFDU[20:24])
        res['AGGREGATION_CHDO'] = self.process_chdo_01(AGGREGATION_CHDO)

        PRIMARY_CHDO = BitArray(SFDU[24:32])
        res['PRIMARY_CHDO'] = self.process_chdo_02(PRIMARY_CHDO)

        SECONDARY_CHDO = BitArray(SFDU[32:54])
        res['SECONDARY_CHDO'] = self.process_chdo_73(SECONDARY_CHDO)

        TERTIARY_CHDO = BitArray(SFDU[54:58])
        res['TERTIARY_CHDO'] = self.process_chdo_000(TERTIARY_CHDO)

        QUATERNARY_CHDO = BitArray(SFDU[58:68])
        res['QUATERNARY_CHDO'] = self.process_chdo_27(QUATERNARY_CHDO)

        # Data CHDO starts after the aggregation CHDO's declared payload
        # (offset 24 + CHDO_LENGTH; 68 for the records seen so far).
        chd0_index = res['AGGREGATION_CHDO']['CHDO_LENGTH'] + 24
        DATA_CHDO = BitArray(SFDU[chd0_index:])
        res['DATA_CHDO'] = self.process_chdo_28(
            DATA_CHDO, res['QUATERNARY_CHDO']['NUMBER_CHANNELS'])

        d = res['DATA_CHDO']['data']
        final = []
        for channel in d:
            # A channel either carries value bytes (LC_VALUE) or an inline
            # integer in LENGTH_VALUE (when LV_FLAG was set).
            if channel['LC_VALUE']:
                val = channel['LC_VALUE']
            else:
                val = channel['LENGTH_VALUE']
            c = channel_types.get(channel['CHANNEL_NUMBER'], None)
            if c:
                try:
                    c.decode(val)
                    final.append(c.canonical_map())
                except Exception as e:
                    log.error(e)
                    log.error(c)
                    tb.print_exc()
                    # NOTE(review): bailing out here drops every remaining
                    # channel in the record -- confirm this is intended
                    # rather than `continue`.
                    return final
            else:
                # BUG FIX: `final` is a list, but the original code assigned
                # into it with a string key, raising TypeError for any
                # channel absent from the dictionary.  Append a placeholder
                # entry instead.
                final.append(
                    {f'M-{channel["CHANNEL_NUMBER"]}': "Not in the dictionary."})
                log.error(f"M-{channel['CHANNEL_NUMBER']} is not in the dictionary.")
        return final

    def process_chdo_28(self, dat, num_channels):
        """Decode the channelized data area (CHDO type 28).

        https://jaguar.jpl.nasa.gov/SW_detail.php

        :param dat: BitArray positioned at the data CHDO.
        :param num_channels: NUMBER_CHANNELS from the quaternary CHDO.
        :return: dict with CHDO_TYPE, CHDO_LENGTH and 'data', a list of
                 per-channel dicts.
        """
        # Meaning of the DATA_QUAL field values.
        data_qual = {}
        data_qual[0] = "No error, not filler."
        data_qual[1] = "The decom map tried to make a channel, but the record had no data at that location."
        data_qual[2] = "Filler data was decommutated."
        data_qual[3] = "Stale"

        res = {}
        dat_bytes = dat.bytes
        res["CHDO_TYPE"] = int.from_bytes(dat_bytes[0:2], 'big', signed=False)
        res["CHDO_LENGTH"] = int.from_bytes(dat_bytes[2:4], 'big', signed=False)

        decode = []
        dat = BitArray(dat_bytes[4:])
        # NOTE(review): range(1, num_channels) walks num_channels - 1
        # entries; confirm against the spec whether one channel is
        # intentionally skipped or this is an off-by-one.
        for i in range(1, num_channels):
            res2 = {}
            # 5-bit source code mapped to a letter ('A' == 1).
            res2["SOURCE"] = chr(ord('@')+dat[0:5].uint)
            res2["LV_FLAG"] = bool(dat[5:6])
            res2["DATA_QUAL"] = data_qual[dat[6:7].uint]
            res2['LENGTH_VALUE'] = dat[7:16].uint
            res2['FILLER_LENGTH'] = dat[16:20].uint
            res2['CHANNEL_NUMBER'] = dat[20:32].uint
            if not res2['LV_FLAG']:
                # Value bytes follow the 32-bit channel header;
                # LENGTH_VALUE counts 16-bit words, hence *2*8 bits.
                filler = res2['FILLER_LENGTH']
                res2['LC_VALUE'] = (dat[filler+32:(32+(res2['LENGTH_VALUE']*2*8))])
                consume = res2['LENGTH_VALUE']*2*8
            else:
                res2['LC_VALUE'] = None
                consume = 0
            decode.append(res2)
            # Advance past this channel's header and any value bits.
            dat = BitArray(dat[consume+32:])
            consume = 0
        res['data'] = decode
        return res

    def process_chdo_27(self, dat):
        """Decode the quaternary CHDO (type 27), 10 bytes."""
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = dat[0:2]
        res["CHDO_LENGTH"] = dat[2:4]
        res["DECOM_FLAGS"] = dat[4:5]
        res["FILLER_LENGTH"] = dat[5:6]
        res["NUMBER_CHANNELS"] = dat[6:8]

        # Convert all byte slices collected above to unsigned ints.
        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        res["MAP_ID"] = dat[8:10].hex()
        res["DECOM_FLAGS"] = None  # Per Spec
        return res

    def process_chdo_000(self, dat):
        """Decode the tertiary CHDO (type 000), 4 bytes."""
        dat = dat.bytes
        res = {}
        res['CHDO_TYPE'] = dat[0:2]
        res['CHDO_LENGTH'] = dat[2:4]
        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        return res

    def process_chdo_73(self, dat):
        """Decode the secondary CHDO (type 73), 22 bytes."""
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = dat[0:2]
        res["CHDO_LENGTH"] = dat[2:4]
        res["ORIGINATOR"] = dat[4:5]
        res["LAST_MODIFIER"] = dat[5:6]
        res["8B_SCFT_ID"] = dat[6:7]
        res["DATA_SOURCE"] = dat[7:8]
        res["SCFT_ID"] = dat[8:10]
        res["MST"] = dat[10:16]
        # NOTE(review): byte 16 is never read, and the second "SCFT_ID"
        # below overwrites the dat[8:10] entry above -- check both against
        # the monitor CHDO spec.
        res["SPARE"] = dat[17:18]
        res["SCFT_ID"] = dat[18:20]
        res["SPARE_2"] = dat[20:22]

        res = {k: int.from_bytes(v, 'big', signed=False)
               for (k, v) in res.items()}
        return res

    def process_chdo_02(self, dat):
        """Decode the primary CHDO (type 02), 8 bytes."""
        dat = dat.bytes
        res = {}
        res['CHDO_TYPE'] = int.from_bytes(dat[0:2], 'big', signed=False)
        res['CHDO_LENGTH'] = int.from_bytes(dat[2:4], 'big', signed=False)
        # RECORD_ID breaks down into four one-byte fields.
        RECORD_ID = dat[4:8]
        res['MAJOR'] = RECORD_ID[0]
        res['MINOR'] = RECORD_ID[1]
        res['MISSION'] = RECORD_ID[2]
        res['FORMAT'] = RECORD_ID[3]
        return res

    def process_chdo_01(self, dat):
        """Decode the aggregation CHDO (type 01), 4 bytes."""
        dat = dat.bytes
        res = {}
        res["CHDO_TYPE"] = int.from_bytes(dat[0:2], 'big', signed=False)
        res["CHDO_LENGTH"] = int.from_bytes(dat[2:4], 'big', signed=False)
        return res

    def process_sfdu_label(self, dat):
        """Decode the 20-byte SFDU label.

        https://jaguar.jpl.nasa.gov/SW_detail.php?modid=2137

        :param dat: raw bytes of the label.
        :return: dict of the label's ASCII fields and LENGTH_ATTRIBUTE.
        """
        res = {}
        res["CONTROL_AUTHORITY"] = dat[0:4].decode("ASCII")
        res["VERSION_ID"] = dat[4:5].decode("ASCII")
        res["CLASS_ID"] = dat[5:6].decode("ASCII")
        res["SPARE"] = dat[6:8].decode("ASCII")
        res["DATA_DESCRIPTION_ID"] = dat[8:12].decode("ASCII")
        res['LENGTH_ATTRIBUTE'] = int.from_bytes(dat[12:20], 'big', signed=False)
        return res
|
|
|
|
|
class DSN_Monitor_Plugin(Plugin):
    """AIT server plugin: receives DSN station monitor SFDUs over UDP.

    Binds a UDP socket on (host, port); every datagram received is decoded
    by DSN_Monitor and published on the 'STATION_MONITOR_DATA' topic.
    """
    def __init__(self, inputs=None, outputs=None,
                 zmq_args=None, report_time_s=0, host='0.0.0.0', port=8001, **kwargs):
        super().__init__(inputs, outputs, zmq_args)

        # Datagram counter -- never incremented here; presumably intended
        # for the report_time_s reporting hook.  TODO confirm.
        self.receive_counter = 0
        self.host = host
        self.port = port
        self.processor = DSN_Monitor()

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        log.info(f'Starting station monitor server on: {self.host}:{self.port}')
        s.bind((self.host, self.port))

        ## Pack these up in pytest
        #with open('/home/jpluser/SunRISE-AIT/dss-54.mon0158.sfdus', 'rb') as f:
        # x = f.read()
        # self.process(x)

        # with open('/home/vicious/AIT-Deploy/SunRISE-AIT/dump.ndjson', 'r') as f:
        # while x := f.readline():
        # x = json.loads(x)
        # x = bytearray(ast.literal_eval(x))
        # #print(x, type(x))
        # self.process(x)

        # NOTE(review): this receive loop never returns, so __init__ blocks
        # forever; it appears to rely on the AIT server constructing the
        # plugin in its own greenlet/thread -- confirm.
        while True:
            data, _ = s.recvfrom(60000)
            self.process(data)

    def process(self, data, topic=None):
        """Decode one raw SDB datagram and publish the channel list.

        :param data: raw UDP payload (a Standard DSN Block).
        :param topic: unused; present to match the AIT plugin interface.
        """
        res = self.processor.process_sdb(bytearray(data))
        # Print out the data for a quick inspection
        #log.info(json.dumps(res, indent=4))
        self.publish(res, 'STATION_MONITOR_DATA')