|
#!/usr/bin/env python |
|
# encoding: utf-8 |
|
""" |
|
*Stitch together SOXS simulated FITS headers and data generated from the E2E simulator* |
|
|
|
:Author: |
|
David Young |
|
|
|
:Date Created: |
|
June 1, 2021 |
|
|
|
Usage: |
|
soxssim <arm> <type> [-t <tag>] [<exptime>] -i <simulatedDataRoot> -o <outputPath> |
|
|
|
Options: |
|
-h, --help show this help message |
|
-i <simulatedDataRoot>, --input <simulatedDataRoot> path to the root directory of the simulated data (containing `data` and `headers` directories) |
|
-o <outputPath>, --output <outputPath> path to directory to output the requested frames to |
|
-t <tag>, --tag <tag> file tags (e.g. CR == cosmic rays) |
|
""" |
|
################# GLOBAL IMPORTS #################### |
|
import sys |
|
import os |
|
from fundamentals import tools |
|
import pandas as pd |
|
from tabulate import tabulate |
|
from astropy.io import fits |
|
from os.path import expanduser |
|
from ccdproc import ImageFileCollection |
|
from datetime import datetime, date, time, timedelta |
|
from astrocalc.times import conversions |
|
|
|
|
|
def main(arguments=None): |
|
""" |
|
*The main function used when ``soxssim.py`` is run as a single script from the cl* |
|
""" |
|
|
|
# SETUP THE COMMAND-LINE UTIL SETTINGS |
|
su = tools( |
|
arguments=arguments, |
|
docString=__doc__, |
|
logLevel="WARNING", |
|
options_first=False, |
|
projectName=False |
|
) |
|
arguments, settings, log, dbConn = su.setup() |
|
|
|
# UNPACK REMAINING CL ARGUMENTS USING `EXEC` TO SETUP THE VARIABLE NAMES |
|
# AUTOMATICALLY |
|
a = {} |
|
for arg, val in list(arguments.items()): |
|
if arg[0] == "-": |
|
varname = arg.replace("-", "") + "Flag" |
|
else: |
|
varname = arg.replace("<", "").replace(">", "") |
|
a[varname] = val |
|
if arg == "--dbConn": |
|
dbConn = val |
|
a["dbConn"] = val |
|
log.debug('%s = %s' % (varname, val,)) |
|
|
|
a['arm'] = a['arm'].upper() |
|
a["type"] = a['type'].upper() |
|
a["exptime"] = int(a['exptime']) |
|
a["tag"] = a["tagFlag"] |
|
if a["tag"]: |
|
a["tag"] = f"_{a['tag']}" |
|
else: |
|
a["tag"] = "" |
|
|
|
# FIND FITS DATA FRAMES MATCHING CRITERIA |
|
# GENERATE A LIST OF FILE PATHS |
|
simulatedDataRoot = a["inputFlag"] |
|
# MAKE RELATIVE HOME PATH ABSOLUTE |
|
home = expanduser("~") |
|
if simulatedDataRoot[0] == "~": |
|
simulatedDataRoot = simulatedDataRoot.replace("~", home) |
|
if not len(simulatedDataRoot): |
|
simulatedDataRoot = "." |
|
simulatedDataRoot = os.path.abspath(simulatedDataRoot) |
|
|
|
imageFrames = get_data_images(log=log, simulatedDataRoot=simulatedDataRoot, arm=a['arm'], ttype=a[ |
|
'type'], tag=a['tag'], exptime=a['exptime']) |
|
|
|
print("INPUT FRAMES") |
|
from tabulate import tabulate |
|
print(tabulate(imageFrames, headers='keys', tablefmt='psql')) |
|
|
|
# print(imageFrames) |
|
# sys.exit(0) |
|
|
|
primHdr, extHdr = get_header_template( |
|
log=log, |
|
simulatedDataRoot=simulatedDataRoot, |
|
arm=a['arm'], |
|
ttype=a['type'] |
|
) |
|
|
|
outputPaths, outputDir = write_simulated_frames(log=log, outputDir=a[ |
|
"outputFlag"], imageFrames=imageFrames, primHdr=primHdr, extHdr=extHdr, arm=a["arm"], ttype=a["type"], tag=a["tag"], exptime=a["exptime"]) |
|
|
|
# GENERATE A LIST OF OUTPUT FITS FILE PATHS |
|
fitsPaths = [] |
|
for d in os.listdir(outputDir): |
|
filepath = os.path.join(outputDir, d) |
|
if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": |
|
fitsPaths.append(filepath) |
|
|
|
# GENERATE THE IMAGECOLLECTION |
|
keys = ['INSTRUME', 'ESO DPR CATG', |
|
'ESO DPR TYPE', 'ESO DPR TECH', 'ESO TPL NAME', 'ESO SEQ ARM'] |
|
# keys = ['imagetyp', 'object', 'filter', 'exposure'] |
|
collection = ImageFileCollection(filenames=fitsPaths, keywords=keys) |
|
collection.sort(['file']) |
|
|
|
from tabulate import tabulate |
|
print("OUTPUT FRAMES") |
|
print(tabulate(collection.summary, headers='keys', tablefmt='psql')) |
|
|
|
return |
|
|
|
|
|
def get_data_images( |
|
log, |
|
simulatedDataRoot, |
|
arm, |
|
ttype, |
|
tag, |
|
exptime): |
|
"""*return a pandas data-frame of data images matching requested criteria* |
|
|
|
**Key Arguments:** |
|
|
|
- `log` -- logger |
|
- `simulatedDataRoot` -- path to the root directory of the simulated data |
|
- `arm` -- NIR/VIS |
|
- `type` -- the frame type |
|
- `tag` -- extra metatag for input frames (e.g. CR for cosmic rays) |
|
- `exptime` -- the exposure time |
|
|
|
**Return:** |
|
|
|
- `imageFrames` -- pandas data-frame of images |
|
""" |
|
log.debug('starting the ``functionName`` function') |
|
|
|
pathToDataDirectory = f"{simulatedDataRoot}/data/{arm}/{ttype}{tag}" |
|
|
|
fitsNames = [] |
|
fitsPaths = [] |
|
types = [] |
|
exptimes = [] |
|
for d in os.listdir(pathToDataDirectory): |
|
filepath = os.path.join(pathToDataDirectory, d) |
|
if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": |
|
fitsNames.append(d) |
|
fitsPaths.append(filepath) |
|
meta = d.split("_") |
|
types.append(meta[0]) |
|
exptimes.append(int(meta[1].replace("s", ""))) |
|
|
|
# CREATE DATA FRAME FROM A DICTIONARY OF LISTS |
|
dataFrames = pd.DataFrame({ |
|
"filepath": fitsPaths, |
|
"filename": fitsNames, |
|
"type": types, |
|
"exptime": exptimes |
|
}) |
|
# SORT BY COLUMN NAME |
|
dataFrames.sort_values( |
|
by=['type', 'exptime', 'filename'], inplace=True) |
|
|
|
# NOW FILTER BY GIVEN CRITERIA |
|
# FILTER DATA FRAME |
|
# FIRST CREATE THE MASK |
|
mask = (dataFrames['exptime'].isin([exptime])) |
|
filteredFrames = dataFrames.loc[mask] |
|
if not len(filteredFrames.index): |
|
print(f"There are {arm} no data-frames of type '{ttype}' and exptime '{exptime}'. Here are the frames you can choose from:") |
|
print(tabulate(dataFrames, headers='keys', tablefmt='psql')) |
|
sys.exit(0) |
|
|
|
log.debug('completed the ``functionName`` function') |
|
return filteredFrames |
|
|
|
|
|
def get_header_template( |
|
log, |
|
simulatedDataRoot, |
|
arm, |
|
ttype): |
|
"""*return the primary and extension header templates to use when stitching data into simulated frames* |
|
|
|
**Key Arguments:** |
|
|
|
- `log` -- logger |
|
- `simulatedDataRoot` -- path to the root directory of the simulated data |
|
- `arm` -- NIR/VIS |
|
- `type` -- the frame type |
|
|
|
*Return* |
|
|
|
- ``primHdr`` -- the template primary header |
|
- ``extHdr`` -- the template extension header |
|
""" |
|
log.debug('starting the ``get_header_template`` function') |
|
|
|
tpl = None |
|
if ttype == "FLAT": |
|
ttype = "FLAT,LAMP" |
|
tpl = "Flat Field calibration for NIR" |
|
elif ttype == "ORDER_CENTRE": |
|
ttype = "FLAT,LAMP" |
|
tpl = "Lamp Flat Single Pinhole for NIR" |
|
elif ttype == "SPH_ARC": |
|
ttype = "STD,WAVE" |
|
tpl = "Arcs Pinhole Spectral calibration for NIR" |
|
elif ttype == "MPH_ARC": |
|
ttype = "STD,WAVE" |
|
tpl = "Arcs Pinhole Spectral calibration for NIR" |
|
elif ttype == "STARE": |
|
ttype = "OBJECT" |
|
tpl = "Observation template with async exposures" |
|
|
|
# NOW GET THE FITS HEADERS |
|
pathToHeaderDirectory = f"{simulatedDataRoot}/headers" |
|
|
|
# GENERATE A LIST OF FITS FILE PATHS |
|
fitsPaths = [] |
|
for d in os.listdir(pathToHeaderDirectory): |
|
filepath = os.path.join(pathToHeaderDirectory, d) |
|
if os.path.isfile(filepath) and os.path.splitext(filepath)[1] == ".fits": |
|
fitsPaths.append(filepath) |
|
|
|
# GENERATE THE IMAGECOLLECTION |
|
keys = ['INSTRUME', 'ESO DPR CATG', |
|
'ESO DPR TYPE', 'ESO DPR TECH', 'ESO TPL NAME', 'ESO SEQ ARM'] |
|
# keys = ['imagetyp', 'object', 'filter', 'exposure'] |
|
collection = ImageFileCollection(filenames=fitsPaths, keywords=keys) |
|
|
|
collection.sort(['file']) |
|
|
|
matches = (collection.summary['INSTRUME'] == "SOXS") & ( |
|
collection.summary['ESO SEQ ARM'] == arm.upper()) & (collection.summary['ESO DPR TYPE'] == ttype.upper()) |
|
if tpl: |
|
matches = matches & (collection.summary['ESO TPL NAME'] == tpl) |
|
headers = collection.summary[matches] |
|
from tabulate import tabulate |
|
print("MATCHED HEADERS") |
|
print(tabulate(headers, headers='keys', tablefmt='psql')) |
|
|
|
if not len(headers): |
|
print(f"There are no {arm} header-frames of type '{ttype}'. Here are the frames you can choose from:") |
|
collection.summary.pprint_all() |
|
sys.exit(0) |
|
|
|
# OPEN TEMPLATE HEADERS |
|
templateHeaderFile = headers[0]["file"] |
|
with fits.open(templateHeaderFile, "readonly") as hdul: |
|
primHdr = hdul[0].header |
|
extHdr = hdul[1].header |
|
|
|
log.debug('completed the ``get_header_template`` function') |
|
return primHdr, extHdr |
|
|
|
|
|
def write_simulated_frames( |
|
log, |
|
outputDir, |
|
imageFrames, |
|
primHdr, |
|
extHdr, |
|
arm, |
|
ttype, |
|
tag, |
|
exptime): |
|
"""*summary of function* |
|
|
|
**Key Arguments:** |
|
|
|
- `log` -- logger |
|
- `outputDir` -- path to directory to output the requested frames to |
|
- `imageFrames` -- imagecollection of the image frames |
|
- `primHdr` -- template primary header |
|
- `extHdr` -- template extension header |
|
- `arm` -- NIR/VIS |
|
- `type` -- the frame type |
|
- `tag` -- extra metatag for input frames (e.g. CR for cosmic rays) |
|
- `exptime` -- the exposure time |
|
|
|
**Return** |
|
|
|
- ``outputPaths`` -- list of paths to the output frames |
|
|
|
""" |
|
log.debug('starting the ``functionName`` function') |
|
|
|
# FORMAT OUTPUT PATH |
|
home = expanduser("~") |
|
outputDir = outputDir.replace("~", home) + f'/{ttype}{tag}' |
|
# RECURSIVELY CREATE MISSING DIRECTORIES |
|
if not os.path.exists(outputDir): |
|
os.makedirs(outputDir) |
|
|
|
fh_type = ttype |
|
|
|
if fh_type == "FLAT": |
|
fh_type = "LAMP,FLAT" |
|
elif fh_type == "ORDER_CENTRE": |
|
fh_type = "LAMP,ORDERDEF" |
|
elif fh_type == 'SPH_ARC': |
|
fh_type = "LAMP,FMTCHK" |
|
elif fh_type == 'MPH_ARC': |
|
fh_type = "LAMP,WAVE" |
|
elif fh_type == 'STARE': |
|
fh_type = "OBJECT" |
|
|
|
# ASTROCALC CONVERTER |
|
converter = conversions( |
|
log=log |
|
) |
|
outputPaths = [] |
|
numberFrames = len(imageFrames) |
|
for i, frame in enumerate(imageFrames['filepath']): |
|
now = datetime.now() + timedelta(seconds=float(exptime) * i) |
|
nowStr = now.strftime("%Y-%m-%dT%H:%M:%S.%f") |
|
|
|
thisHrd = primHdr.copy() |
|
|
|
# CREATE DICTIONARY OF HEADER KEYWORDS TO UPDATE |
|
kw = { |
|
"DATE": nowStr, |
|
"EXPTIME": float(exptime), |
|
"DATE-OBS": nowStr, |
|
"MJD-OBS": (converter.ut_datetime_to_mjd(utDatetime=nowStr), nowStr), |
|
"ESO DET SEQ1 DIT": float(exptime), |
|
"ESO DET SEQ1 EXPTIME": float(exptime), |
|
"ESO TPL NEXP": numberFrames, |
|
"ESO TPL EXPNO": i + 1, |
|
"ESO DPR TYPE": fh_type |
|
} |
|
|
|
onofftag = "" |
|
if arm == "NIR": |
|
basename = os.path.basename(frame) |
|
if "_ON" in basename: |
|
if ttype == "FLAT": |
|
kw["ESO DPR TECH"] = "ECHELLE,SLIT" |
|
elif ttype in ["ORDER_CENTRE", "SPH_ARC"]: |
|
kw["ESO DPR TECH"] = "ECHELLE,PINHOLE" |
|
elif ttype in ["MPH_ARC"]: |
|
kw["ESO DPR TECH"] = "ECHELLE,MULTI-PINHOLE" |
|
elif ttype in ["STARE"]: |
|
kw["ESO DPR TECH"] = "ECHELLE,SLIT,STARE" |
|
onofftag = "_ON" |
|
elif "_OFF" in basename: |
|
kw["ESO DPR TECH"] = "IMAGE" |
|
kw["ESO DPR CATG"] = "CALIB" |
|
onofftag = "_OFF" |
|
|
|
# UPDATE PRIMARY HEADER |
|
for k, v in kw.items(): |
|
thisHrd[k] = v |
|
|
|
# CREATE DICTIONARY OF HEADER KEYWORDS TO UPDATE |
|
kw = { |
|
"EXPTIME": float(exptime), |
|
"ESO DET EXP UTC": nowStr, |
|
"ESO DET FRAM UTC": nowStr, |
|
"ESO DET SEQ UTC": nowStr |
|
} |
|
# UPDATE EXTENSION HEADER |
|
for k, v in kw.items(): |
|
extHdr[k] = v |
|
|
|
with fits.open(frame, "readonly") as hdul: |
|
data = hdul[0].data |
|
|
|
# CREATE THE FITS FRAME |
|
primHDU = fits.PrimaryHDU(header=thisHrd) |
|
# primHDU.header = primHdr |
|
imageHDU = fits.ImageHDU(header=extHdr, data=data) |
|
|
|
hdul = fits.HDUList([primHDU, imageHDU]) |
|
outPath = f"{outputDir}/SOXS_{arm}_{ttype}{onofftag}_{exptime}_{i+1:03d}.fits" |
|
hdul.writeto(outPath, output_verify="fix+warn", |
|
overwrite=True, checksum=False) |
|
outputPaths.append(outPath) |
|
|
|
log.debug('completed the ``functionName`` function') |
|
return outputPaths, outputDir |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |