Skip to content

Instantly share code, notes, and snippets.

@TallJimbo
Last active April 28, 2023 16:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TallJimbo/17f65393ae8adaf973ea81388008aff8 to your computer and use it in GitHub Desktop.
Save TallJimbo/17f65393ae8adaf973ea81388008aff8 to your computer and use it in GitHub Desktop.
Snap Combination
from lsst.afw.image import Exposure, ExposureInfo, MaskedImage, Image
from lsst.afw.table import SourceCatalog
from lsst.afw.detection import Psf
from lsst.afw.math import BackgroundList
from lsst.pex.config import Config, ConfigurableField
from lsst.pipe.base import PipelineTaskConnections, PipelineTaskConfig, PipelineTask, Struct, Task
from lsst.pipe.base.connectionTypes import Input, Output
class CharacterizeImageConnections(PipelineTaskConnections, dimensions=("visit", "detector")):
exposure = Input(
doc="Input single-snap exposure images",
name="postISRCCD",
storageClass="Exposure",
dimensions=("exposure", "detector"),
multiple=True,
)
snap_diff = Output(
doc=(
"Difference between input snaps. Will only be produced if exactly "
"two 'exposure' datasets are provided. Some later task will read "
"this and do forced photometry on it using positions from the "
"final DIASource catalog. Only the image is saved, since "
"everything else should be the same as in the final PVI."
),
name="snap_diff"
storageClass="Image",
dimensions=("visit, detector"),
)
snap_diff_background = Output(
doc="Background subtracted from the `snap_diff` image.",
name="snap_diff_backgound",
storageClass="Background",
dimensions=("visit, detector"),
)
...
def adjustQuantum(self, inputs, outputs, label, dataId):
# Docstring inherited from PipelineTaskConnections
exposure_connection, exposure_refs = inputs["exposure"]
if not len(exposure_refs):
raise NoWorkFound("No images to process.")
if len(exposure_refs) not in (1, 2):
raise ValueError("Exactly one or two input snaps required.")
return super().adjustQuantum(inputs, outputs, label, dataId)
class CharacterizeImageConfig(PipelineTaskConfig, pipelineConnections=CharacterizeImageConnections):
combineSnaps = ConfigurableField(
target=CombineSnapsTask,
doc="Configuration for combining pairs of input exposures.",
)
...
class CharacterizeImageTask(PipelineTask):
ConfigClass = CharacterizeImageTaskConfig
...
def __init__(self, ...):
...
self.makeSubtask("combineSnaps")
def run(self, exposures: list[Exposure], ...):
result = self.combineSnaps(exposures)
initial_catalog = self._run_some_processing_and_fit_psf(result.visit_image, ...)
self._find_and_fix_cosmic_rays(result.visit_image, result.snap_diff)
catalog = self._run_some_more_processing(result.visit_image, initial_catalog)
return Struct(
characterized=result.visit_image,
catalog=catalog,
snap_diff=result.snap_diff,
snap_diff_background=result.snap_diff_background,
...
)
def _run_some_processing_and_fit_psf(self, exposure: Exposure, ...) -> SourceCatalog:
"""The first bit of the current CharacterizeImageTask, where we
run detection, measurement, and PSF fitting N times.
On return ``exposure.getPsf`` will return a new PSF model.
"""
...
def _find_and_fix_cosmic_rays(self, exposure: Exposure, snap_diff: Image | None, ...) -> None:
"""A modified version of a call to RepairTask that do something with
a snap-diff image if one is provided.
On return all CRs in ``exposure`` have been interpolated and masked.
"""
...
def _run_some_more_processing(self, exposure: Exposure, catalog: SourceCatalog, ...) -> SourceCatalog:
"""The last bit of the current CharacterizeImageTask, where we
re-measure everything and compute and apply aperture corrections.
On return ``exposure.getApCorrMap`` will return a new aperture
correction map.
"""
...
class CombineSnapsConfig(Config):
...
class CombineSnapsTask(Task):
ConfigClass = CombineSnapsConfig
def run(self, exposures: list[Exposure], psf: Psf) -> Struct:
if len(exposures) > 1:
assert len(exposures) == 2
diff = exposures[0].image - exposures[1].image
else:
combined = exposures[0]
diff = None
diff_background = None
return Struct(visit_image=combined, snap_diff=diff, snap_diff_background=diff_background)
def _repair_and_combine_pair(self, a: Exposure, b: Exposure, psf: Psf) -> Struct:
"""Process a pair of snaps.
This includes:
- subtracting them to look for garbage (CRs, streaks, saturation) in
the diff, using the given PSF;
- masking and interpolating that garbage in both input images and the
diff
- averaging the images together (mostly weighted by exposure time, but
ignoring pixels with certain mask bits set in only one of the pair).
Returns a struct with:
visit_image : `Exposure`, average after garbage-repair
snap_diff : `Image`, image plane of difference
"""
...
def _repair_and_return_single(self, a: Exposure, b: Exposure) -> Struct:
"""Subtract the images planes for two exposure objects after
dividing by their exposure times, and then subtract the background in
the diff.
"""
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment