Skip to content

Instantly share code, notes, and snippets.

@d3v-null
Last active August 2, 2022 09:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d3v-null/7552a6caac9cb8a87fb3f25dde406045 to your computer and use it in GitHub Desktop.
Save d3v-null/7552a6caac9cb8a87fb3f25dde406045 to your computer and use it in GitHub Desktop.
Fast UVFits reading code
import numpy as np
from astropy.io import fits
def make_fits_axis_array(hdu, axis):
count = hdu.header[f"NAXIS{axis}"]
crval = hdu.header[f"CRVAL{axis}"]
cdelt = hdu.header[f"CDELT{axis}"]
crpix = hdu.header[f"CRPIX{axis}"]
return cdelt * (np.arange(count) - crpix) + crval
class UVFits:
def __init__(self, uvfits_path: str):
self.uvfits_path = uvfits_path
with fits.open(self.uvfits_path) as hdus:
vis_hdu = hdus['PRIMARY']
# the uvfits baseline of each row in the timestep-baseline axis
self._uvfits_baseline_array = np.int64(vis_hdu.data["BASELINE"])
self.Nblts = len(self._uvfits_baseline_array)
assert self.Nblts == vis_hdu.header["GCOUNT"]
self.unique_baselines = np.sort(np.unique(self._uvfits_baseline_array))
self.Nbls = len(self.unique_baselines)
self._uvfits_time_array = np.float64(vis_hdu.data["DATE"])
self.unique_times = np.sort(np.unique(self._uvfits_time_array))
self.Ntimes = len(self.unique_times)
self.ant_2_array = self._uvfits_baseline_array % 256 - 1
self.ant_1_array = (self._uvfits_baseline_array
- self.ant_2_array) // 256 - 1
self.ant_pair_array = np.stack((self.ant_1_array, self.ant_2_array), axis=1)
self.unique_antpairs = np.sort(np.unique(self.ant_pair_array, axis=0))
assert len(self.unique_antpairs) == self.Nbls
self.freq_array = make_fits_axis_array(vis_hdu, 4)
self.Nfreqs = len(self.freq_array)
self.polarization_array = np.int32(make_fits_axis_array(vis_hdu, 3))
self.Npols = len(self.polarization_array)
self.uvw_array = np.array(np.stack((
vis_hdu.data['UU'],
vis_hdu.data['VV'],
vis_hdu.data['WW'],
)).T)
ant_hdu = hdus['AIPS AN']
self.ant_names = ant_hdu.data["ANNAME"]
self.antenna_numbers = ant_hdu.data.field("NOSTA") - 1
def debug(self):
print(
f"Ntimes={self.Ntimes}, Nbls={self.Nblts}, Nfreqs={self.Nfreqs}, Npols={self.Npols}")
def auto_antpairs(self):
return [(ap[0], ap[1]) for ap in self.unique_antpairs if ap[0] == ap[1]]
def blt_idxs_for_antpair(self, pair):
"""
return the indices into the baseline-time axis corresponding to the given antpair
"""
(ant1, ant2) = pair
return np.where(np.logical_and(
self.ant_1_array == ant1,
self.ant_2_array == ant2,
))[0]
def _data_for_antpairs(self, vis_hdu, pairs):
"""
dimensions: [time, bl, freq, pol]
"""
Npairs = len(pairs)
# sorted to traverse in the order on disk to minimize seeks
blt_idxs = np.sort(np.concatenate([
self.blt_idxs_for_antpair(pair) for pair in pairs
]))
reals = vis_hdu.data.data[blt_idxs, 0, 0, :, :, 0].reshape(
(self.Ntimes, Npairs, self.Nfreqs, self.Npols))
imags = vis_hdu.data.data[blt_idxs, 0, 0, :, :, 1].reshape(
(self.Ntimes, Npairs, self.Nfreqs, self.Npols))
return reals + 1j * imags
def data_for_antpairs(self, pairs):
"""
dimensions: [time, bl, freq, pol]
"""
with fits.open(self.uvfits_path) as hdus:
vis_hdu = hdus['PRIMARY']
result = self._data_for_antpairs(vis_hdu, pairs)
return result[:, 0, :, :]
def data_for_antpair(self, pair):
"""
dimensions: [time, freq, pol]
"""
with fits.open(self.uvfits_path) as hdus:
vis_hdu = hdus['PRIMARY']
result = self._data_for_antpairs(vis_hdu, [pair])
return result[:, 0, :, :]
def test(uvfits_path):
uvfits = UVFits(uvfits_path)
uvfits.debug()
auto_antpairs = uvfits.auto_antpairs()
print(auto_antpairs)
first_auto = uvfits.data_for_antpair(auto_antpairs[0])
print(first_auto.shape)
print(first_auto)
all_autos = uvfits.data_for_antpairs(auto_antpairs)
print(all_autos.shape)
print(all_autos)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment