-
-
Save SimonKrughoff/12b8bbb2e17a6cfe1b8cc4397d293410 to your computer and use it in GitHub Desktop.
crosstalk rework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# This file is part of obs_decam. | |
# | |
# Developed for the LSST Data Management System. | |
# This product includes software developed by the LSST Project | |
# (http://www.lsst.org). | |
# Copyright 2018 LSST Corporation. | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
# | |
"""Apply crosstalk corrections to raw DECam images. | |
Some of this code is based on DECam_crosstalk.py, used for the THELI pipeline | |
and written by Thomas Erben (private communication, 2018) | |
""" | |
from __future__ import absolute_import, division, print_function | |
from collections import defaultdict | |
from astropy.io import fits | |
import datetime as dt | |
import os | |
import argparse | |
import re | |
import numpy as np | |
import lsst.log | |
import lsst.pipe.base as pipeBase | |
from lsst.ip.isr import CrosstalkConfig, CrosstalkTask | |
from lsst.obs.decam import DecamMapper | |
from lsst.utils import getPackageDir | |
import lsst.afw.math as afwMath | |
__all__ = ["DecamCrosstalkTask"] | |
class DecamCrosstalkConfig(CrosstalkConfig): | |
"""Configuration for DECam crosstalk removal. | |
""" | |
def getSourcesAndCoeffsFile(self, filename='DECam_xtalk_20130606.txt'): | |
"""Return directory containing the DECam_xtalk_20130606.txt file. | |
File containing DECam crosstalk coefficients. This text file is | |
provided by NOAO in a particular format with information | |
about the DECam crosstalk coefficients. It is available at | |
http://www.ctio.noao.edu/noao/content/DECam-Calibration-Files | |
""" | |
mapper = DecamMapper() | |
packageName = mapper.getPackageName() | |
packageDir = getPackageDir(packageName) | |
return os.path.join(packageDir, 'decam', filename) | |
def getSourcesAndCoeffs(self): | |
"""Read crosstalk sources and coefficients from DECam-specific file | |
""" | |
crosstalk_file = self.getSourcesAndCoeffsFile() | |
sources = defaultdict(list) | |
coeffs = {} | |
log = lsst.log.Log.getLogger('obs.decam.DecamCrosstalkConfig') | |
log.info('Reading crosstalk coefficient data') | |
with open(crosstalk_file) as f: | |
for line in f: | |
li = line.strip() | |
if not li.startswith('#'): | |
elem = li.split() | |
# The xtalk file has image areas like 'ccd01A'; preserve only '01A' | |
sources[elem[0][3:]].append(elem[1][3:]) | |
# The linear crosstalk coefficients | |
coeffs[(elem[0][3:], elem[1][3:])] = float(elem[2]) | |
return sources, coeffs | |
class DecamCrosstalkTask(CrosstalkTask): | |
"""Remove crosstalk from a raw DECam image. | |
This must be done after the overscan is corrected but before it is trimmed. | |
This version of crosstalk removal assumes you want to do it as part of a | |
regular stack-ISR workflow. | |
""" | |
ConfigClass = DecamCrosstalkConfig | |
_DefaultName = 'decamCrosstalk' | |
def map_name_str(name): | |
Do something (maybe using a mapping dict) to change the name in the detector to whatever you need for ccdnum_str so you don't have to get it from the dataId. | |
def prepCrosstalk(dataRef): | |
sources, coeffs = self.config.getSourcesAndCoeffs() | |
# Define a butler to prepare for accessing crosstalk 'source' image data | |
try: | |
butler = dataRef.getButler() | |
except RuntimeError: | |
log.fatal('Cannot get a Butler from the dataRef provided') | |
det = exposure.getDetector() | |
# Retrieve visit and ccdnum from 'victim' so we can look up 'sources' | |
visit = dataRef.dataId['visit'] | |
ccdnum_str = map_name_str(det.getName()) | |
exposure = dataRef.get() | |
source_data = defaultdict(list) | |
for amp in exposure.getDetector(): | |
victim = ccdnum_str + amp.getName() | |
for source in sources[victim]: | |
source_ccdnum = int(source[:2]) | |
try: | |
source_exposure = butler.get('raw', dataId={'visit': visit, 'ccdnum': source_ccdnum}) | |
except RuntimeError: | |
log.warn('Cannot access source %d, SKIPPING IT' % source_ccdnum) | |
else: | |
log.info('Correcting victim %s from crosstalk source %s' % (victim, source)) | |
if 'A' in source: | |
amp_idx = 0 | |
elif 'B' in source: | |
amp_idx = 1 | |
else: | |
log.fatal('DECam source amp name does not contain A or B, cannot proceed') | |
source_amp = source_exposure.getDetector()[amp_idx] | |
source_dataBBox = source_amp.getRawDataBBox() | |
# Check to see if source overscan has been corrected, and if not, correct it first | |
if not source_exposure.getMetadata().exists('OVERSCAN'): | |
from .isr import DecamIsrTask | |
decamisr = DecamIsrTask() | |
decamisr.overscanCorrection(source_exposure, source_amp) | |
log.info('Correcting source %s overscan before using to correct crosstalk' % source) | |
source_image = source_exposure.getMaskedImage().getImage() | |
#### As mentioned below. This should probably be a deep copy. | |
source_data = source_image.Factory(source_image, source_dataBBox, True) | |
sources[victim].append(source_data, coeffs[(victim, source)]) | |
return sources | |
@pipeBase.timeMethod | |
def run(self, exposure, sources): | |
"""Perform crosstalk correction on a DECam exposure and its corresponding dataRef. | |
Parameters | |
---------- | |
exposure : `lsst.afw.image.Exposure` | |
Exposure to correct | |
sources : `defaultdict | |
A dictionary of a list of tuples (source_data, source_coeff) for each victim | |
Returns | |
------- | |
`lsst.pipe.base.Struct` | |
Struct with components: | |
- ``exposure``: The exposure after crosstalk correction has been | |
applied (`lsst.afw.image.Exposure`). | |
""" | |
self.log.info('Applying crosstalk correction') | |
log = lsst.log.Log.getLogger('obs.decam.subtractCrosstalk') | |
# Load data from crosstalk 'victim' exposure we want to correct | |
det = exposure.getDetector() | |
for amp in det: | |
victim = map_name_str(det.getName() + amp.getName() | |
dataBBox = amp.getRawDataBBox() | |
# Check to see if victim overscan has been corrected, and if not, correct it first | |
if not exposure.getMetadata().exists('OVERSCAN'): | |
from .isr import DecamIsrTask | |
decamisr = DecamIsrTask() | |
decamisr.overscanCorrection(exposure, amp) | |
log.warn('Overscan correction did not happen prior to crosstalk correction') | |
log.info('Correcting victim %s overscan before crosstalk' % victim) | |
image = exposure.getMaskedImage().getImage() | |
victim_data = image.Factory(image, dataBBox) | |
# Load data from crosstalk 'source' exposures | |
for source in sources[victim]: | |
source_data, source_coeff = source | |
if victim_data.getX0() != source_data.getX0(): | |
# Occurs with amp A and B mismatch; need to flip horizontally | |
source_data = afwMath.flipImage(source_data, flipLR=True, flipTB=False) | |
# Perform the linear crosstalk correction | |
try: | |
## CAREFUL, this is modifying a view in place, so if the same section is in multiple entries | |
## the creating script should deep copy | |
source_data *= source_coeff | |
victim_data -= source_data | |
except RuntimeError: | |
log.fatal('Crosstalk correction failed for victim %s from source %s' | |
% (victim, source)) | |
return pipeBase.Struct( | |
exposure=exposure, | |
) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment