Last active
May 5, 2021 16:00
-
-
Save thespacedoctor/7795ea4bbaf5ac140f4c3c314a97aa01 to your computer and use it in GitHub Desktop.
[Reflex Python Plugin Support] #python #reflex #pipeline #soxs #muse
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This file shows the minimum example of a Python based EsoRex recipe that can | |
# be integrated into a EsoReflex workflow, i.e. executed with the RecipeExecuter | |
# actor. | |
# The appropriate module needs to first be loaded that provides the facilities | |
# for declaring an EsoRex recipe plugin. | |
import esorexplugin | |
# Additionally it is convenient to import some of the utility functions from the | |
# module directly into this code's namespace. Note, the import statement above | |
# is still necessary for technical reasons to have access to the class | |
# esorexplugin.RecipePlugin. NOTE: one should not import the RecipePlugin class | |
# directly into this namespace, since that will make EsoRex see it and think | |
# that it is also a valid plugin. | |
from esorexplugin import * | |
# One will typically also want to import the astropy FITS I/O module to be able | |
# to load and save the recipe input frames. | |
from astropy.io import fits | |
# Must define a new class deriving from esorexplugin.RecipePlugin to write a | |
# new Python based recipe plugin for EsoRex. | |
class TestRecipe(esorexplugin.RecipePlugin): | |
# The recipe must be given a name by setting the 'name' class attribute. | |
name = "test" | |
# A version needs to be assigned. For this it is best to use the | |
# VersionNumber method that will correctly encode a major, minor and micro | |
# version sub-numbers. NOTE: minor and micro must be in the range [0..99]. | |
version = VersionNumber(1, 2, 3) | |
# A synopsis string should be given for the recipe for the man page created | |
# by EsoRex. | |
# NOTE: If the following class attribute is not provided, the first line of | |
# this class's doc string will be used for the synopsis instead. | |
synopsis = "Python based example recipe" | |
# A detailed description string should be given for the recipe for the man | |
# page created by EsoRex. | |
# NOTE: If the following class attribute is not provided, the whole text in | |
# the class's doc string will be used for the description instead. | |
description = "Long description for Python example recipe goes here" | |
# The following additional fields should be provided for the man page | |
# generated by EsoRex. | |
author = "Some Author" | |
email = "someone@eso.org" | |
# Any copyright string can be given, or alternatively the ESO default | |
# GNU v2 copyright for pipelines can be used with the EsoCopyright function. | |
# If using EsoCopyright, the instrument or project name should be given as | |
# the first argument, followed by a copyright year or list of years. | |
copyright = EsoCopyright("TEST_INSTRUMENT", [2010, 2013, 2014, 2015, 2017]) | |
# A list of recipe input parameters is specified in the parameters class | |
# attribute. The individual entries should be created with one of the | |
# functions: | |
# ValueParameter - if the parameter should be a single value of type | |
# boolean, integer, float or string. | |
# RangeParameter - if the parameter must be an integer or float value | |
# within a given range. | |
# EnumParameter - if the parameter should be an enumeration, i.e. a value | |
# selectable from a specific set of integers, floats or strings. | |
# There are a number of additional features that can be controlled for each | |
# parameter, such as its alias in different contexts. Please refer to the | |
# documentation of the above mentioned functions for more details. | |
parameters = [ | |
ValueParameter("test.par1", True), | |
RangeParameter("test.par2", 3, 2, 5), | |
EnumParameter("test.par3", "x", ["x", "y", "z"]) | |
] | |
# The recipeconfig is an optional attribute that must be a list of entries | |
# created with the FrameData function. It allows to specify some useful | |
# information about the input frames supported by this recipe and the | |
# inter-dependence between frames. If this attribute is given then the | |
# recipe will be a version 2 CPL recipe plugin. Otherwise this recipe | |
# defaults to being a version 1 recipe. | |
# NOTE: EsoRex does not currently use this additional information in any | |
# special way. | |
recipeconfig = [ | |
FrameData('RAW', min = 2, max = 5, | |
inputs = [ | |
FrameData('MASTER'), | |
FrameData('CALIB', min = 0, max = 2) | |
], | |
outputs = ['PROD']), | |
FrameData('MASTER', inputs = [FrameData('RAW')]), | |
FrameData('CALIB') | |
] | |
# The set_frame_group method must be overridden by a new recipe plugin. The | |
# purpose of this method is to catalogue the input frames to the recipe. | |
# Primarily to indicate to EsoRex if an input frame is a raw frame or | |
# calibration frame. | |
def set_frame_group(self, frame): | |
if frame.tag == 'RAW': | |
frame.group = Frame.GROUP_RAW | |
else: | |
frame.group = Frame.GROUP_CALIB | |
frame.level = Frame.LEVEL_FINAL | |
# A method called process must be defined that will take as first argument | |
# 'frames', which is a list of Frame objects representing the input frames | |
# to the recipe from the SOF. As the process method executes it should | |
# append any new output Frame objects to an output list and return the | |
# output frame list at the end of the method. | |
# The additional attributes should be the parameters as declared in the | |
# 'parameters' class attribute above. There should be as many method | |
# arguments as there were parameters declared. Their names can be anything, | |
# but they will be filled in the same order as declared in the 'parameters' | |
# list above. | |
def process(self, frames, par1, par2, par3): | |
if par1: | |
print("par1 is True") | |
else: | |
print("par1 is False") | |
print("par2 = {0}".format(par2)) | |
print("par3 = {0}".format(par3)) | |
# This will rarely be needed, but just in case, have more detailed | |
# parameter information in self.input_parameters. | |
# This is a list of RecipeParameter objects. | |
print(self.input_parameters) | |
# Errors should be raised with the raise_error method. | |
# Its signature is: | |
# def raise_error(self, message, exitcode = 1, print_traceback = False) | |
if len(frames) < 1: | |
self.raise_error("Need at least one input frame.") | |
# You can open the FITS file associated to a frame as follows. | |
# NOTE: the open function is just a wrapper around the astropy API. | |
# What you get back is an astropy HDUList object. | |
hdulist = frames[0].open() | |
# New output frames are created with the Frame object's constructor. | |
# The signature for the constructor method is: | |
# def __init__(self, filename, tag, type = None, group = None, level = None) | |
new_frame = Frame("output.fits", "PROD", type = Frame.TYPE_IMAGE) | |
output_frames = [] | |
# Make sure to add the new frames to an output list and return them at | |
# the end of this method to register them with EsoRex. | |
output_frames.append(new_frame) | |
# One can write the associated FITS file with the frame's write method. | |
# It accepts HDUList objects and additional arguments passed to the | |
# Astropy writeto method. | |
new_frame.write(hdulist, overwrite = par1, output_verify = 'ignore') | |
return output_frames |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ZAP - Zurich Atmosphere Purge | |
# | |
# Copyright (c) 2014-2016 Kurt Soto | |
# Copyright (c) 2015-2017 Simon Conseil | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining | |
# a copy of this software and associated documentation files (the | |
# "Software"), to deal in the Software without restriction, including | |
# without limitation the rights to use, copy, modify, merge, publish, | |
# distribute, sublicense, and/or sell copies of the Software, and to permit | |
# persons to whom the Software is furnished to do so, subject to the | |
# following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL | |
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER | |
# DEALINGS IN THE SOFTWARE. | |
from __future__ import absolute_import, division, print_function | |
try: | |
import esorexplugin | |
from esorexplugin import * | |
except ImportError: | |
import_ok = 0 | |
try: | |
import astropy.units as u | |
import logging | |
import numpy as np | |
import os | |
import scipy.ndimage as ndi | |
import sys | |
from astropy.io import fits | |
from astropy.utils import minversion | |
from astropy.wcs import WCS | |
from functools import wraps | |
from multiprocessing import cpu_count, Manager, Process | |
from scipy.stats import sigmaclip | |
from sklearn.decomposition import PCA | |
from time import time | |
except ImportError: | |
import_ok = 0 | |
#from .version import __version__ | |
import sys | |
import os | |
import json | |
import warnings | |
import argparse | |
__version__ = '2.1.dev workflow' | |
__all__ = ['processzap', 'SVDoutput', 'nancleanfits', 'contsubfits', 'Zap', | |
'wmedian', 'SKYSEG'] | |
# Limits of the segments in Angstroms. Zap now uses by default only one | |
# segment, see below for the old values. | |
SKYSEG = [0,10000] | |
# These are the old limits from the original zap. Keeping them for reference, | |
# and may be useful for further testing. | |
# [0, 5400, 5850, 6440, 6750, 7200, 7700, 8265, 8602, 8731, 9275, 10000] | |
# range where the NaD notch filters absorbs significant flux | |
NOTCH_FILTER_RANGES = { | |
'WFM-AO-E': [5755, 6008], | |
'WFM-AO-N': [5805, 5966], | |
'NFM-AO-N': [5780, 6050] | |
} | |
# List of allowed values for cftype (continuum filter) | |
CFTYPE_OPTIONS = ('weight', 'median', 'fit', 'none') | |
# Number of available CPUs | |
try: | |
NCPU = cpu_count() | |
PY2 = sys.version_info[0] == 2 | |
ASTROPY_LT_1_3 = not minversion('astropy', '1.3') | |
if not PY2: | |
text_type = str | |
string_types = (str,) | |
else: | |
text_type = unicode | |
string_types = (str, unicode) | |
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.INFO, | |
stream=sys.stdout) | |
logger = logging.getLogger(__name__) | |
except: | |
failure=1 | |
# ================= Top Level Functions ================= | |
try: | |
def processzap(musecubefits, outcubefits='DATACUBE_ZAP.fits', clean=True, | |
zlevel='median', cftype='weight', cfwidthSVD=300, cfwidthSP=300, | |
nevals=[], extSVD=None, skycubefits=None, mask=None, | |
interactive=False, ncpu=None, pca_class=None, n_components=None, | |
overwrite=False, varcurvefits=None): | |
""" Performs the entire ZAP sky subtraction algorithm. | |
This is the main ZAP function. It works on an input FITS file and | |
optionally writes the product to an output FITS file. | |
Parameters | |
---------- | |
musecubefits : str | |
Input FITS file, containing a cube with data in the first extension. | |
outcubefits : str | |
Output FITS file, based on the input one to propagate all header | |
information and other extensions. Default to `DATACUBE_ZAP.fits`. | |
clean : bool | |
If True (default value), the NaN values are cleaned. Spaxels with more | |
then 25% of NaN values are removed, the others are replaced with an | |
interpolation from the neighbors. The NaN values are reinserted into | |
the final datacube. If set to False, any spaxel with a NaN value will | |
be ignored. | |
zlevel : str | |
Method for the zeroth order sky removal: `none`, `sigclip` or `median` | |
(default). | |
cftype : {'weight', 'median', 'fit', 'none'} | |
Method for the continuum filter. For the `weight` method, a zeroth | |
order sky is required (see `zlevel`). | |
cfwidthSVD : int or float | |
Window size for the continuum filter, for the SVD computation. | |
Default to 300. | |
cfwidthSP : int or float | |
Window size for the continuum filter used to remove the continuum | |
features for calculating the eigenvalues per spectrum. Smaller values | |
better trace the sources. An optimal range of is typically | |
20 - 50 pixels. Default to 300. | |
nevals : list | |
Allow to specify the number of eigenspectra used for each segment. | |
Provide either a single value that will be used for all of the | |
segments, or a list of 11 values that will be used for each of the | |
segments. | |
extSVD : Zap object | |
Can be a ``Zap`` object output from :func:`~zap.SVDoutput`. | |
If given, the SVD from this object will be used, otherwise the SVD is | |
computed. So this allows to compute the SVD on an other field or with | |
different settings. | |
skycubefits : str | |
Path for the optional output of the sky that is subtracted from the | |
cube. This is simply the input cube minus the output cube. | |
mask : str | |
A 2D fits image to exclude regions that may contaminate the zlevel or | |
eigenspectra. This image should be constructed from the datacube itself | |
to match the dimensionality. Sky regions should be marked as 0, and | |
astronomical sources should be identified with an integer greater than | |
or equal to 1. Default to None. | |
interactive : bool | |
If True, a :class:`~zap.Zap` object containing all information on | |
the ZAP process is returned, and can be used to explore the | |
eigenspectra and recompute the output (with the | |
:meth:`~zap.Zap.reprocess` method). In this case, the output files | |
are not saved (`outcubefits` and `skycubefits` are ignored). Default | |
to False. | |
varcurvefits : str | |
Path for the optional output of the explained variance curves. | |
""" | |
logger.info('Running ZAP') | |
logger.info('Running ZAP %s !', __version__) | |
t0 = time() | |
if not isinstance(musecubefits, string_types): | |
raise TypeError('The process method only accepts a single datacube ' | |
'filename.') | |
# check if outcubefits/skycubefits exists before beginning | |
if not overwrite: | |
def _check_file_exists(filename): | |
if filename is not None and os.path.exists(filename): | |
raise IOError('Output file "{0}" exists'.format(filename)) | |
_check_file_exists(outcubefits) | |
_check_file_exists(skycubefits) | |
if ncpu is not None: | |
global NCPU | |
NCPU = ncpu | |
if extSVD is not None and mask is not None: | |
raise ValueError('extSVD and mask parameters are incompatible: if mask' | |
' must be used, then the SVD has to be recomputed') | |
if mask is not None or (extSVD is None and cfwidthSVD != cfwidthSP): | |
# Compute the SVD separately, only if a mask is given, or if the | |
# cfwidth values differ and extSVD is not given. Otherwise, the SVD | |
# will be computed in the _run method, which allows to avoid running | |
# twice the zlevel and continuumfilter steps. | |
extSVD = SVDoutput(musecubefits, clean=clean, zlevel=zlevel, | |
cftype=cftype, cfwidth=cfwidthSVD, mask=mask) | |
zobj = Zap(musecubefits, pca_class=pca_class, n_components=n_components) | |
zobj._run(clean=clean, zlevel=zlevel, cfwidth=cfwidthSP, cftype=cftype, | |
nevals=nevals, extSVD=extSVD) | |
if interactive: | |
# Return the zobj object without saving files | |
return zobj | |
if skycubefits is not None: | |
zobj.writeskycube(skycubefits=skycubefits, overwrite=overwrite) | |
if varcurvefits is not None: | |
zobj.writevarcurve(varcurvefits=varcurvefits, overwrite=overwrite) | |
zobj.mergefits(outcubefits, overwrite=overwrite) | |
logger.info('Zapped! (took %.2f sec.)', time() - t0) | |
def SVDoutput(musecubefits, clean=True, zlevel='median', cftype='weight', | |
cfwidth=300, mask=None, ncpu=None, pca_class=None, | |
n_components=None): | |
"""Performs the SVD decomposition of a datacube. | |
This allows to use the SVD for a different datacube. It used to allow to | |
save the SVD to a file but this is no more possible. Instead it returns | |
a ``Zap`` which can be given to the :func:`~zap.process` function. | |
Parameters | |
---------- | |
musecubefits : str | |
Input FITS file, containing a cube with data in the first extension. | |
clean : bool | |
If True (default value), the NaN values are cleaned. Spaxels with more | |
then 25% of NaN values are removed, the others are replaced with an | |
interpolation from the neighbors. | |
zlevel : str | |
Method for the zeroth order sky removal: `none`, `sigclip` or `median` | |
(default). | |
cftype : {'weight', 'median', 'fit', 'none'} | |
Method for the continuum filter. For the `weight` method, a zeroth | |
order sky is required (see `zlevel`). | |
cfwidth : int or float | |
Window size for the continuum filter, default to 300. | |
mask : str | |
Path of a FITS file containing a mask (1 for objects, 0 for sky). | |
""" | |
logger.info('Processing %s to compute the SVD', musecubefits) | |
if ncpu is not None: | |
global NCPU | |
NCPU = ncpu | |
zobj = Zap(musecubefits, pca_class=pca_class, n_components=n_components) | |
zobj._prepare(clean=clean, zlevel=zlevel, cftype=cftype, | |
cfwidth=cfwidth, mask=mask) | |
zobj._msvd() | |
return zobj | |
def contsubfits(cubefits, outfits='CONTSUB_CUBE.fits', ncpu=None, | |
cftype='median', cfwidth=300, clean_nan=True, zlevel='median', | |
overwrite=False): | |
"""A standalone implementation of the continuum removal.""" | |
if ncpu is not None: | |
global NCPU | |
NCPU = ncpu | |
zobj = Zap(cubefits) | |
zobj._prepare(clean=clean_nan, zlevel=zlevel, cftype=cftype, | |
cfwidth=cfwidth) | |
cube = zobj.make_contcube() | |
outhead = _newheader(zobj) | |
outhdu = fits.PrimaryHDU(data=cube, header=outhead) | |
write_hdulist_to(outhdu, outfits, overwrite=overwrite) | |
logger.info('Continuum cube file saved to %s', outfits) | |
def nancleanfits(musecubefits, outfn='NANCLEAN_CUBE.fits', rejectratio=0.25, | |
boxsz=1, overwrite=False): | |
"""Interpolates NaN values from the nearest neighbors. | |
Parameters | |
---------- | |
musecubefits : str | |
Input FITS file, containing a cube with data in the first extension. | |
outfn : str | |
Output FITS file. Default to ``NANCLEAN_CUBE.fits``. | |
rejectratio : float | |
Defines a cutoff for the ratio of NAN to total pixels in a spaxel | |
before the spaxel is avoided completely. Default to 0.25 | |
boxsz : int | |
Defines the number of pixels around the offending NaN pixel. | |
Default to 1, which looks for the 26 nearest neighbors which | |
is a 3x3x3 cube. | |
""" | |
with fits.open(musecubefits) as hdu: | |
hdu[1].data = _nanclean(hdu[1].data, rejectratio=rejectratio, | |
boxsz=boxsz)[0] | |
write_hdulist_to(hdu, outfn, overwrite=overwrite) | |
def timeit(func): | |
@wraps(func) | |
def wrapped(*args, **kwargs): | |
t0 = time() | |
res = func(*args, **kwargs) | |
logger.info('%s - Time: %.2f sec.', func.__name__, time() - t0) | |
return res | |
return wrapped | |
# ================= Main class ================= | |
class Zap(object): | |
""" Main class to run each of the steps of ZAP. | |
Attributes | |
---------- | |
cleancube : numpy.ndarray | |
The final datacube after removing all of the residual features. | |
contarray : numpy.ndarray | |
A 2D array containing the subtracted continuum per spaxel. | |
cube : numpy.ndarray | |
The original cube with the zlevel subtraction performed per spaxel. | |
laxis : numpy.ndarray | |
A 1d array containing the wavelength solution generated from the header | |
parameters. | |
wcs : astropy.wcs.WCS | |
WCS object with the wavelength solution. | |
lranges : list | |
A list of the wavelength bin limits used in segmenting the sepctrum | |
for SVD. | |
nancube : numpy.ndarray | |
A 3d boolean datacube containing True in voxels where a NaN value was | |
replaced with an interpolation. | |
nevals : numpy.ndarray | |
A 1d array containing the number of eigenvalues used per segment to | |
reconstruct the residuals. | |
normstack : numpy.ndarray | |
A normalized version of the datacube decunstructed into a 2d array. | |
pranges : numpy.ndarray | |
The pixel indices of the bounding regions for each spectral segment. | |
recon : numpy.ndarray | |
A 2d array containing the reconstructed emission line residuals. | |
run_clean : bool | |
Boolean that indicates that the NaN cleaning method was used. | |
run_zlevel : bool | |
Boolean indicating that the zero level correction was used. | |
stack : numpy.ndarray | |
The datacube deconstructed into a 2d array for use in the the SVD. | |
variancearray : numpy.ndarray | |
A list of length nsegments containing variances calculated per spaxel | |
used for normalization | |
y,x : numpy.ndarray | |
The position in the cube of the spaxels that are in the 2d | |
deconstructed stack | |
zlsky : numpy.ndarray | |
A 1d array containing the result of the zero level subtraction | |
""" | |
def __init__(self, musecubefits, pca_class=None, n_components=None): | |
self.musecubefits = musecubefits | |
with fits.open(musecubefits) as hdul: | |
self.ins_mode = hdul[0].header.get('HIERARCH ESO INS MODE') | |
self.cube = hdul[1].data | |
self.header = hdul[1].header | |
# Workaround for floating points errors in wcs computation: if cunit is | |
# specified, wcslib will convert in meters instead of angstroms, so we | |
# remove cunit before creating the wcs object | |
header = self.header.copy() | |
unit = u.Unit(header.pop('CUNIT3')) | |
self.wcs = WCS(header).sub([3]) | |
# Create Lambda axis | |
wlaxis = np.arange(self.cube.shape[0]) | |
self.laxis = self.wcs.all_pix2world(wlaxis, 0)[0] | |
if unit != u.angstrom: | |
# Make sure lambda is in angstroms | |
self.laxis = (self.laxis * unit).to(u.angstrom).value | |
# Change laser region into zeros if AO | |
if self.ins_mode in NOTCH_FILTER_RANGES: | |
logger.info('Cleaning laser region for AO, mode=%s, limits=%s', | |
self.ins_mode, NOTCH_FILTER_RANGES[self.ins_mode]) | |
self.notch_limits = self.wcs.all_world2pix( | |
NOTCH_FILTER_RANGES[self.ins_mode], 0)[0].astype(int) | |
lmin, lmax = self.notch_limits | |
self.cube[lmin:lmax + 1] = 0.0 | |
# NaN Cleaning | |
self.run_clean = False | |
self.nancube = None | |
self._boxsz = 1 | |
self._rejectratio = 0.25 | |
# Mask file | |
self.maskfile = None | |
# zlevel parameters | |
self.run_zlevel = False | |
self.zlsky = np.zeros_like(self.laxis) | |
# Extraction results | |
self.stack = None | |
self.y = None | |
self.x = None | |
# Normalization Maps | |
self.contarray = None | |
self.variancearray = None | |
self.normstack = None | |
# identify the spectral range of the dataset | |
laxmin = min(self.laxis) | |
laxmax = max(self.laxis) | |
# List of segmentation limits in the optical | |
skyseg = np.array(SKYSEG) | |
skyseg = skyseg[(skyseg > laxmin) & (skyseg < laxmax)] | |
# segment limit in angstroms | |
self.lranges = (np.vstack([np.append(laxmin - 10, skyseg), | |
np.append(skyseg, laxmax + 10)])).T | |
logger.info(self.lranges) | |
# segment limit in pixels | |
laxis = self.laxis | |
lranges = self.lranges | |
pranges = [] | |
for i in range(len(lranges)): | |
paxis = wlaxis[(laxis > lranges[i, 0]) & (laxis <= lranges[i, 1])] | |
pranges.append((np.min(paxis), np.max(paxis) + 1)) | |
self.pranges = np.array(pranges) | |
logger.info(self.pranges) | |
# eigenspace Subset | |
if pca_class is not None: | |
logger.info('Using %s', pca_class) | |
self.pca_class = pca_class | |
else: | |
self.pca_class = PCA | |
# Reconstruction of sky features | |
self.n_components = n_components | |
self.recon = None | |
self.cleancube = None | |
@timeit | |
def _prepare(self, clean=True, zlevel='median', cftype='weight', | |
cfwidth=300, extzlevel=None, mask=None): | |
# Check for consistency between weighted median and zlevel keywords | |
if cftype == 'weight' and zlevel == 'none': | |
raise ValueError('Weighted median requires a zlevel calculation') | |
# clean up the nan values | |
if clean: | |
self._nanclean() | |
# if mask is supplied, apply it | |
if mask is not None: | |
self._applymask(mask) | |
# Extract the spectra that we will be working with | |
self._extract() | |
# remove the median along the spectral axis | |
if extzlevel is None: | |
if zlevel.lower() != 'none': | |
self._zlevel(calctype=zlevel) | |
else: | |
self._externalzlevel(extzlevel) | |
# remove the continuum level - this is multiprocessed to speed it up | |
self._continuumfilter(cfwidth=cfwidth, cftype=cftype) | |
# normalize the variance in the segments. | |
self._normalize_variance() | |
def _run(self, clean=True, zlevel='median', cftype='weight', | |
cfwidth=300, nevals=[], extSVD=None): | |
""" Perform all steps to ZAP a datacube: | |
- NaN re/masking, | |
- deconstruction into "stacks", | |
- zerolevel subraction, | |
- continuum removal, | |
- normalization, | |
- singular value decomposition, | |
- eigenvector selection, | |
- residual reconstruction and subtraction, | |
- data cube reconstruction. | |
""" | |
self._prepare(clean=clean, zlevel=zlevel, cftype=cftype, | |
cfwidth=cfwidth, extzlevel=extSVD) | |
# do the multiprocessed SVD calculation | |
if extSVD is None: | |
self._msvd() | |
else: | |
self.models = extSVD.models | |
self.components = [m.components_.copy() for m in self.models] | |
# choose some fraction of eigenspectra or some finite number of | |
# eigenspectra | |
if nevals == []: | |
self.optimize() | |
self.chooseevals(nevals=self.nevals) | |
else: | |
self.chooseevals(nevals=nevals) | |
# reconstruct the sky residuals using the subset of eigenspace | |
self.reconstruct() | |
# stuff the new spectra back into the cube | |
self.remold() | |
def _nanclean(self): | |
""" | |
Detects NaN values in cube and removes them by replacing them with an | |
interpolation of the nearest neighbors in the data cube. The positions | |
in the cube are retained in nancube for later remasking. | |
""" | |
self.cube, self.nancube = _nanclean( | |
self.cube, rejectratio=self._rejectratio, boxsz=self._boxsz) | |
self.run_clean = True | |
@timeit | |
def _extract(self): | |
"""Deconstruct the datacube into a 2d array. | |
Since spatial information is not required, and the linear algebra | |
routines require 2d arrays. The operation rejects any spaxel with even | |
a single NaN value, since this would cause the linear algebra routines | |
to crash. | |
Adds the x and y data of these positions into the Zap class | |
""" | |
# make a map of spaxels with NaNs | |
badmap = (np.logical_not(np.isfinite(self.cube))).sum(axis=0) | |
# get positions of those with no NaNs | |
self.y, self.x = np.where(badmap == 0) | |
# extract those positions into a 2d array | |
self.stack = self.cube[:, self.y, self.x] | |
logger.info('Extract to 2D, %d valid spaxels (%d%%)', len(self.x), | |
len(self.x) / np.prod(self.cube.shape[1:]) * 100) | |
def _externalzlevel(self, extSVD): | |
"""Remove the zero level from the extSVD file.""" | |
logger.debug('Using external zlevel from %s', extSVD) | |
if isinstance(extSVD, Zap): | |
self.zlsky = np.array(extSVD.zlsky, copy=True) | |
self.run_zlevel = extSVD.run_zlevel | |
else: | |
self.zlsky = fits.getdata(extSVD, 0) | |
self.run_zlevel = 'extSVD' | |
self.stack -= self.zlsky[:, np.newaxis] | |
@timeit | |
def _zlevel(self, calctype='median'): | |
""" | |
Removes a 'zero' level from each spectral plane. Spatial information is | |
not required, so it operates on the extracted stack. | |
Operates on stack, leaving it with this level removed and adds the data | |
'zlsky' to the class. zlsky is a spectrum of the zero levels. | |
This zero level is currently calculated with a median. | |
Experimental operations - | |
- exclude top quartile | |
- run in an iterative sigma clipped mode | |
""" | |
self.run_zlevel = calctype | |
if calctype == 'none': | |
logger.info('Skipping zlevel subtraction') | |
return | |
if calctype == 'median': | |
logger.info('Median zlevel subtraction') | |
func = _imedian | |
elif calctype == 'sigclip': | |
logger.info('Iterative Sigma Clipping zlevel subtraction') | |
func = _isigclip | |
else: | |
raise ValueError('Unknow zlevel type, must be none, median, or ' | |
'sigclip') | |
self.zlsky = np.hstack(parallel_map(func, self.stack, NCPU, axis=0)) | |
self.stack -= self.zlsky[:, np.newaxis] | |
@timeit | |
def _continuumfilter(self, cfwidth=300, cftype='weight'): | |
"""A multiprocessed implementation of the continuum removal. | |
This process distributes the data to many processes that then | |
reassemble the data. Uses two filters, a small scale (less than the | |
line spread function) uniform filter, and a large scale median filter | |
to capture the structure of a variety of continuum shapes. | |
added to class | |
contarray - the removed continuua | |
normstack - "normalized" version of the stack with the continuua | |
removed | |
""" | |
if cftype not in CFTYPE_OPTIONS: | |
raise ValueError("cftype must be weight, median, fit or none, " | |
"got {}".format(cftype)) | |
logger.info('Applying Continuum Filter, cftype=%s', cftype) | |
self._cftype = cftype | |
self._cfwidth = cfwidth | |
weight = None | |
if cftype == 'weight': | |
weight = np.abs(self.zlsky - (np.max(self.zlsky) + 1)) | |
# remove continuum features | |
if cftype == 'none': | |
self.normstack = self.stack.copy() | |
else: | |
self.contarray = _continuumfilter(self.stack, cftype, | |
weight=weight, cfwidth=cfwidth) | |
self.normstack = self.stack - self.contarray | |
def _normalize_variance(self): | |
"""Normalize the variance in the segments.""" | |
logger.debug('Normalizing variances') | |
# self.variancearray = np.std(self.stack, axis=1) | |
# self.normstack /= self.variancearray[:, np.newaxis] | |
nseg = len(self.pranges) | |
self.variancearray = var = np.zeros((nseg, self.stack.shape[1])) | |
for i in range(nseg): | |
pmin, pmax = self.pranges[i] | |
var[i, :] = np.var(self.normstack[pmin:pmax, :], axis=0) | |
self.normstack[pmin:pmax, :] /= var[i, :] | |
@timeit | |
def _msvd(self): | |
"""Multiprocessed singular value decomposition. | |
Takes the normalized, spectral segments and distributes them | |
to the individual svd methods. | |
""" | |
logger.info('Calculating SVD on %d segments (%s)', len(self.pranges), | |
self.pranges) | |
indices = [x[0] for x in self.pranges[1:]] | |
# normstack = self.stack - self.contarray | |
Xarr = np.array_split(self.normstack.T, indices, axis=1) | |
self.models = [] | |
for i, x in enumerate(Xarr): | |
if self.n_components is not None: | |
ncomp = max(x.shape[1] * self.n_components, 60) | |
logger.info('Segment %d, computing %d eigenvectors out of %d', | |
i, ncomp, x.shape[1]) | |
else: | |
ncomp = None | |
self.models.append(self.pca_class(n_components=ncomp).fit(x)) | |
def chooseevals(self, nevals=[]): | |
"""Choose the number of eigenspectra/evals to use for reconstruction. | |
User supplies the number of eigen spectra to be used (neval) or the | |
percentage of the eigenspectra that were calculated (peval) from each | |
spectral segment to be used. | |
The user can either provide a single value to be used for all segments, | |
or provide an array that defines neval or peval per segment. | |
""" | |
nranges = len(self.pranges) | |
nevals = np.atleast_1d(nevals) | |
# deal with an input list | |
if len(nevals) > 1: | |
if len(nevals) != nranges: | |
nevals = np.array([nevals[0]]) | |
logger.info('Chosen eigenspectra array does not correspond to ' | |
'number of segments') | |
else: | |
logger.info('Choosing %s eigenspectra for segments', nevals) | |
if len(nevals) == 1: | |
logger.info('Choosing %s eigenspectra for all segments', nevals) | |
nevals = np.zeros(nranges, dtype=int) + nevals | |
if nevals.ndim == 1: | |
start = np.zeros(nranges, dtype=int) | |
end = nevals | |
else: | |
start, end = nevals.T | |
self.nevals = nevals | |
for i, model in enumerate(self.models): | |
model.components_ = self.components[i][start[i]:end[i]] | |
@timeit | |
def reconstruct(self): | |
"""Reconstruct the residuals from a given set of eigenspectra and | |
eigenvalues | |
""" | |
logger.info('Reconstructing Sky Residuals') | |
indices = [x[0] for x in self.pranges[1:]] | |
# normstack = self.stack - self.contarray | |
Xarr = np.array_split(self.normstack.T, indices, axis=1) | |
Xnew = [model.transform(x) | |
for model, x in zip(self.models, Xarr)] | |
Xinv = [model.inverse_transform(x) | |
for model, x in zip(self.models, Xnew)] | |
self.recon = np.concatenate([x.T * self.variancearray[i, :] | |
for i, x in enumerate(Xinv)]) | |
# self.recon = np.concatenate([x.T for x in Xinv]) | |
# self.recon *= self.variancearray[:, np.newaxis] | |
def make_cube_from_stack(self, stack, with_nans=False): | |
"""Stuff the stack back into a cube.""" | |
cube = self.cube.copy() | |
cube[:, self.y, self.x] = stack | |
if with_nans: | |
cube[self.nancube] = np.nan | |
if self.ins_mode in NOTCH_FILTER_RANGES: | |
lmin, lmax = self.notch_limits | |
cube[lmin:lmax + 1] = np.nan | |
return cube | |
def remold(self): | |
""" Subtracts the reconstructed residuals and places the cleaned | |
spectra into the duplicated datacube. | |
""" | |
logger.info('Applying correction and reshaping data product') | |
self.cleancube = self.make_cube_from_stack(self.stack - self.recon, | |
with_nans=self.run_clean) | |
logger.info('done') | |
def reprocess(self, nevals=[]): | |
""" A method that redoes the eigenvalue selection, reconstruction, and | |
remolding of the data. | |
""" | |
self.chooseevals(nevals=nevals) | |
self.reconstruct() | |
self.remold() | |
def optimize(self): | |
"""Compute the optimal number of components needed to characterize | |
the residuals. | |
This function calculates the variance per segment with an increasing | |
number of eigenspectra/eigenvalues. It then deterimines the point at | |
which the second derivative of this variance curve reaches zero. When | |
this occurs, the linear reduction in variance is attributable to the | |
removal of astronomical features rather than emission line residuals. | |
""" | |
logger.info('Compute number of components') | |
ncomp = [] | |
for model in self.models: | |
var = model.explained_variance_ | |
deriv, mn1, std1 = _compute_deriv(var) | |
cross = np.append([False], deriv >= (mn1 - std1)) | |
ncomp.append(np.where(cross)[0][0]) | |
self.nevals = np.array(ncomp) | |
def make_contcube(self): | |
""" Remold the continuum array so it can be investigated. | |
Takes the continuum stack and returns it into a familiar cube form. | |
""" | |
contcube = self.cube.copy() * np.nan | |
contcube[:, self.y, self.x] = self.contarray | |
return contcube | |
def _applymask(self, mask): | |
"""Apply a mask to the input data to provide a cleaner basis set. | |
mask is >1 for objects, 0 for sky so that people can use sextractor. | |
The file is read with ``astropy.io.fits.getdata`` which first tries to | |
read the primary extension, then the first extension is no data was | |
found before. | |
""" | |
logger.info('Applying Mask for SVD Calculation from %s', mask) | |
self.maskfile = mask | |
mask = fits.getdata(mask).astype(bool) | |
nmasked = np.count_nonzero(mask) | |
logger.info('Masking %d pixels (%d%%)', nmasked, | |
nmasked / np.prod(mask.shape) * 100) | |
self.cube[:, mask] = np.nan | |
def writecube(self, outcubefits='DATACUBE_ZAP.fits', overwrite=False): | |
"""Write the processed datacube to an individual fits file.""" | |
outhead = _newheader(self) | |
outhdu = fits.PrimaryHDU(data=self.cleancube, header=outhead) | |
write_hdulist_to(outhdu, outcubefits, overwrite=overwrite) | |
logger.info('Cube file saved to %s', outcubefits) | |
def writeskycube(self, skycubefits='SKYCUBE_ZAP.fits', overwrite=False): | |
"""Write the processed datacube to an individual fits file.""" | |
outcube = self.cube - self.cleancube | |
outhead = _newheader(self) | |
outhdu = fits.PrimaryHDU(data=outcube, header=outhead) | |
write_hdulist_to(outhdu, skycubefits, overwrite=overwrite) | |
logger.info('Sky cube file saved to %s', skycubefits) | |
def writevarcurve(self, varcurvefits='VARCURVE_ZAP.fits', overwrite=False): | |
"""Write the explained variance curves to an individual fits file.""" | |
from astropy.table import Table | |
table = Table([m.explained_variance_ for m in self.models]) | |
hdu = fits.table_to_hdu(table) | |
_newheader(self, hdu.header) | |
write_hdulist_to(hdu, varcurvefits, overwrite=overwrite) | |
logger.info('Variance curve file saved to %s', varcurvefits) | |
def mergefits(self, outcubefits, overwrite=False): | |
"""Merge the ZAP cube into the full muse datacube and write.""" | |
# make sure it has the right extension | |
outcubefits = outcubefits.split('.fits')[0] + '.fits' | |
with fits.open(self.musecubefits) as hdu: | |
hdu[1].header = _newheader(self) | |
hdu[1].data = self.cleancube | |
write_hdulist_to(hdu, outcubefits, overwrite=overwrite) | |
logger.info('Cube file saved to %s', outcubefits) | |
def plotvarcurve(self, i=0, ax=None): | |
var = self.models[i].explained_variance_ | |
deriv, mn1, std1 = _compute_deriv(var) | |
if ax is None: | |
import matplotlib.pyplot as plt | |
fig, ax = plt.subplots(3, 1, figsize=[10, 15]) | |
ax1, ax2, ax3 = ax | |
ax1.plot(var, linewidth=3) | |
ax1.plot([self.nevals[i], self.nevals[i]], [min(var), max(var)]) | |
ax1.set_ylabel('Variance') | |
ax2.plot(np.arange(deriv.size), deriv) | |
ax2.hlines([mn1, mn1 - std1], 0, len(deriv), colors=('k', '0.5')) | |
ax2.plot([self.nevals[i] - 1, self.nevals[i] - 1], | |
[min(deriv), max(deriv)]) | |
ax2.set_ylabel('d/dn Var') | |
deriv2 = np.diff(deriv) | |
ax3.plot(np.arange(deriv2.size), np.abs(deriv2)) | |
ax3.plot([self.nevals[i] - 2, self.nevals[i] - 2], | |
[min(deriv2), max(deriv2)]) | |
ax3.set_ylabel('(d^2/dn^2) Var') | |
# ax3.set_xlabel('Number of Components') | |
ax1.set_title('Segment {0}, {1} - {2} Angstroms'.format( | |
i, self.lranges[i][0], self.lranges[i][1])) | |
def plotvarcurves(self): | |
nseg = len(self.models) | |
import matplotlib.pyplot as plt | |
fig, axes = plt.subplots(nseg, 3, figsize=(16, nseg * 2), | |
tight_layout=True) | |
for i in range(nseg): | |
self.plotvarcurve(i=i, ax=axes[i]) | |
# ================= Helper Functions ================= | |
if ASTROPY_LT_1_3: | |
# the 'clobber' parameter was renamed to 'overwrite' in 1.3 | |
def write_hdulist_to(hdulist, fileobj, overwrite=False, **kwargs): | |
hdulist.writeto(fileobj, clobber=overwrite, **kwargs) | |
else: | |
def write_hdulist_to(hdulist, fileobj, overwrite=False, **kwargs): | |
hdulist.writeto(fileobj, overwrite=overwrite, **kwargs) | |
def worker(f, i, chunk, out_q, err_q, kwargs): | |
try: | |
result = f(i, chunk, **kwargs) | |
except Exception as e: | |
err_q.put(e) | |
return | |
# output the result and task ID to output queue | |
out_q.put((i, result)) | |
def parallel_map(func, arr, indices, **kwargs): | |
logger.debug('Running function %s with indices: %s', | |
func.__name__, indices) | |
axis = kwargs.pop('axis', None) | |
if isinstance(indices, (int, np.integer)) and indices == 1: | |
return [func(0, arr, **kwargs)] | |
manager = Manager() | |
out_q = manager.Queue() | |
err_q = manager.Queue() | |
jobs = [] | |
chunks = np.array_split(arr, indices, axis=axis) | |
if 'split_arrays' in kwargs: | |
split_arrays = [np.array_split(a, indices, axis=axis) | |
for a in kwargs.pop('split_arrays')] | |
else: | |
split_arrays = None | |
for i, chunk in enumerate(chunks): | |
if split_arrays: | |
kwargs['split_arrays'] = [s[i] for s in split_arrays] | |
p = Process(target=worker, args=(func, i, chunk, out_q, err_q, kwargs)) | |
jobs.append(p) | |
p.start() | |
# gather the results | |
for proc in jobs: | |
proc.join() | |
if not err_q.empty(): | |
# kill all on any exception from any one slave | |
logger.info(err_q.get()) | |
raise err_q.get() | |
# Processes finish in arbitrary order. Process IDs double | |
# as index in the resultant array. | |
results = [None] * len(jobs) | |
while not out_q.empty(): | |
idx, result = out_q.get() | |
results[idx] = result | |
return results | |
def _compute_deriv(arr, nsigma=5): | |
"""Compute statistics on the derivatives""" | |
npix = int(0.25 * arr.shape[0]) | |
deriv = np.diff(arr[:npix]) | |
ind = int(.15 * deriv.size) | |
mn1 = deriv[ind:].mean() | |
std1 = deriv[ind:].std() * nsigma | |
return deriv, mn1, std1 | |
def _continuumfilter(stack, cftype, weight=None, cfwidth=300): | |
if cftype == 'fit': | |
x = np.arange(stack.shape[0]) | |
# Excluding the very red part for the fit. This is Muse-specific, | |
# but anyway for another instrument this method should probably | |
# not be used as is. | |
res = np.polynomial.polynomial.polyfit(x[:3600], stack[:3600], deg=5) | |
ret = np.polynomial.polynomial.polyval(x, res, tensor=True) | |
return ret.T | |
if cftype == 'median': | |
func = _icfmedian | |
weight = None | |
elif cftype == 'weight': | |
func = _icfweight | |
logger.info('Using cfwidth=%d', cfwidth) | |
c = parallel_map(func, stack, NCPU, axis=1, weight=weight, cfwidth=cfwidth) | |
return np.concatenate(c, axis=1) | |
def _icfweight(i, stack, weight=None, cfwidth=None): | |
return np.array([wmedian(row, weight, cfwidth=cfwidth) | |
for row in stack.T]).T | |
def _icfmedian(i, stack, weight=None, cfwidth=None): | |
ufilt = 3 # set this to help with extreme over/under corrections | |
return ndi.median_filter( | |
ndi.uniform_filter(stack, (ufilt, 1)), (cfwidth, 1)) | |
def rolling_window(a, window): # function for striding to help speed up | |
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window) | |
strides = a.strides + (a.strides[-1],) | |
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides) | |
def wmedian(spec, wt, cfwidth=300): | |
"""Performs a weighted median filtering of a 1d spectrum. | |
Operates using a cumulative sum curve. | |
Parameters | |
---------- | |
spec : numpy.ndarray | |
Input 1d spectrum to be filtered | |
wt : numpy.ndarray | |
A spectrum of equal length as the input array to provide the weights. | |
cfwidth : int or float | |
Window size for the continuum filter, for the SVD computation. | |
Default to 300. | |
""" | |
# ignore the warning (feature not a bug) | |
old_settings = np.seterr(divide='ignore') | |
spec = np.pad(spec, (cfwidth, cfwidth), 'constant', constant_values=0) | |
wt = np.abs(wt) | |
wt = np.pad(wt, (cfwidth, cfwidth), 'constant', | |
constant_values=(np.min(wt) / 1000., np.min(wt) / 1000.)) | |
# do some striding for speed | |
swin = rolling_window(spec, cfwidth) # create window container array | |
wwin = rolling_window(wt, cfwidth) # create window container array | |
# sort based on data | |
srt = np.argsort(swin, axis=-1) | |
ind = np.ogrid[0:swin.shape[0], 0:swin.shape[1]] | |
sdata = swin[ind[0], srt] | |
swt = wwin[ind[0], srt] | |
# calculate accumulated weights | |
awt = np.cumsum(swt, axis=-1) | |
# new weightsort for normalization and consideration of data | |
nw = (awt - 0.5 * swt) / awt[:, -1][:, np.newaxis] | |
# find the midpoint in the new weight sort | |
s = np.argmin(np.abs(nw - 0.5), axis=-1) | |
sl = np.arange(len(s)) | |
nws = nw[sl, s] | |
nws1 = nw[sl, s - 1] | |
f1 = (nws - 0.5) / (nws - nws1) | |
f2 = (0.5 - nws1) / (nws - nws1) | |
wmed = sdata[sl, s - 1] * f1 + sdata[sl, s] * f2 | |
width = cfwidth // 2 | |
wmed = wmed[width:-width - 1] | |
np.seterr(old_settings['divide']) | |
return wmed | |
def _newheader(zobj, header=None): | |
"""Put the pertinent zap parameters into the header""" | |
header = header or zobj.header.copy() | |
header['COMMENT'] = 'These data have been ZAPped!' | |
header.append(('ZAPvers', __version__, 'ZAP version'), end=True) | |
# zlevel removal performed | |
header.append(('ZAPzlvl', zobj.run_zlevel, 'ZAP zero level correction')) | |
# Nanclean performed | |
header['ZAPclean'] = (zobj.run_clean, | |
'ZAP NaN cleaning performed for calculation') | |
# Continuum Filtering | |
header['ZAPcftyp'] = (zobj._cftype, 'ZAP continuum filter type') | |
header['ZAPcfwid'] = (zobj._cfwidth, 'ZAP continuum filter size') | |
# number of segments | |
nseg = len(zobj.pranges) | |
header['ZAPnseg'] = (nseg, 'Number of segments used for ZAP SVD') | |
# per segment variables | |
if hasattr(zobj, 'nevals'): | |
for i in range(nseg): | |
header['ZAPseg{}'.format(i)] = ( | |
'{}:{}'.format(zobj.pranges[i][0], zobj.pranges[i][1] - 1), | |
'spectrum segment (pixels)') | |
header['ZAPnev{}'.format(i)] = (zobj.nevals[i], | |
'number of eigenvals/spectra used') | |
return header | |
def _isigclip(i, istack): | |
mn = [] | |
for col in istack: | |
clipped, bot, top = sigmaclip(col, low=3, high=3) | |
mn.append(clipped.mean()) | |
return np.array(mn) | |
def _imedian(i, istack): | |
return np.median(istack, axis=1) | |
@timeit | |
def _nanclean(cube, rejectratio=0.25, boxsz=1): | |
""" | |
Detects NaN values in cube and removes them by replacing them with an | |
interpolation of the nearest neighbors in the data cube. The positions in | |
the cube are retained in nancube for later remasking. | |
""" | |
logger.info('Cleaning NaN values in the cube') | |
cleancube = cube.copy() | |
badcube = np.logical_not(np.isfinite(cleancube)) # find NaNs | |
badmap = badcube.sum(axis=0) # map of total nans in a spaxel | |
# choose some maximum number of bad pixels in the spaxel and extract | |
# positions | |
badmask = badmap > (rejectratio * cleancube.shape[0]) | |
logger.info('Rejected %d spaxels with more than %.1f%% NaN pixels', | |
np.count_nonzero(badmask), rejectratio * 100) | |
# make cube mask of bad spaxels | |
badcube &= (~badmask[np.newaxis, :, :]) | |
z, y, x = np.where(badcube) | |
neighbor = np.zeros((z.size, (2 * boxsz + 1)**3)) | |
icounter = 0 | |
logger.info("Fixing %d remaining NaN pixels", len(z)) | |
# loop over samplecubes | |
nz, ny, nx = cleancube.shape | |
for j in range(-boxsz, boxsz + 1, 1): | |
for k in range(-boxsz, boxsz + 1, 1): | |
for l in range(-boxsz, boxsz + 1, 1): | |
iz, iy, ix = z + l, y + k, x + j | |
outsider = ((ix <= 0) | (ix >= nx - 1) | | |
(iy <= 0) | (iy >= ny - 1) | | |
(iz <= 0) | (iz >= nz - 1)) | |
ins = ~outsider | |
neighbor[ins, icounter] = cleancube[iz[ins], iy[ins], ix[ins]] | |
neighbor[outsider, icounter] = np.nan | |
icounter = icounter + 1 | |
cleancube[z, y, x] = np.nanmean(neighbor, axis=1) | |
return cleancube, badcube | |
except: | |
failure=1 | |
class MuseZap(esorexplugin.RecipePlugin): | |
# The recipe must be given a name by setting the 'name' class attribute. | |
name = "muse_zap" | |
# A version needs to be assigned. For this it is best to use the | |
# VersionNumber method that will correctly encode a major, minor and micro | |
# version sub-numbers. NOTE: minor and micro must be in the range [0..99]. | |
version = VersionNumber(2, 0, 0) | |
# A synopsis string should be given for the recipe for the man page created | |
# by EsoRex. | |
# NOTE: If the following class attribute is not provided, the first line of | |
# this class's doc string will be used for the synopsis instead. | |
synopsis = "This recipe performs the ZAP v2.1 algorithm to remove the residual sky lines in MUSE datacubes." | |
description = ("This recipe performs the ZAP v2.1 algorithm to remove the residual sky lines in MUSE datacubes\n" | |
"It is part of a series of contributed recipes and it is meant to be used within the muse_zap \n" | |
"ESOreflex workflow. It is not part of the official MUSE pipeline.\n" | |
"For a proper description of the ZAP algorithm, we refer to the article Soto et al. 2016, MNRAS, 458, 3210S.\n" | |
"For any question related to this recipe, please contact sdp_muse@eso.org.\n") | |
author = "Lodovico Coccato" | |
email = "sdp_muse@eso.org" | |
copyright = "Copyright 2017" | |
names=["zlevel", | |
"cftype", | |
"cfwidthSVD", | |
"cfwidthSP", | |
"nevals", | |
"clean"]#, | |
# "test"] | |
# "extSVD"] | |
alias=["zlevel", | |
"cftype", | |
"cfwidthSVD", | |
"cfwidthSP", | |
"nevals", | |
"clean"]#, | |
# "test"] | |
# "extSVD"] | |
fullnames= map(lambda x: "muse_zap.muse_zap."+x, names) | |
desc1="Method for the zeroth order sky removal: 'none', 'sigclip' or 'median' (default)." | |
desc2="Method for the continuum filter: 'median' or 'weight' (default). For the 'weight' method, a zeroth order sky is required (see 'zlevel')." | |
desc3="Window size for the continuum filter, for the SVD computation. Default to 500." | |
desc4= "Window size for the continuum filter used to remove the continuum features for calculating the eigenvalues per spectrum. Smaller values better trace the sources. An optimal range of is typically 20 - 50 pixels. Default to 30." | |
#Provide either a single value | |
#that will be used for all of the segments, or a list of 11 values that | |
#will be used for each of the segments." | |
desc7= "Allow to specify the number of eigenspectra used for each segment. Ignored if pevals is used or optimizetType different from None." | |
#Provide either a single value | |
#that will be used for all of the segments, or a list of 11 values that | |
#will be used for each of the segments. | |
desc8= "If True (default value), the NaN values are cleaned. Spaxels with more then 25% of NaN values are removed, the others are replaced with an interpolation from the neighbors. The NaN values are reinserted into the final datacube. If set to False, any spaxel with a NaN value will be ignored." | |
# desc9="Path of an input FITS file containing a single value decomposition (SVD) previously computed and to be used on the current object. If not provided, the SVD are computed from the imput frame" | |
# desc_test="test parameter" | |
parameters = [ | |
EnumParameter(fullnames[0],'median', ['median', 'none', 'sigclip'],cli_alias=alias[0],cli_enabled=True,description=desc1), | |
EnumParameter(fullnames[1],'weight', ['weight', 'median'],cli_alias=alias[1],cli_enabled=True,description=desc2), | |
ValueParameter(fullnames[2],500,cli_alias=alias[2],cli_enabled=True, description=desc3), | |
ValueParameter(fullnames[3],30,cli_alias=alias[3],cli_enabled=True, description=desc4), | |
ValueParameter(fullnames[4],-1 ,cli_alias=alias[4],cli_enabled=True, description=desc7), | |
ValueParameter(fullnames[5],True,cli_alias=alias[5],cli_enabled=True, description=desc8)] | |
# EnumParameter(fullnames[6],1.0,[0.,1.0,10.],cli_alias=alias[6],cli_enabled=True, description=desc_test)] | |
def set_frame_group(self, frame): | |
if frame.tag == 'RAW': | |
frame.group = Frame.GROUP_RAW | |
else: | |
frame.group = Frame.GROUP_CALIB | |
frame.level = Frame.LEVEL_FINAL | |
def process(self,frames,zlevel,cftype,cfwidthSVD,cfwidthSP,nevals,clean):#zlevel,cftype,cfwidthSVD,cfwidthSP,optimizeType,pevals,nevals,clean,extSVD): | |
#if __name__ == '__main__': | |
warnings.filterwarnings("ignore") | |
output_frames = [] | |
values=[zlevel,cftype,cfwidthSVD,cfwidthSP,nevals,clean] | |
mask_cube_filename=None | |
mask_sky_filename=None | |
IsThereSkyCube = 0 | |
IsThereObjectCube = 0 | |
for j in range(0,len(frames)): | |
if frames[j].tag == 'MASK_FINAL_CUBE': | |
mask_filename = frames[j].filename | |
if frames[j].tag == 'DATACUBE_SKY': | |
IsThereSkyCube = 1 | |
muse_sky_cubefits_filename = frames[j].filename | |
if frames[j].tag == 'DATACUBE_OBJECT': | |
IsThereObjectCube = 1 | |
muse_object_cubefits_filename = frames[j].filename | |
outcubefits_filename = 'DATACUBE_CLEANED.fits' | |
if IsThereSkyCube ==1: | |
IsThereObjectCube = 0 | |
extSVD = SVDoutput(muse_sky_cubefits_filename, mask=mask_filename, | |
clean=clean, zlevel=zlevel, cftype=cftype, | |
cfwidth=cfwidthSVD,ncpu=NCPU) | |
try: | |
if nevals == -1: | |
nevals = [] | |
processzap(muse_object_cubefits_filename, outcubefits=outcubefits_filename, | |
clean=clean,zlevel=zlevel,cftype=cftype, cfwidthSVD=cfwidthSVD, | |
cfwidthSP=cfwidthSP, nevals=nevals,ncpu=NCPU, | |
extSVD=extSVD, skycubefits=None, interactive=False) | |
except: | |
self.raise_error(' execution error, check if products already exist') | |
if IsThereObjectCube ==1: | |
try: | |
if nevals == -1: | |
nevals = [] | |
processzap(muse_object_cubefits_filename, outcubefits=outcubefits_filename,clean=clean,zlevel=zlevel, | |
cftype=cftype, cfwidthSVD=cfwidthSVD, cfwidthSP=cfwidthSP, nevals=nevals,ncpu=NCPU, | |
mask=mask_filename, interactive=False) | |
except: | |
self.raise_error(' execution error, check if products already exist') | |
hdu = fits.open(outcubefits_filename,method='update') | |
hdu[0].header['HIERARCH ESO PRO CATG'] = 'DATACUBE_CLEANED' | |
if IsThereSkyCube ==1: | |
s1=muse_sky_cubefits_filename.split("/")[-1] | |
s2=mask_filename.split("/")[-1] | |
s1=s1[:50] | |
s2=s2[:50] | |
hdu[0].header['HIERARCH ESO SKYCUBE'] = s1 | |
hdu[0].header['HIERARCH ESO SKYCUBE MASK'] = s2 | |
else: | |
hdu[0].header['HIERARCH ESO SKYCUBE'] = 'None' | |
hdu[0].header['HIERARCH ESO SKYCUBE MASK'] = 'None' | |
if IsThereObjectCube ==1: | |
s3 = mask_filename.split("/")[-1] | |
s3=s3[:50] | |
hdu[0].header['HIERARCH ESO OBJECTCUBE MASK'] = s3 | |
else: | |
hdu[0].header['HIERARCH ESO OBJECTCUBE MASK'] = 'None' | |
for k in range(len(self.input_parameters)): | |
hkey1 = 'HIERARCH ESO ZAP REC'+str(k+1)+' NAME' | |
hkey2 = 'HIERARCH ESO ZAPn REC'+str(k+1)+' VALUE' | |
hdu[0].header[hkey1] = self.alias[k] | |
hdu[0].header[hkey2] = values[k] | |
outcubefits = Frame(outcubefits_filename,'DATACUBE_CLEANED', type = Frame.TYPE_IMAGE) | |
output_frames.append(outcubefits) | |
outcubefits.write(hdu,overwrite = True, output_verify = 'ignore',checksum=True) | |
return output_frames | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment