Last active
April 11, 2019 12:09
-
-
Save dragonlost/137ec8595d933fad01eb152fc39f0d64 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Created on Fri Apr 6 11:49:46 2018 | |
@author: Sébastien Durand | |
""" | |
import numpy as np | |
from datetime import datetime | |
from tqdm import tqdm | |
from astropy import units as u | |
from astropy.time import Time | |
#////////////////////////////////////////// Output files creation ///////////////////////////////////////////// | |
def write_ccsds(sat_QUAT_path_out, cic, origin, comment, header, sequence, data_time, data_science):
    """
    Write a CIC CCSDS standard file.

    Input :
        sat_QUAT_path_out (string) : output path for the new CIC file
        cic (string)               : CIC version line (written first)
        origin (string)            : ORIGINATOR line
        comment (string array)     : comment lines, written one per line
        header (string array)      : remaining metadata lines
        sequence (string array)    : numpy.savetxt formats, one per column
                                     (ex: for time + 3 data: ["%d","%.6f","%.8f","%.8f","%.8f"])
        data_time (Nx2, float)     : time columns, julian day and seconds
        data_science (NxM, float)  : science data columns

    Raises :
        IndexError : when data_time and data_science have different row counts,
                     or when the total column count does not match len(sequence).
    """
    # Validate row counts before touching the output file.
    if len(data_time) != len(data_science):
        print("Data_time size :", len(data_time))
        print("Data Science size :", len(data_science))
        raise IndexError("Data_time and data_science have not the same size")
    # Glue time columns and science columns into one row per epoch.
    data = [np.concatenate((data_time[i], data_science[i])) for i in range(len(data_time))]
    if len(data[0]) != len(sequence):
        print("Data_time + data_science size :", (len(data_time[0]) + len(data_science[0])))
        print("sequence size :", len(sequence))
        raise IndexError("Data_time + data_science size is not the same size as sequence")
    # "with" guarantees the handle is closed even if a write fails
    # (the original left the file open on any exception).
    with open(sat_QUAT_path_out, 'w') as fO:
        # Header section.
        fO.write(cic + "\n")
        # [:-3] truncates microseconds to millisecond precision.
        fO.write(datetime.now().strftime("CREATION_DATE = %Y-%m-%dT%H:%M:%S.%f")[:-3] + "\n")
        fO.write(origin + "\n")
        fO.write("\n" + "META_START" + "\n" + "\n")
        for line in comment:
            fO.write(line + "\n")
        fO.write("\n")
        for line in header:
            fO.write(line + "\n")
        fO.write("\n" + "META_STOP" + "\n" + "\n")
        # Data section: one formatted row per epoch.
        for row in data:
            np.savetxt(fO, [row], delimiter="\t", fmt=sequence)
def read_ccsds(sat_QUAT_path_in, separator='\t'):
    """
    Read a CIC CCSDS standard file.

    Input :
        sat_QUAT_path_in (string) : path to the input CIC file
        separator (string)        : column separator used in the data section
    Output :
        cic (string)          : CIC/CCSDS version line
        origin (string)       : ORIGINATOR line
        comment (string list) : COMMENT lines, one per entry
        header (array)        : metadata lines after META_START and the comments
        data_time (Nx2)       : julian day and seconds columns
        data_science (NxM)    : remaining data columns (float32 for orbit/attitude
                                files, raw strings for event files)

    Raises :
        AttributeError : when META_STOP, META_START, the version line or the
                         ORIGINATOR line is missing.
    """
    # Read and release the file immediately; parsing works on the in-memory copy.
    with open(sat_QUAT_path_in, 'r') as fM:
        raw_lines = fM.readlines()
    # Strip whitespace and drop blank lines.
    stripped = [line.strip() for line in raw_lines]
    text_filter = np.array([line for line in stripped if line != ''])
    # Locate the end of the metadata section.
    # None sentinels replace the original "'name' in locals()" checks.
    stop = None
    for i in range(text_filter.size):
        if text_filter[i] == 'META_STOP':
            stop = i
            break
    if stop is None:
        raise AttributeError("META_STOP is not detect")
    data = text_filter[stop + 1:]
    header = text_filter[0:stop]
    # Scan the metadata for comments, version keyword, originator and META_START.
    comment = []
    cic = None
    cictype = None
    origin = None
    start = None
    for i in tqdm(range(header.size), ascii=True, desc="Read Header"):
        line = header[i]
        if line.startswith('COMMENT'):
            comment.append(line)
        # The version keyword encodes which kind of CIC/CCSDS file this is.
        if line.startswith("CIC_AEM_VERS"):
            cic = line
            cictype = "attitude"
        elif line.startswith("CIC_OEM_VERS"):
            cic = line
            cictype = "orbite"
        elif line.startswith("CCSDS_OEM_VERS"):
            cic = line
            cictype = "orbite_ccsds"
        elif line.startswith("CCSDS_AEM_VERS"):
            cic = line
            cictype = "attitude_ccsds"
        elif line.startswith("CIC_MEM_VERS"):
            cic = line
            cictype = "event_cic"
        if line.startswith("ORIGINATOR"):
            origin = line
        if line.startswith("META_START"):
            start = i
    if start is None:
        raise AttributeError("META_START is not detect")
    elif cic is None:
        raise AttributeError("CIC_OEM_VERS or CIC_AEM_VERS or CCSDS_AEM_VERS or CCSDS_OEM_VERS or CIC_MEM_VERS is not detect")
    elif origin is None:
        raise AttributeError("ORIGINATOR is not detect")
    # Keep only metadata lines past META_START and the comment block
    # (assumes the comments sit between META_START and the other metadata).
    header = header[start + 1 + len(comment):]
    data_sp = []
    if cictype in ("attitude_ccsds", "orbite_ccsds", "orbite", "attitude"):
        # Numeric payload: convert every column to float32.
        for line in tqdm(data, ascii=True, desc="Read DATA File"):
            data_sp.append(np.float32(line.split(separator)))
    elif cictype == "event_cic":
        # Event payload: keep raw string columns.
        for line in tqdm(data, ascii=True, desc="Read DATA File"):
            data_sp.append(line.split(separator))
    data_sp = np.array(data_sp)
    # First two columns are time (julian day, seconds); the rest is science data.
    data_science = data_sp[:, 2:]
    data_time = data_sp[:, :2]
    return cic, origin, comment, header, data_time, data_science
def read_miriade_eph_file(file_path, coord_type="Rectangular", ref_plane="Equator", Time_type="utc"):
    """
    Parse an IMCCE Miriade ephemeris text file.

    Input :
        file_path (string)  : path to the Miriade ephemeris file
        coord_type (string) : "Rectangular" or "Spherical"; must match the
                              columns actually present in the file
        ref_plane (string)  : reference plane label (not used in this body;
                              presumably kept for the caller's bookkeeping)
        Time_type (string)  : astropy Time scale for the date column (e.g. "utc")
    Output :
        data_time    : astropy.time.Time built from the ISOT date column
        data_science : list of astropy Quantity arrays, one per data column,
                       with units taken from the `col` table below
        header       : raw header lines from the top of the file

    Raises :
        ValueError : when coord_type is neither "Spherical" nor "Rectangular".
    """
    # needs heliocentric coordinates (translated from original comment)
    fM = open(file_path,'r')
    # Miriade files carry a fixed-size header before the data rows.
    number_line_header=5
    text = fM.readlines()
    text = np.array(text)
    # NOTE(review): header keeps lines 0..3 but data skips 5 lines, so line 4
    # is dropped from both views — looks intentional (separator row?) but
    # confirm against a real Miriade file.
    header = text[0:number_line_header-1]
    data_text = text[number_line_header:]
    # Strip whitespace in place, then drop blank lines.
    for i in range(data_text.size) : data_text[i]=data_text[i].strip()
    fM.close()
    data_text_filter = []
    for i in range(data_text.size) :
        if data_text[i] != '':
            data_text_filter.append(data_text[i])
    data_text_filter=np.array(data_text_filter)
    # Miriade columns are comma+space separated.
    data_split = []
    for i in tqdm(data_text_filter, ascii=True, desc="Read DATA File"): data_split.append(i.split(", "))
    data_split = np.array(data_split)
    # Transpose so each row of data_split is one column of the file.
    data_split=data_split.T
    # Column 0 is the target name, column 1 the ISOT date; the rest is data.
    data_time = data_split[1]
    data_science = data_split[2:]
    # Column descriptor table: [name, astropy unit string (or list of them)].
    col = []
    col.append(["Target", "name"])
    col.append(["Date", "ISOT"])
    if coord_type == "Spherical":
        col.append(["RA", ["hour", "minute", "second"]])
        # NOTE(review): "day" is an astropy *time* unit, but DEC is an angle;
        # this is probably meant to be "deg"/"arcmin"/"arcsec" — confirm
        # against the Miriade column layout before relying on these units.
        col.append(["DEC", ["day", "minute", "second"]])
        col.append(["Distance", "AU"])
    elif coord_type == "Rectangular":
        col.append(["X", "AU"])
        col.append(["Y", "AU"])
        col.append(["Z", "AU"])
        col.append(["Xp", "AU/day"])
        col.append(["Yp", "AU/day"])
        col.append(["Zp", "AU/day"])
        col.append(["Distance", "AU"])
    else:
        raise ValueError("This Coord__type is not support, juste Spherical and Rectangular")
    # Debug trace of the expected column layout.
    print(col)
    # Attach the units from `col` to each science column; entries whose unit
    # field is a list (sexagesimal parts) consume one file column per part.
    new_data = []
    k=0
    for i in range(2,len(col)):
        if type(col[i][1]) == list:
            for j in range(len(col[i][1])):
                new_data.append(np.float32(data_science[k])*u.Unit(col[i][1][j]))
                k=k+1
        else:
            new_data.append(np.float32(data_science[k])*u.Unit(col[i][1]))
            k=k+1
    data_science = new_data
    # Parse the ISOT date strings into an astropy Time on the requested scale.
    data_time = Time(data_time, format="isot", scale=Time_type)
    return data_time, data_science, header
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment