Code used for a MeerKLASS memo on the importance of OTFM PB correction and the amplitude errors introduced
"""Simple code to re-create OTFM beam smearing plots. For the default plots, and
mathematical background see:
Also, the effect is computed and results are re-computed for MeerKAT. Both for
constant RA and constant elevation (MeerKLASS)
The code is based on a circular GAussian beam input for now.
I try to vectorise things rather than parallelize and hope the underlying C code
is fast enough for these things
MeerKLASS general parameters:
Author: Kristof Rozgonyi
License: MIT
#=== Imports ===
import os, sys
import numpy as np
from scipy.special import erf
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.offsetbox import AnchoredText
from matplotlib.patheffects import withStroke
from astropy.coordinates import Longitude, Latitude, EarthLocation
from astropy.coordinates import SkyCoord, AltAz
from astropy.time import Time
from astropy import units as u
from astropy.coordinates import AltAz
#=== Globals ===
#RCparams for plotting
matplotlib.rcParams['xtick.direction'] = 'in'
matplotlib.rcParams['ytick.direction'] = 'in'
matplotlib.rcParams['xtick.major.size'] = 9
matplotlib.rcParams['ytick.major.size'] = 9
matplotlib.rcParams['xtick.major.width'] = 3
matplotlib.rcParams['ytick.major.width'] = 3
matplotlib.rcParams['xtick.minor.size'] = 6
matplotlib.rcParams['ytick.minor.size'] = 6
matplotlib.rcParams['xtick.minor.width'] = 2
matplotlib.rcParams['ytick.minor.width'] = 2
matplotlib.rcParams['axes.linewidth'] = 2
#4 sampled colors from viridis
c0 = '#440154';#Purple
c1 = '#30678D';#Blue
c2 = '#35B778';#Greenish
c3 = '#FDE724';#Yellow
# === Telescope related
# The wideband coarse (4k)
N_CHAN = 410 #I average every 10 channels together not to simulate all 4096chans
L_band_PIXELSIZE = 12 # Arcseconds
L_BAND_CW = 0.00209 # 209kHz channelwidth (208.984 kHz)
UHF_band_PIXELSIZE = 14 # Arcseconds
UHF_BAND_CW = 0.000133 # 132.812 kHz
#=== Functions ===
def deg_to_rad(deg):
return deg * (np.pi / 180)
def arcsecond_to_arcmin(arcsec):
return float(arcsec) / 60.
def arcmin_to_deg(arcmin):
return float(arcmin) / 60.
def arcsecond_to_deg(arcsec):
return float(arcsec) / 360.
def FWHM_to_std(FWHM):
"""Convert Gaussian FWHM to standard deviation
return FWHM / 2.355
def std_to_FWHM(std):
"""Convert Gaussian FWHM to standard deviation
return std * 2.355
def get_pixel_coordinate_array(N,dn):
"""Quick subroutine to compute the grid cell midpoints around zero (one point
is centered at zero) with N points with dn width. If N is even more negative
pints will be computed as this is preffered fro numpy FFT
return np.linspace(-dn*(np.floor(N/2)),dn*np.ceil((N/2-1)),N,endpoint=True)
def get_VLA_PB_FWHM_from_freq(nu):
"""Compute the VLA PB FWHM based on Pereley VLA memo 195 and the
Mooley et al. 2018 paper
nu: float
The given frequency in GHz
The PB FWHM in arcseconds
return (42 * 60) / nu
def get_MeerKAT_PB_FWHM_from_freq(nu):
"""Compute the MeerKAT PB FWHM based on eq. 4 and 5 from Mauch et al. 2019
(deep2 paper). I use the Horizontal PB model scaling
nu: float
The given frequency in GHz
The PB FWHM in arcseconds
return (86.25 * 60) / nu #i.e. (57.5 * 1.5) / nu
def circular_gaussian_beam_value(x, y, sigma=0.5):
"""A simple 2D Gaussian beam value computed on a given image point
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
sigma: float, optional
The standard deviation of the circular Gaussian
z: float
The primary beam value at [x,y]
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
z = np.exp(-(np.power(x,2) + np.power(y,2)) / (2 * np.power(sigma,2)))
return z
def RA_smeared_gaussian_beam_value(x, y, dx, sigma=0.5):
"""Compute the smeared beam (eq. 10) from
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
dx: float
The max RA (x) coordinate offset of the beam for a given phase centre
sigma: float, optional
The standard deviation of the circular Gaussian
z: float
The primary beam value at [x,y]
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
if dx == 0:
return circular_gaussian_beam_value(x, y, sigma=sigma)
b0 = np.exp(-(np.power(x,2) + np.power(y,2)) / (2 * np.power(sigma,2)))
A = (sigma / dx) * b0 * np.exp(np.power(x,2) / (2 * np.power(sigma,2))) \
* np.sqrt(np.pi / 2)
z = A * (erf((x + (dx / 2)) / (np.sqrt(2) * sigma)) \
- erf((x - (dx / 2)) / (np.sqrt(2) * sigma)))
return z
def general_smeared_gaussian_beam_value(x, y, dx, dy, sigma=0.5):
"""Compute the general smeared beam computed using a linear approximation
See the memo for the formula and derivation.
x: <numpy.ndarray>
The x (RA) directional coordinates relative to the pointing centre
y: <numpy.ndarray>
The y (Dec) directional coordinates relative to the pointing centre
dx: float
The max RA (x) coordinate offset of the beam for a given phase centre
in arcseconds
dx: float
The max Dec (y) coordinate offset of the beam for a given phase centre
in arcseconds
sigma: float, optional
The standard deviation of the circular Gaussian
z: float
The primary beam value at [x,y]
if np.shape(x) != np.shape(y):
raise ValueError('x and y have different shapes!')
if dx == 0 and dy == 0:
return circular_gaussian_beam_value(x, y, sigma=sigma)
ds = np.sqrt(np.power(dx,2) + np.power(dy,2))
A = (sigma / ds) * np.sqrt(np.pi / 2)
s_exp = np.exp(-(np.power(((x * (dy / ds)) + (y * (dx / ds))),2)) \
/ (2 * np.power(sigma,2)))
s_smear = (erf(((x * (dx / ds)) - (y * (dy / ds)) + (ds / 2)) \
/ (np.sqrt(2) * sigma)) \
- erf(((x * (dx / ds)) - (y * (dy / ds)) - (ds / 2)) \
/ (np.sqrt(2) * sigma)))
z = A * s_exp * s_smear
return z
def compute_beam_image(N, dn,
"""Generate a regular image of NxN with dn pixel size and generate the given
beam model.
The beam values are computed at the pixel centre points
N: int
The size of the sky image
dn: int
The image pixel size
beam_model: str, optional
Name the primary beam model from the following list:
['circular_gaussian', 'RA_smeared_gaussian', 'general_smeared_gaussian']
sigma: float, optional
The standard deviation of the circular Gaussian
dx: float, optional
The max RA (x) coordinate offset of the beam for a given phase centre (scan)
dy: float, optional
The max RA (x) coordinate offset of the beam for a given phase centre (scan)
x: <numpy.ndarray>
The pixel x coordinates (RA) as a grid (NxN matrix with constant columns)
y: <numpy.ndarray>
The pixel y coordinates(Dec) as a grid (NxN matrix with constant rows)
z: <numpy.ndarray>
The primary beam values as a grid (NxN matrix)
#This is a dirty type solution
N = int(np.ceil(N))
#Extra end at negative values => This is good for numpy FFT
x = get_pixel_coordinate_array(N,dn)
y = get_pixel_coordinate_array(N,dn)
x, y = np.meshgrid(x,y)
if beam_model == 'circular_gaussian':
z = circular_gaussian_beam_value(x,y,sigma=sigma)
elif beam_model == 'RA_smeared_gaussian':
if dx == None:
raise ValueError('no dx is defined!')
z = RA_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx)
elif beam_model == 'general_smeared_beam':
if dx == None:
raise ValueError('no dx is defined!')
if dy == None:
raise ValueError('no dy is defined!')
z = general_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx,dy=dy)
raise NotImplementedError('Beam model is wrong or not implemented yet')
# z = circular_gaussian_beam_value(x,y,sigma=sigma)
# z = z - RA_smeared_gaussian_beam_value(x,y,sigma=sigma,dx=dx)
return x, y, z
def get_beam_slice(x, y, z, slice_dir='x', slice_at=0.):
"""Get a slice from a PB image at constant RA or Dec
x: <numpy.ndarray>
The pixel x coordinates (RA) as a grid (NxN matrix with constant columns)
y: <numpy.ndarray>
The pixel y coordinates(Dec) as a grid (NxN matrix with constant rows)
z: <numpy.ndarray>
The primary beam values as a grid (NxN matrix)
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
xy_slice: <numpy.ndarray>
The slice pixel (x or y) coordinates (1D array)
z_slice: <numpy.ndarray>
The beam values at the slice (1D array)
#I skip checking size matches
if slice_dir == 'RA' or slice_dir == 'x':
slice_dir_array = x[0,:]
perpendicular_dir_array = y
elif slice_dir == 'Dec' or slice_dir == 'y':
slice_dir_array = y[:,0]
perpendicular_dir_array = x
raise ValueError('Invalid slice direction is given!')
#Get the closest value in array to the given slice at
#I use the fact that the bem is computed on an NxN regular image
slice_at = slice_dir_array[np.argmin(np.abs(
slice_dir_array - slice_at))]
xy_slice = np.copy(slice_dir_array)
z_slice = np.copy(z[perpendicular_dir_array == slice_at])
return xy_slice, z_slice
def fractional_PB_cange_of_single_slice(pixelsize=1,
Ndx = 100,
fdx = 0.01,
beam_model = 'RA_smeared_gaussian'
"""Generates fig 2 data matrix from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
pixelsize: float, opt
The image pixelsize in arcseconds
PB_FWHM: float, opt
The Primary Beam FWHM in arcseconds. Used to calculate the std of the PB
Ndx: int, optional
The number of dx coordinate offset values to test
fdx: float, optional
The fractional change of the x coordinate for a single subscan (one phase
centre) from 0 to Ndx * fdx *PB_FWHM
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
N_image: int, optional
If None the image size is 2 * PB_FWHM, otherwise this parameter controls
the image sixe (x axis of the plot)
beam_model: str, optional
Name the (smeared) primary beam model from the following list:
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
#Set up the parameters
PB_sigma = FWHM_to_std(PB_FWHM)
if N_image == None:
N_image = np.ceil((PB_FWHM * 2) / pixelsize)
N_image = int(N_image)
ddx = fdx * PB_FWHM
#Get the output arrays
dx_array = np.arange(ddx,(Ndx * ddx) + 1,ddx)
dPB_array = get_pixel_coordinate_array(N_image,pixelsize)
PB_err = np.zeros((Ndx, N_image))
#Get the reference beam
x0, y0, z0 = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = 'circular_gaussian',
sigma = PB_sigma)
ref_xy_slice, ref_z_slice = get_beam_slice(x0,y0,z0,
slice_dir=slice_dir, slice_at=slice_at)
#Fill up the PB_err matrix
for i in range(0,Ndx):
x, y, z = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = beam_model,
sigma = PB_sigma,
dx = dx_array[i])
xy_slice, z_slice = get_beam_slice(x,y,z,
slice_dir=slice_dir, slice_at=slice_at)
PB_err[i,:] = np.divide(z_slice,ref_z_slice)
#Compute the meshgrid coordinates
dPB_array, dx_array = np.meshgrid(dPB_array,dx_array)
del x0, y0, z0, x, y, z, ref_xy_slice, ref_z_slice, xy_slice, z_slice
return dPB_array, dx_array, PB_err
def fractional_PB_cange_with_frequency_of_single_slice(pixelsize=1,
N_image = (14 * 60 * 1.6) + 1,
nu_central = 3,
Nnu = 201,
dnu = 0.01,
PB_scaling_model = 'VLA',
dx = 240,
slice_dir = 'x',
slice_at = 0.,
beam_model = 'RA_smeared_gaussian'
"""Generates fig 2 data matrix from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
pixelsize: float, opt
The image pixelsize in arcseconds
N_image: int, optional
The image size
nu_central: float, opt
The central frequency in GHz
Nnu: int, optional
The number of frequency channels to test
dnu: float, opt
The width of a single frequency channel in GHz
PB_scaling_model: str, opt
The model of how the PB FWHM depends on the observing frequency. Can be
any from the following list: ['VLA', 'MeerKAT']
dx: float, optional
The absoolute change of the x coordinate for a single subscan used for
all frequencies (in arcseconds)
slice_dir: string, optional
Slice direction i.e. the direction which the slice is returned
ca be 'RA', 'x' or 'Dec', 'y'
e.g if slice_dir = x a slice at constant y is returned
slice_at: float, optional
The constant RA or Dec value in which the slice is taken
beam_model: str, optional
Name the (smeared) primary beam model from the following list:
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
N_image = int(np.ceil(N_image))
#Get the output arrays
nu_array = np.add(get_pixel_coordinate_array(Nnu,dnu), nu_central)
dPB_array = get_pixel_coordinate_array(N_image,pixelsize)
PB_err = np.zeros((Nnu, N_image))
#Get the frequency deendent PB FWHM array
PB_FWHM_array = np.zeros(Nnu)
if PB_scaling_model == 'VLA':
for i in range(0,Nnu):
PB_FWHM_array[i] = get_VLA_PB_FWHM_from_freq(nu_array[i])
elif PB_scaling_model == 'MeerKAT':
for i in range(0,Nnu):
PB_FWHM_array[i] = get_MeerKAT_PB_FWHM_from_freq(nu_array[i])
#Fill up the PB_err matrix
for i in range(0,Nnu):
#Get the reference beam
x0, y0, z0 = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = 'circular_gaussian',
sigma = FWHM_to_std(PB_FWHM_array[i]))
ref_xy_slice, ref_z_slice = get_beam_slice(x0,y0,z0,
slice_dir=slice_dir, slice_at=slice_at)
#Compute the smeared beam
x, y, z = compute_beam_image(N = N_image,
dn = pixelsize,
beam_model = beam_model,
sigma = FWHM_to_std(PB_FWHM_array[i]),
dx = dx)
xy_slice, z_slice = get_beam_slice(x,y,z,
slice_dir=slice_dir, slice_at=slice_at)
PB_err[i,:] = np.divide(z_slice,ref_z_slice)
#Compute the meshgrid coordinates
dPB_array, nu_array = np.meshgrid(dPB_array,nu_array)
del x0, y0, z0, x, y, z, ref_xy_slice, ref_z_slice, xy_slice, z_slice
return dPB_array, nu_array, PB_err
def get_smearing_from_altaz_subscan(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
telescope = 'MeerKAT',
"""Compute the smeared beam from the altaz parameters pelescope pointing
position and sub-scan integration time
t0: <astropy.Time>, optional
The the observation time in UTC with y-m-dTh:m:s. format at the time
centroid of the sub-scan
alt0: float, optional
The altitude of the observation at the phase centre
az0: float, optional
The azimuth at the observation at the phase centre
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The sub-scan integration time
telescope: str, optional
The telescope used for the subscan simulations. Possible values are:
separetion_only: bool, optional
If true, onlt the Delta s angular separation is returned
dRA: float
The RA separation for the sub-scan (corrected for Dec) in arcseconds
dDec: float
The Dec separation for the sub-scan in arcseconds
if telescope == 'MeerKAT':
location = EarthLocation.from_geodetic(
Longitude('21:26:38.0',, wrap_angle=180. *, copy=False),
Latitude('-30:42:39.8',, copy=False),
height=u.Quantity(1086.6, u.m, copy=False))
raise NotImplementedError('Only MeerKAT telescope is implemented yet')
#Convert slew velocity to degrees
slew_velocity_deg = arcmin_to_deg(slew_velocity)
dt_half = (dt / 2)
t1 = t0 - (dt_half * u.second)
t2 = t0 + (dt_half * u.second)
az1 = az0 - (slew_direction * np.multiply(slew_velocity_deg, dt_half))
az2 = az0 + (slew_direction * np.multiply(slew_velocity_deg, dt_half))
centre_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az0 * u.deg,
obstime = t0,
frame = 'altaz',
location = location).transform_to('icrs')
start_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az1 * u.deg,
obstime = t1,
frame = 'altaz',
location = location).transform_to('icrs')
end_icrs_coordinates = SkyCoord(alt = alt0 * u.deg,
az = az2 * u.deg,
obstime = t2,
frame = 'altaz',
location = location).transform_to('icrs')
dra = (end_icrs_coordinates.ra-start_icrs_coordinates.ra).arcsec \
* np.cos(centre_icrs_coordinates.dec.rad)
dDec = (end_icrs_coordinates.dec-start_icrs_coordinates.dec).arcsec
sep = end_icrs_coordinates.separation(start_icrs_coordinates).arcsec
if separation_only:
return sep
return dra, dDec
#=== Survey and scanning pattern defining functions ===
def get_stripe_coordinate_values_constant_az_slew(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
T_stripe = 216.,
t0_rel = 0.,
telescope = 'MeerKAT',
frame = 'icrs'):
"""This function returns a relative time array and the corresponding RA and
Dec arrays for a given sstripe.
The stripe slew is performed with a constant altitude (elevation) pointing and so
slewing only in the azimuthal direction.
t0: <astropy.Time>, optional
The start of the observation time in UTC with y-m-dTh:m:s. format
alt0: float, optional
The altitude of the observation at t0
az0: float, optional
The azimuth at the observation at t0
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The visibility sampling time in seconds
T_stripe: float, optional
The time which under a single stripe is performed
t0_rel: float, optional
The relative start time of the observation (used for several stripe)
telescope: str, optional
The telescope used for the stripe simulations. Possible values are:
frame: stra, optional
Available frames: ['icrs','altaz']
t_stripe: <numpy.ndarray>
The time values of the stripe relative to the observation's starting
time in seconds.
x_stripe: <numpy.ndarray>
The RA (icrs) or Alt (altaz) values of the stripe in degrees
y_stripe: <numpy.ndarray>
The Dec (icrs) or Az (altaz) values of the stripe in degrees
ti: <astropy.Time>
The last time sample's UTC time stamp (to be used as an input for the next
azi: float
The last ime sample's azimuth in degrees (to be used as an input for
the next stripe)
if telescope == 'MeerKAT':
location = EarthLocation.from_geodetic(
Longitude('21:26:38.0',, wrap_angle=180. *, copy=False),
Latitude('-30:42:39.8',, copy=False),
height=u.Quantity(1086.6, u.m, copy=False))
raise NotImplementedError('Only MeerKAT telescope is implemented yet')
#Convert slew velocity to degrees
slew_velocity_deg = arcmin_to_deg(slew_velocity)
#Get the relative time array
t_stripe = np.arange(t0_rel,T_stripe,dt)
#Get alt az vectors
alt_stipe = alt0 * np.ones(np.size(t_stripe))
az_stipe = az0 + (slew_direction * np.multiply(slew_velocity_deg, t_stripe))
t_obs_stripe = t0 + t_stripe * u.second
#Convert to RA Dec
stripe_altaz_coordinates = SkyCoord(alt = alt_stipe * u.deg,
az = az_stipe * u.deg,
obstime = t_obs_stripe,
frame = 'altaz',
location = location)
if frame == 'altaz':
x_stripe = stripe_altaz_coordinates.alt.deg
y_stripe =
elif frame == 'icrs':
stripe_icrs_coordinates = stripe_altaz_coordinates.transform_to('icrs')
x_stripe = stripe_icrs_coordinates.ra.deg
y_stripe = stripe_icrs_coordinates.dec.deg
raise NotImplementedError('Only icrs and altaz frames are implemented')
ti = t_obs_stripe[-1]
#alti = stripe_altaz_coordinates[-1].alt.deg
azi = stripe_altaz_coordinates[-1].az.deg
return t_stripe, x_stripe, y_stripe, ti, azi
def get_field_scan_coordinate_values_constant_az_slew(N_scan = 26,
t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = 2.,
T_stripe = 216.,
t0_rel = 0.,
telescope = 'MeerKAT',
frame = 'icrs'):
"""Create the full scan of the target field, defined via a starting stripe
and the number of consequent stripe.
N_scan: int, optional,
The number of stripes used to cover the field simulated
t0: <astropy.Time>, optional
The start of the observation time in UTC with y-m-dTh:m:s. format
alt0: float, optional
The altitude of the observation at t0
az0: float, optional
The azimuth at the observation at t0
slew_velocity: float, optional
The slew velocity in arcseconds (!). Defined in the azimutal direction
slew_direction: int, optional
The direction of slewing. Can be [+1,-1]
dt: float, optional
The visibility sampling time in seconds
T_stripe: float, optional
The time which under a single stripe is performed
t0_rel: float, optional
The relative start time of the observation (used for several stripes)
telescope: str, optional
The telescope used for the stripe simulations. Possible values are:
frame: stra, optional
Available frames: ['icrs','altaz']
t_field: <numpy.ndarray>
The time values of the full field scans relative to the observation's
starting time in seconds.
x_field: <numpy.ndarray>
The RA (icrs) or Alt (altaz) values of the full field scans in degrees
y_field: <numpy.ndarray>
The Dec (icrs) or Az (altaz) values of the full field scans in degrees
#Define the output arrays
t_field = np.arange(t0_rel,T_stripe * N_scan,dt)
x_field = np.zeros(np.size(t_field))
y_field = np.zeros(np.size(t_field))
#Loop through the stripes
ti = t0
azi = az0
for i in range(0,N_scan):
t_stripe, x_stripe, y_stripe, ti, azi = \
t0 = ti,
alt0 = alt0,
az0 = azi,
slew_velocity = slew_velocity,
slew_direction = slew_direction,
dt = dt,
T_stripe = T_stripe,
t0_rel = t0_rel,
telescope = telescope,
frame = frame)
#ti_rel += T_stripe * dt
slew_direction *= -1
stripe_size = np.size(t_stripe) #This is ugly but I am mad atm for somethiong else so I dont care...
counter = i * stripe_size
t_field[counter:counter + stripe_size] = t_stripe + (i * T_stripe)
x_field[counter:counter + stripe_size] = x_stripe
y_field[counter:counter + stripe_size] = y_stripe
return t_field, x_field, y_field
#=== Plotting functions ===
def add_inner_title(ax, title, loc, prop=None, white_border=True, **kwargs):
"""Create a fancy inner title for plots
ax: `matplotlib.axex`
The axis which the inner title will be created
title: str
The title created
loc: int
Location. Can be verbose as well I think...
prop: dict, optional
A dictionary containing the properties of the title
eg. dict(size=18, color='green')
white_border: bool, optional
If True, no white borders are drawn around the text
at: artist
The inner title artist that is added to the plot
if prop is None:
prop = dict(size=plt.rcParams['legend.fontsize'])
at = AnchoredText(title, loc=loc, prop=prop,
pad=0., borderpad=0.75,
frameon=False, **kwargs)
if white_border:
at.txt._text.set_path_effects([withStroke(foreground="w", linewidth=3)])
return at
def plot_beam(x,y,z,ptitle=None):
"""Create a simple beam plot
x, y, z: <numpy.ndarray>
Output positional and beam value vectors from `compute_beam_image`
ptitle: str
The title of the plot
Create an image
fig = plt.figure(1, figsize=(12,12))
if ptitle != None:
fig.suptitle('Primary beam model: {0:s}'.format(ptitle), fontsize = 18)
beam = plt.pcolor(x, y, z, rasterized = True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)
plt.rcParams['contour.negative_linestyle'] = 'solid''in', length=6, width=2)'B', fontsize = 18)
CS = plt.contour(x, y, z,
levels = [np.amax(z)/2,np.amax(z)*2/3],
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
plt.xlabel(r'x', fontsize = 18)
plt.ylabel(r'y', fontsize = 18)
def plot_fractional_PB_change(dPB_array, dx_array, PB_err,
PB_FWHM = 60,
PB_normalisation = True,
cont_levels = [],
half_beam = True):
"""Generates fig 2 plot from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
for more info see `fractional_PB_cange_of_single_slice()`
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_dx matrix with
constant columns)
dx_array: <numpy.ndarray>
The generated dx value coordinates as a grid (N_image x N_dx matrix with
constant rows)
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_dx matrix)
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
PB_FWHM: float, opt
The Primary Beam FWHM in arcseconds. Used to calculate the std of the PB
PB_normalisation: bool, opt
If True the plots x and y axis will be normalised by the PB FWHM
cont_levels: list, optional
The contour level values (default values if not given)
half_beam: bool, optional
If True, only half of the beam is being polotted due to symmetry reasons
Create an image
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if PB_normalisation:
beam = plt.pcolor(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
rasterized=True, shading='auto')
beam = plt.pcolor(dPB_array, dx_array, PB_err,
rasterized=True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)'in', length=6, width=2)'B$_{eff, \nu_0}$/B$_{0, \nu_0}$', fontsize = 18)
if len(cont_levels) == 0:
CS = plt.contour(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
colors='w', alpha=1.)
CS = plt.contour(dPB_array / PB_FWHM, dx_array / PB_FWHM, PB_err,
levels = cont_levels,
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
if PB_normalisation:
plt.xlabel(r's$_0$/$\Theta_{B_{0, \nu_0}}$', fontsize = 18)
plt.ylabel(r'$\Delta$s/$\Theta_{B_{0, \nu_0}}$', fontsize = 18)
plt.xlabel(r's$_0$', fontsize = 18)
plt.ylabel(r'$\Delta$s', fontsize = 18)
if half_beam:
plt.savefig(fname, bbox_inches='tight')
def plot_fractional_PB_change_with_frequency(dPB_array, nu_array, PB_err,
PB_FWHM = None,
PB_normalisation = True,
cont_levels = [],
half_beam = True,
inner_title = None):
"""Generates fig 2 plot from Mooley et al. 2018
Currently only working with the RA smeared beam model
TO DO: make this function more general to work with an arbritary PB model
for more info see `fractional_PB_cange_of_single_slice()`
dPB_array: <numpy.ndarray>
The pixel slice coordinates as a grid (N_image x N_nu matrix with
constant columns)
nu_array: <numpy.ndarray>
The generated frequency value coordinates as a grid (N_image x N_nu
matrix with constant rows) given in GHz
PB_err: <numpy.ndarray>
The fractional bema change values as a grid (N_image x N_nu matrix)
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
PB_FWHM_reference: float, opt
The Primary Beam FWHM for a given ref frequency in arcseconds.
PB_normalisation: bool, opt
If True the plots x and y axis will be normalised by the PB FWHM
cont_levels: list, optional
The contour level values (default values if not given)
half_beam: bool, optional
If True, only half of the beam is being polotted due to symmetry reasons
Create an image
#get the PB at the lowest frequency
if PB_FWHM == None:
PB_FWHM = get_VLA_PB_FWHM_from_freq(np.amin(nu_array))
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if PB_normalisation:
beam = plt.pcolor(dPB_array / PB_FWHM, nu_array, PB_err,
rasterized=True, shading='auto')
beam = plt.pcolor(dPB_array, nu_array, PB_err,
rasterized=True, shading='auto')
cb = plt.colorbar(beam, aspect=30, fraction=0.04975, pad=0)'in', length=6, width=2)'B$_{eff}$/B$_0$', fontsize = 18)
if len(cont_levels) == 0:
CS = plt.contour(dPB_array / PB_FWHM, nu_array, PB_err,
colors='w', alpha=1.)
CS = plt.contour(dPB_array / PB_FWHM, nu_array, PB_err,
levels = cont_levels,
colors='w', alpha=1.)
plt.clabel(CS, fontsize=14, inline=True)
if PB_normalisation:
plt.xlabel(r's$_0$/$\Theta_{B,\nu_{0}}$', fontsize = 18)
plt.xlabel(r's$_0$', fontsize = 18)
plt.ylabel(r'$\nu$ [GHz]', fontsize = 18)
if half_beam:
if inner_title != None:
ax = plt.gca()
add_inner_title(ax, inner_title, loc=3, prop=dict(size=16))
plt.savefig(fname, bbox_inches='tight')
def plot_field_scan(RA_field,
ptitle = '',
setting = False,
RA_field_setting = None,
Dec_field_setting = None):
"""Plot a scan pattern of a field, both the rising and setting scans if the
latter is provided.
Basicaly creates similar figures as Fig. 3 in Wang et aql. 2021
RA_field: <numpy.ndarray>
The RA values of the full field scans in degrees
Dec_field: <numpy.ndarray>
The Dec values of the full field scans in degrees
fname: str
The full path and name and extension of the output image
ptitle: str, opt
The title of the plot
setting: bool, optional
If True, the setting scanning pattern is also drawn
RA_field_setting: <numpy.ndarray>, optional
The RA values of the full field setting scans in degrees
Dec_field_setting: <numpy.ndarray>, optional
The Dec values of the full field setting scans in degrees
Create an image
if setting:
if np.shape(RA_field) != np.shape(RA_field_setting) or \
np.shape(Dec_field) != np.shape(Dec_field_setting):
raise ValueError("Input scan coordinate lengths are different!")
fig = plt.figure(1, figsize=(9,4.5))
if ptitle != None:
plt.title(ptitle, fontsize = 18)
if setting:
plt.plot(RA_field, Dec_field, '-', c=c0, lw=2., alpha=1., label='rising')
plt.plot(RA_field_setting, Dec_field_setting, '-', c=c2, lw=2., alpha=1.,
legend = plt.legend(loc='upper right', fontsize=18, framealpha=1., edgecolor='black')
plt.plot(RA_field, Dec_field, '-', c=c0, lw=2., alpha=1.)
ax = plt.gca()
plt.ylabel(r'Dec (J2000) [deg]', fontsize = 18)
plt.xlabel(r'RA (J2000) [deg]', fontsize = 18)
plt.savefig(fname, bbox_inches='tight')
def VLA_OTFM_plots():
"""Re-create the Mooley et al. 2018 Figs 2. & 3.
More precisely, fig 3. is not re-created but a similar figure with more
physical mening/relevance is shown. Where the x axis uses a fixed PB
FWHM (the smallest one), as the pointings are frequency dependent.
This is basically a check that our code works as intended
telescope = 'VLA'
pixelsize = 2.5 #This equals the synthesised beam size for now
dx = 4 * 60 #The absolute RA offset within each subscan
nu_central = 3.
N_chan = 201
chan_width = 0.01
#PB_FWHM = 60 * 10 #The PB FWHM in arcseconds at 3 GHz
PB_FWHM = get_VLA_PB_FWHM_from_freq(nu_central)
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
#=== Fractional beam change ===
contours = [0.825,0.85,0.875,0.9,0.925,0.95,0.97,0.98,0.99,0.995,1.,\
dPB_array, dx_array, PB_err = fractional_PB_cange_of_single_slice(
pixelsize = pixelsize,
Ndx = 99, #So it goes up to dx=0.1
fdx = 0.01,
N_image=((PB_FWHM * 1.6) / pixelsize) + 1) #So I plot up to the +/- 0.8
#of the PB FWHM
parameters_string = r'{0:s} OTFM at {1:0.1f} GHz with' \
'$\Theta_B$={2:.1f}\' & $\Theta_s$={3:.1f}\" RA slice'.format(
telescope,nu_central,PB_FWHM / 60,pixelsize)
fname = './figures/{0:s}_fractional_beam_change_RA_slice.pdf'.format(
plot_fractional_PB_change(dPB_array, dx_array, PB_err,
fname = fname,
ptitle = parameters_string,
#=== Frequency dependence ===
contours = [0.97,0.98,0.99,0.995,1.,1.005,1.01,1.02,1.03,1.04,1.06]
#Normalising with max freq
nu_ext = np.amax(nu_array)
nu_ext_PB_FWHM = get_VLA_PB_FWHM_from_freq(nu_ext)
dPB_array, dnu_array, PB_err = fractional_PB_cange_with_frequency_of_single_slice(
pixelsize = pixelsize,
N_image = ((nu_ext_PB_FWHM * 1.6) / pixelsize) + 1,
nu_central = nu_central,
Nnu = N_chan,
dnu = chan_width,
dx = dx,
PB_scaling_model = telescope,
parameters_string = r'{0:s} OTFM with ' \
r'$\Delta$x={1:.1f}'"'"r' & $\nu$$_0$={2:.1f} GHz RA slice'.format(
telescope, dx/60, nu_ext)
fname = './figures/\
plot_fractional_PB_change_with_frequency(dPB_array, dnu_array, PB_err,
fname = fname,
ptitle = parameters_string,
PB_FWHM = nu_ext_PB_FWHM,
cont_levels = contours)
def plot_example_MeerKLASS_field_scans():
"""Create a similar figure for the simulated scans as in Wang et al. 2021
Fig. 3.
t_field_rising, x_field_rising, y_field_rising = \
t_field_setting, x_field_setting, y_field_setting = \
t0 = Time('2019-02-25T00:40:11'),
alt0 = 41.,
plot_field_scan(RA_field = x_field_rising,
Dec_field = y_field_rising,
fname = './figures/example_scan_coverage.pdf',
ptitle = '',
setting = True,
RA_field_setting = x_field_setting,
Dec_field_setting = y_field_setting)
def MeerKLASS_OTFM_plots(band='L'):
"""Create the OTFM smearing plots for MeerKLASS
telescope = 'MeerKAT'
#From the MeerKLass setup i.e Wang et al. 2021
if band == 'L':
pixelsize = L_band_PIXELSIZE
nu_central = L_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = L_BAND_CW
elif band == 'UHF':
pixelsize = UHF_band_PIXELSIZE
nu_central = UHF_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = UHF_BAND_CW
raise ValueError('Invalid band name (S is not supported)!')
#I use the top frequency of the band as discussed in the memo
#PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(nu_central)
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(np.amax(nu_array))
#=== Fractional beam change ===
contours = [0.85,0.9,0.95,0.99,1.,1.01,1.05,1.1,1.2,1.3,1.4,1.5]
dPB_array, dx_array, PB_err = fractional_PB_cange_of_single_slice(
pixelsize = pixelsize,
Ndx = 99, #So it goes up to dx=0.1
fdx = 0.01,
N_image=((PB_FWHM * 1.6) / pixelsize) + 1) #So I plot up to the +/- 0.8
#of the PB FWHM
#parameters_string = r'Fractional PB cange for {0:s} OTFM at {1:0.1f} GHz with' \
# '$\Theta_B$={2:.1f}\' & $\Theta_s$={3:.1f}\" x slice'.format(
# telescope,nu_central,PB_FWHM / 60,pixelsize)
fname = './figures/{0:s}_fractional_beam_change_{1:s}.pdf'.format(
telescope, band)
plot_fractional_PB_change(dPB_array, dx_array, PB_err,
fname = fname,
ptitle = None,
#=== Frequency dependence ===
contours = [0.9,0.925,0.95,0.975,0.99,1.,1.01,1.05,1.1,1.2]
#Normalising with max freq
nu_ext = np.amax(nu_array)
nu_ext_PB_FWHM = get_MeerKAT_PB_FWHM_from_freq(nu_ext)
for dt in [2.,4.,6.,8.,10.,12.]:
#Get ds for the frequncy dependence plots
#Rising field parameters
dRA, dDec = get_smearing_from_altaz_subscan(t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = dt)
ds = np.sqrt(np.power(dRA,2) + np.power(dDec,2))
dPB_array, dnu_array, PB_err = fractional_PB_cange_with_frequency_of_single_slice(
pixelsize = pixelsize,
N_image = ((nu_ext_PB_FWHM * 1.6) / pixelsize) + 1,
nu_central = nu_central,
Nnu = N_chan,
dnu = chan_width,
dx = ds,
PB_scaling_model = telescope,
#parameters_string = r'{0:s} OTFM with ' \
#r'$\Delta$x={1:.1f}'"'"r' & $\nu$$_0$={2:.1f} GHz RA slice'.format(
# telescope, dx/60, nu_ext)
fname = './figures/\
int(dt), band)
plot_fractional_PB_change_with_frequency(dPB_array, dnu_array, PB_err,
fname = fname,
ptitle = '',
PB_FWHM = nu_ext_PB_FWHM,
cont_levels = contours,
inner_title = \
r"$\Delta$T = {0:d}s & $\Delta$s = {1:.2f}".format(
int(dt),ds / nu_ext_PB_FWHM) \
+ r" $\Theta_{B,\nu_{0}}$")
def print_PB_parameters_and_smearing_difference(band='L'):
"""Just print out various useful parameters for the discussion
telescope = 'MeerKAT'
#From the MeerKLass setup i.e Wang et al. 2021
if band == 'L':
pixelsize = L_band_PIXELSIZE
#nu_central = 1.284
nu_central = L_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = L_BAND_CW
elif band == 'UHF':
pixelsize = UHF_band_PIXELSIZE
nu_central = UHF_BAND_NU_CENTRAL
N_chan = N_CHAN
chan_width = UHF_BAND_CW
raise ValueError('Invalid band name (S is not supported)!')
print('Selected observing band: {0:s}'.format(band))
#Get PB values
nu_array = get_pixel_coordinate_array(N_chan,chan_width) + nu_central
print('The PB FWHM at min freq: {0:.2f} arcminutes'.format(
print('The PB FWHM at max freq: {0:.2f} arcminutes'.format(
#Compute the difference for the rising and setting skyes
for dt in [2.,4.,6.,8.,10.,12.]:
#Get ds for the frequncy dependence plots
#Rising field parameters
dRA, dDec = get_smearing_from_altaz_subscan(
t0 = Time('2019-02-24T19:48:28'),
alt0 = 42.,
az0 = 59.6,
slew_velocity = 5.,
slew_direction = -1,
dt = dt)
ds = np.sqrt(np.power(dRA,2) + np.power(dDec,2))
#Setting field parameters
dRA_setting, dDec_setting = get_smearing_from_altaz_subscan(
t0 = Time('2019-02-25T00:40:11'),
alt0 = 41.,
dt = dt)
ds_setting = np.sqrt(np.power(dRA_setting,2) + np.power(dDec_setting,2))
print('For dT={0:d}s the rising and setting smearings are {1:.2f} and \
{2:.2f} arcminutes, with a difference of {3:.2f} arcminutes'.format(
#=== MAIN ===
if __name__ == "__main__":
# Need to have a figures/ subdir
