Skip to content

Instantly share code, notes, and snippets.

@dragonlost
Last active April 11, 2019 12:09
Show Gist options
  • Save dragonlost/137ec8595d933fad01eb152fc39f0d64 to your computer and use it in GitHub Desktop.
Save dragonlost/137ec8595d933fad01eb152fc39f0d64 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Apr 6 11:49:46 2018
@author: Sébastien Durand
"""
import numpy as np
from datetime import datetime
from tqdm import tqdm
from astropy import units as u
from astropy.time import Time
#////////////////////////////////////////// Output files creation /////////////////////////////////////////////
def write_ccsds(sat_QUAT_path_out, cic, origin, comment, header, sequence, data_time, data_science):
"""
Read CIC CCSDS standars file
Input :
sat_QUAT_path_out (string) : Output path for new CIC file
cic (string) : CIC string Line
origin (string) : String origin Name Line
comment (string array) : String List of Comment, line by line
header (string array) : String List of other line
sequence (string array) : String List for defind writing formats
(ex: for time + 3 data: ["%d","%.6f","%.8f","%.8f","%.8f"])
data_time (Nx2, float) : All Time, julian day and second
data_science (NxM, float) : All Data, M x data
"""
if(len(data_time) == len(data_science)):
data = []
for i in range(len(data_time)):
data.append(np.concatenate((data_time[i],data_science[i])))
if (len(data[0]) == len(sequence)):
fO = open(sat_QUAT_path_out,'w')
# creation header :
fO.write(cic+"\n")
fO.write(datetime.now().strftime("CREATION_DATE = %Y-%m-%dT%H:%M:%S.%f")[:-3]+"\n")
fO.write(origin+"\n")
fO.write("\n"+"META_START"+"\n"+"\n")
for i in range(len(comment)):fO.write(comment[i]+"\n")
fO.write("\n")
for i in range(len(header)):fO.write(header[i]+"\n")
fO.write("\n"+"META_STOP"+"\n"+"\n")
for i in range(len(data_science)):
np.savetxt(fO,[data[i]],delimiter="\t",fmt=sequence)
fO.close()
else:
print("Data_time + data_science size :",(len(data_time[0])+len(data_science[0])))
print("sequence size :",len(sequence))
raise IndexError("Data_time + data_science size is not the same size as sequence")
else:
print("Data_time size :",len(data_time))
print("Data Science size :",len(data_science))
raise IndexError("Data_time and data_science have not the same size")
def read_ccsds(sat_QUAT_path_in, separator = '\t'):
"""
Read CIC CCSDS standars file
Input :
sat_QUAT_path_in (string) : Path to Input CIC File
Output :
cic (string) : CIC string Line
origin (string) : String origin Name Line
comment (string array) : String List of Comment, line by line
header (string array) : String List of other line
data_time (Nx2, float) : All Time, julian day and second
data_science (NxM, float) : All Data, M x data
"""
fM = open(sat_QUAT_path_in,'r')
meta_stop = 'META_STOP'
text = fM.readlines()
text = np.array(text)
for i in range(text.size) : text[i]=text[i].strip()
text_filter = []
for i in range(text.size) :
if text[i] != '':
text_filter.append(text[i])
text_filter=np.array(text_filter)
#start = np.where(text_filter == meta_stop)[0]
for i in range(text_filter.size):
if(text_filter[i] == meta_stop):
stop = i
break
if not('stop' in locals()):
raise AttributeError("META_STOP is not detect")
data = text_filter[stop+1 :]
header = text_filter[0:stop]
comment=[]
for i in tqdm(range(header.size), ascii=True, desc="Read Header"):
if(header[i].startswith('COMMENT')):
comment.append(header[i])
if(header[i].startswith("CIC_AEM_VERS")):
cic = header[i]
cictype = "attitude"
elif(header[i].startswith("CIC_OEM_VERS")):
cic = header[i]
cictype = "orbite"
elif(header[i].startswith("CCSDS_OEM_VERS")):
cic = header[i]
cictype = "orbite_ccsds"
elif(header[i].startswith("CCSDS_AEM_VERS")):
cic = header[i]
cictype = "attitude_ccsds"
elif(header[i].startswith("CIC_MEM_VERS")):
cic = header[i]
cictype = "event_cic"
if(header[i].startswith("ORIGINATOR")):
origin = header[i]
if(header[i].startswith("META_START")):
start = i
if not('start' in locals()):
raise AttributeError("META_START is not detect")
elif not('cic' in locals()):
raise AttributeError("CIC_OEM_VERS or CIC_AEM_VERS or CCSDS_AEM_VERS or CCSDS_OEM_VERS or CIC_MEM_VERS is not detect")
elif not('origin' in locals()):
raise AttributeError("ORIGINATOR is not detect")
header = header[start+1+len(comment):]
data_sp = []
if cictype == "attitude_ccsds" or cictype == "orbite_ccsds" or cictype == "orbite" or cictype == "attitude":
for i in tqdm(data, ascii=True, desc="Read DATA File"): data_sp.append(np.float32(i.split(separator)))
elif cictype == "event_cic":
for i in tqdm(data, ascii=True, desc="Read DATA File"): data_sp.append(i.split(separator))
data_sp = np.array(data_sp)
fM.close()
data_science = data_sp[:,2:]
data_time = data_sp[:,:2]
return cic, origin, comment, header, data_time, data_science
def read_miriade_eph_file(file_path, coord_type="Rectangular", ref_plane="Equator", Time_type="utc"):
# need heliocentrique coord
fM = open(file_path,'r')
number_line_header=5
text = fM.readlines()
text = np.array(text)
header = text[0:number_line_header-1]
data_text = text[number_line_header:]
for i in range(data_text.size) : data_text[i]=data_text[i].strip()
fM.close()
data_text_filter = []
for i in range(data_text.size) :
if data_text[i] != '':
data_text_filter.append(data_text[i])
data_text_filter=np.array(data_text_filter)
data_split = []
for i in tqdm(data_text_filter, ascii=True, desc="Read DATA File"): data_split.append(i.split(", "))
data_split = np.array(data_split)
data_split=data_split.T
data_time = data_split[1]
data_science = data_split[2:]
col = []
col.append(["Target", "name"])
col.append(["Date", "ISOT"])
if coord_type == "Spherical":
col.append(["RA", ["hour", "minute", "second"]])
col.append(["DEC", ["day", "minute", "second"]])
col.append(["Distance", "AU"])
elif coord_type == "Rectangular":
col.append(["X", "AU"])
col.append(["Y", "AU"])
col.append(["Z", "AU"])
col.append(["Xp", "AU/day"])
col.append(["Yp", "AU/day"])
col.append(["Zp", "AU/day"])
col.append(["Distance", "AU"])
else:
raise ValueError("This Coord__type is not support, juste Spherical and Rectangular")
print(col)
new_data = []
k=0
for i in range(2,len(col)):
if type(col[i][1]) == list:
for j in range(len(col[i][1])):
new_data.append(np.float32(data_science[k])*u.Unit(col[i][1][j]))
k=k+1
else:
new_data.append(np.float32(data_science[k])*u.Unit(col[i][1]))
k=k+1
data_science = new_data
data_time = Time(data_time, format="isot", scale=Time_type)
return data_time, data_science, header
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment