Skip to content

Instantly share code, notes, and snippets.

@eteq
Created September 23, 2014 22:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eteq/87845710d1f2da02d8c5 to your computer and use it in GitHub Desktop.
Save eteq/87845710d1f2da02d8c5 to your computer and use it in GitHub Desktop.
Mock astronomy image generation code
from __future__ import division, print_function
import abc
import numpy as np
from matplotlib import pyplot as plt
from astropy import units as u
from astropy.modeling import Fittable2DModel, Parameter, fitting
from astropy.modeling import format_input as _format_input
maggie = u.def_unit('maggie', 3631*u.Jy)
nmaggie = u.def_unit('nmaggie', 3631e-9*u.Jy)
TWOPI = 2 * np.pi
class MockImage(object):
"""
Creates mock images to test photometry routines on
"""
def __init__(self, **kwargs):
kwargs.setdefault('verbose', False)
kwargs.setdefault('xsize', 512)
kwargs.setdefault('ysize', 512)
# SDSS r band from Fukugita+ 96
kwargs.setdefault('filter_cen', 6261*u.angstrom)
kwargs.setdefault('filter_fwhm', 1382*u.angstrom)
kwargs.setdefault('effective_area', 1 * u.m**2)
kwargs.setdefault('quantum_efficiency', 1 * u.electron / u.photon)
kwargs.setdefault('gain', 1 * u.electron / u.adu)
kwargs.setdefault('read_noise', 0 * u.electron)
kwargs.setdefault('saturation', None)
kwargs.setdefault('bias', 0 * u.adu)
kwargs.setdefault('exp_time', 1 * u.s)
kwargs.setdefault('psf_model',MoffatPSFModel(alpha_x=5, alpha_y=5))
kwargs.setdefault('background', {'type': 'constant', 'value': 1.*nmaggie})
kwargs.setdefault('starlocs', [[], []])
if 'starmags' in kwargs and 'starfluxs' in kwargs:
raise ValueError('cannot give both starmags and starfluxs')
elif 'starmags' in kwargs:
self.starmags = np.array(kwargs.pop('starmags'))
elif 'starfluxs' in kwargs:
self.starfluxs = np.array(kwargs.pop('starfluxs'))
else:
self.starfluxs = np.array([])
kwargs.setdefault('gallocs', [[], []])
kwargs.setdefault('galsizes', [[], []])
kwargs.setdefault('galpas', [])
kwargs.setdefault('galns', [])
kwargs.setdefault('galconvwpsf', False)
if 'galmags' in kwargs and 'galfluxs' in kwargs:
raise ValueError('cannot give both galmags and galfluxs')
elif 'galmags' in kwargs:
self.galmags = np.array(kwargs.pop('galmags'))
elif 'galfluxs' in kwargs:
self.galfluxs = np.array(kwargs.pop('galfluxs'))
else:
self.galfluxs = np.array([])
for nm in kwargs:
setattr(self, nm, kwargs[nm])
self.starfluxsimg = None
self.galfluximg = None
self.bkgfluximg = None
self.artifactfluximg = None
self.fluximg = None
@property
def starmags(self):
return -2.5*np.log10(self.starfluxs.to(maggie).value)
@starmags.setter
def starmags(self, value):
self.starfluxs = maggie*10**(value/-2.5)
@property
def galmags(self):
return -2.5*np.log10(self.galfluxs.to(maggie).value)
@galmags.setter
def galmags(self, value):
self.galfluxs = maggie*10**(value/-2.5)
@property
def xsize(self):
return self._xsize
@xsize.setter
def xsize(self, value):
self._xsize = value
self._baseimg = self._coords = None
@property
def ysize(self):
return self._ysize
@ysize.setter
def ysize(self, value):
self._ysize = value
self._baseimg = self._coords = None
@property
def coords(self):
if self._coords is None:
x = np.arange(self.xsize)
y = np.arange(self.ysize)
x, y = np.meshgrid(x, y)
self._coords = (x.T, y.T)
return self._coords
@property
def baseimg(self):
if self._baseimg is None:
self._baseimg = np.ones((self.xsize, self.ysize))
return self._baseimg
def _print(self, *args, **kwargs):
if self.verbose:
print(*args, **kwargs)
sys.stdout.flush()
def create_stars_flux(self):
xs, ys = self.starlocs
flxs = self.starfluxs
if len(flxs) != len(xs):
raise ValueError('number of star fluxes must match locations')
starimg = 0*nmaggie * self.baseimg
mod = self.psf_model.copy()
mod.flux = 1
for i, (x, y, flx) in enumerate(zip(xs, ys, flxs)):
self._print('\rOn star {i} of {nstars}'.format(i=i+1, nstars=len(xs)), end='')
mod.cen_x = x
mod.cen_y = y
starimg += flx * mod(*self.coords)
self._print('')
return starimg
def create_galaxies_flux(self):
from astropy import convolution
xs, ys = self.gallocs
flxs = self.galfluxs
rxs, rys = self.galsizes
pas = self.galpas
ns = self.galns
mod = GalaxyModel()
galimg = 0*nmaggie * self.baseimg
zpars = zip(xs, ys, flxs, rxs, rys, pas, ns)
for i, (x, y, flx, rx, ry, pa, n) in enumerate(zpars):
self._print('\rOn galaxy {i} of {nstars}'.format(i=i+1, nstars=len(xs)), end='')
mod.cen_x = x
mod.cen_y = y
mod.re_x = rx
mod.re_y = ry
mod.pa = pa
mod.n = n
galimg += flx * mod(*self.coords)
self._print('')
if self.galconvwpsf and (len(xs) > 0):
self._print("Convolving gal flux w/PSF")
xsz = (self.psf_model.scale_x*-2 - 0.5, self.psf_model.scale_x*2 + 0.5)
ysz = (self.psf_model.scale_y*-2 - 0.5, self.psf_model.scale_y*2 + 0.5)
ckernel = convolution.discretize_model(self.psf_model, xsz, ysz)
cgalimg = convolution.convolve_fft(galimg.value, ckernel)
#conserve flux! also preserve the unit val `galimg`
fluxratio = np.sum(galimg) / np.sum(cgalimg)
galimg = fluxratio * cgalimg
return galimg
def create_background_flux(self):
if self.background['type'] == 'constant':
return self.baseimg * self.background['value']
else:
raise ValueError('Unrecognized background type:' + str(self.background['type']))
def create_artifacts_flux(self):
from warnings import warn
warn('artifacts not yet implemented')
return 0*nmaggie * self.baseimg
def generate_flux_image(self):
self.starfluximg = self.create_stars_flux()
self.galfluximg = self.create_galaxies_flux()
self.bkgfluximg = self.create_background_flux()
self.artifactfluximg = self.create_artifacts_flux()
self.fluximg = self.starfluximg + self.galfluximg + self.bkgfluximg + self.artifactfluximg
return self.fluximg
def convert_to_counts(self, img=None, subtract_bkg=False, poisson=True):
"""
Parameters
----------
img
The flux image to convert or None to use ``self.fluximg``
subtract_bkg : True, False, or 'smooth'
If True, subtract the background noisly (with a new realization of
the background). If 'smooth', use a smooth background.
random
Returns
-------
countimg
The image in ADUs (integer if `poisson` is True)
"""
from astropy.constants import h
if img is None:
img = self.fluximg
# not exactly right for non-zero width, but close enough?
cenfreq = self.filter_cen.to(u.Hz, u.spectral())
freq1 = (self.filter_cen - self.filter_fwhm/2).to(u.Hz, u.spectral())
freq2 = (self.filter_cen + self.filter_fwhm/2).to(u.Hz, u.spectral())
bandwidthfreq = np.ptp([freq1.value, freq2.value]) * u.Hz
ph_to_en = u.photon / (h * cenfreq)
photonimg = (ph_to_en * img * bandwidthfreq * self.exp_time *
self.effective_area).to(u.photon)
electronimg = photonimg * self.quantum_efficiency
#now apply read noise
if self.read_noise.value:
rnoise = np.random.randn(self.xsize, self.ysize) * self.read_noise
electronimg = electronimg + rnoise
self.countimg = aduimg = (electronimg / self.gain).to(u.adu)
if self.saturation is not None:
self.last_saturated = aduimg > self.saturation
aduimg[self.last_saturated] = self.saturation
if poisson:
res = (np.random.poisson(aduimg.value) * u.adu + self.bias).astype(int)
else:
res = aduimg.value * u.adu + self.bias
if subtract_bkg:
oldsat = self.saturation
try:
self.saturation = None
if subtract_bkg == 'smooth':
return res - self.convert_to_counts(self.bkgfluximg, poisson=False)
else:
return res - self.convert_to_counts(self.bkgfluximg, poisson=True)
finally:
self.saturation = oldsat
else:
return res
def find_psf_stars(self, imgmsk=None, magcut=None, isodistance=20):
"""
Finds suitable PSF stars. They have to be isolated out to `isodistance`,
no pixels in `imgmsk`, and bright enough according to `magcut`
"""
from scipy.spatial import cKDTree
sx, sy = self.starlocs
spixx = np.round(sx).astype(int)
spixy = np.round(sy).astype(int)
gx, gy = self.gallocs
gpixx = np.round(gx).astype(int)
gpixy = np.round(gy).astype(int)
allocs = np.array([np.concatenate((sx, gx)), np.concatenate((sy, gy))])
#allmags = np.concatenate([self.starmags, self.galmags])
kdt = cKDTree(allocs.T)
dnearest = kdt.query(self.starlocs.T, 2)[0][:, 1]
msk = np.ones(len(self.starmags), dtype=bool)
if imgmsk is not None:
# if the nearest pixel to the second is in `imgmsk`/saturated, don't
# include it
print(imgmsk[spixx, spixy])
msk = msk & ~imgmsk[spixx, spixy]
if isodistance is not None:
msk = msk & (dnearest > isodistance)
msk = msk & (isodistance < spixx) & (spixx < (self.xsize - isodistance))
msk = msk & (isodistance < spixy) & (spixy < (self.ysize - isodistance))
if magcut is not None:
msk = msk & (self.starmags < magcut)
return np.where(msk)[0]
def show_in_ds9(self, img=None, frame=None, ds9=None, markstars=True, markgals=True, inclmags=False):
from pyds9 import DS9
if ds9 is None:
ds9 = DS9()
if img is None:
img = self.fluximg.view(np.ndarray)
else:
img = img.view(np.ndarray)
if img.dtype.kind == 'i':
img = img.astype('int32')
if frame is not None:
ds9.set('frame {0}'.format(frame))
ds9.set_np2arr(img)
if markstars:
if inclmags:
regs = ['text {0} {1} "s,{2:.2f}" # color=red'.format(x+1, y+1, m)
for y, x, m in zip(self.starlocs[0], self.starlocs[1], self.starmags)]
else:
regs = ['point {0} {1} # point=x color=red'.format(x+1, y+1) for y, x in zip(*self.starlocs)]
ds9.set('regions', '\n'.join(regs))
if markgals:
if inclmags:
regs = ['text {0} {1} "g,{2:.2f}" # color=red'.format(x+1, y+1, m)
for y, x, m in zip(self.gallocs[0], self.gallocs[1], self.galmags)]
else:
regs = ['point {0} {1} # point=diamond color=red'.format(x+1, y+1) for y, x in zip(*self.gallocs)]
ds9.set('regions', '\n'.join(regs))
return ds9
class GalaxyModel(object):
"""
sersic model w/ rotation. total flux is 1
"""
def __init__(self, cen_x=0, cen_y=0, re_x=1, re_y=1, pa=0*u.deg, n=1):
self.cen_x = cen_x
self.cen_y = cen_y
self.re_x = re_x
self.re_y = re_y
self.pa = pa
self.n = n
def __call__(self, x, y):
from scipy.special import gamma
cpa = np.cos(self.pa).value
spa = np.sin(self.pa).value
dx = x - self.cen_x
dy = y - self.cen_y
x = dx*cpa + dy*spa
y = -dx*spa + dy*cpa
r = np.hypot(x / self.re_x, y / self.re_y)
bn = 1.9992*self.n - 0.3271
twon = 2 * self.n
ltot = self.re_x * self.re_y * 2 * np.pi * self.n * np.exp(bn) * (bn**-twon) * gamma(twon)
return np.exp(-bn*(r**(1/self.n)-1)) / ltot
class PSFModel(Fittable2DModel):
__metaclass__ = abc.ABCMeta
def fit_data(self, imgdata, xy=None, fittertype=fitting.LevMarLSQFitter, **kwargs):
"""
Fit the model to provided image data.
Note that this ignores bounds when using `LevMarLSQFitter` because
it seems to be broken.
Parameters
----------
imdata : 2D array
The image data to fit this model to
xy : 2-tuple of arrays or None
An (x, y) tuple of the pixel grid (each should be 2D), or None to
use a default 0-indexed grid.
Returns
-------
mod : same as self
A new model with the best-fit parameters. The fitter is available
as `self.last_fitter`
"""
if xy is None:
x, y = np.mgrid[:imgdata.shape[0], :imgdata.shape[1]]
else:
x, y = xy
f = fittertype()
if fittertype == fitting.LevMarLSQFitter:
mod0 = self.copy()
for nm in mod0.bounds:
mod0.bounds[nm] = (None, None)
else:
mod0 = self
res = f(mod0, x, y, imgdata, **kwargs)
self.last_fitter = res.last_fitter = f
#compute the chi-squared
res.chi2 = np.sum((res(x, y) - imgdata)**2)
dofs = x.size - len(res.parameters) + sum(res.fixed.values())
res.chi2r = res.chi2 / dofs
return res
def imshow_model(self, xy=None, n=100, **kwargs):
"""
A convinience method to show the model as a discritized image
"""
kwargs.setdefault('origin', 'lower')
if xy is None:
x, y = np.mgrid[:n, :n]
else:
x, y = xy
img = self(x, y)
kwargs.setdefault('interpolation', 'none')
return plt.imshow(img, **kwargs)
@abc.abstractproperty
def scale_x(self):
"""Some sort of meaningful scale along the x direction"""
raise NotImplementedError
@abc.abstractproperty
def scale_y(self):
"""Some sort of meaningful scale along the y direction"""
raise NotImplementedError
class MoffatPSFModel(PSFModel):
flux = Parameter(default=1)
cen_x = Parameter(default=0)
cen_y = Parameter(default=0)
alpha_x = Parameter(default=1, min=0)
alpha_y = Parameter(default=1, min=0)
theta = Parameter(default=0, min=0, max=45) # in degrees
beta = Parameter(default=4.765, fixed=True)
linear = False
@staticmethod
#def eval(x, y, flux, cen_x, cen_y, alpha_x, alpha_y, theta):
def eval(x, y, flux, cen_x, cen_y, alpha_x, alpha_y, theta, beta):
"""
Normalized such that the integral out to infinity is 1
"""
from math import radians
if theta == 0:
x1 = x - cen_x
y1 = y - cen_y
else:
thetar = radians(theta)
s = np.sin(thetar)
c = np.cos(thetar)
x1 = (x-cen_x) * c - (y-cen_y) * s
y1 = (x-cen_x) * s + (y-cen_y) * c
r = np.hypot(x1/alpha_x, y1/alpha_y)
return flux*(beta-1)*(1 + r**2)**-beta / (np.pi * alpha_x * alpha_y)
@property
def scale_x(self):
return self.alpha_x
@scale_x.setter
def scale_x(self, value):
self.alpha_x = value
@property
def scale_y(self):
return self.alpha_y
@scale_y.setter
def scale_y(self, value):
self.alpha_y = value
class GaussianPSFModel(PSFModel):
flux = Parameter(default=1)
cen_x = Parameter(default=0)
cen_y = Parameter(default=0)
sig = Parameter(default=1, min=0)
linear = False
@staticmethod
def eval(x, y, flux, cen_x, cen_y, sig):
"""
Normalized such that the integral out to infinity is 1
"""
from math import radians
r = np.hypot(x - cen_x, y - cen_y)/sig
return flux*np.exp(-0.5 * r**2)/(sig**2)/TWOPI
@property
def scale_x(self):
return self.sig_x
@scale_x.setter
def scale_x(self, value):
self.sig_x = value
@property
def scale_y(self):
return self.sig_y
@scale_y.setter
def scale_y(self, value):
self.sig_y = value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment