Last active
August 2, 2022 09:31
-
-
Save d3v-null/7552a6caac9cb8a87fb3f25dde406045 to your computer and use it in GitHub Desktop.
Fast UVFits reading code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
from astropy.io import fits | |
def make_fits_axis_array(hdu, axis): | |
count = hdu.header[f"NAXIS{axis}"] | |
crval = hdu.header[f"CRVAL{axis}"] | |
cdelt = hdu.header[f"CDELT{axis}"] | |
crpix = hdu.header[f"CRPIX{axis}"] | |
return cdelt * (np.arange(count) - crpix) + crval | |
class UVFits: | |
def __init__(self, uvfits_path: str): | |
self.uvfits_path = uvfits_path | |
with fits.open(self.uvfits_path) as hdus: | |
vis_hdu = hdus['PRIMARY'] | |
# the uvfits baseline of each row in the timestep-baseline axis | |
self._uvfits_baseline_array = np.int64(vis_hdu.data["BASELINE"]) | |
self.Nblts = len(self._uvfits_baseline_array) | |
assert self.Nblts == vis_hdu.header["GCOUNT"] | |
self.unique_baselines = np.sort(np.unique(self._uvfits_baseline_array)) | |
self.Nbls = len(self.unique_baselines) | |
self._uvfits_time_array = np.float64(vis_hdu.data["DATE"]) | |
self.unique_times = np.sort(np.unique(self._uvfits_time_array)) | |
self.Ntimes = len(self.unique_times) | |
self.ant_2_array = self._uvfits_baseline_array % 256 - 1 | |
self.ant_1_array = (self._uvfits_baseline_array | |
- self.ant_2_array) // 256 - 1 | |
self.ant_pair_array = np.stack((self.ant_1_array, self.ant_2_array), axis=1) | |
self.unique_antpairs = np.sort(np.unique(self.ant_pair_array, axis=0)) | |
assert len(self.unique_antpairs) == self.Nbls | |
self.freq_array = make_fits_axis_array(vis_hdu, 4) | |
self.Nfreqs = len(self.freq_array) | |
self.polarization_array = np.int32(make_fits_axis_array(vis_hdu, 3)) | |
self.Npols = len(self.polarization_array) | |
self.uvw_array = np.array(np.stack(( | |
vis_hdu.data['UU'], | |
vis_hdu.data['VV'], | |
vis_hdu.data['WW'], | |
)).T) | |
ant_hdu = hdus['AIPS AN'] | |
self.ant_names = ant_hdu.data["ANNAME"] | |
self.antenna_numbers = ant_hdu.data.field("NOSTA") - 1 | |
def debug(self): | |
print( | |
f"Ntimes={self.Ntimes}, Nbls={self.Nblts}, Nfreqs={self.Nfreqs}, Npols={self.Npols}") | |
def auto_antpairs(self): | |
return [(ap[0], ap[1]) for ap in self.unique_antpairs if ap[0] == ap[1]] | |
def blt_idxs_for_antpair(self, pair): | |
""" | |
return the indices into the baseline-time axis corresponding to the given antpair | |
""" | |
(ant1, ant2) = pair | |
return np.where(np.logical_and( | |
self.ant_1_array == ant1, | |
self.ant_2_array == ant2, | |
))[0] | |
def _data_for_antpairs(self, vis_hdu, pairs): | |
""" | |
dimensions: [time, bl, freq, pol] | |
""" | |
Npairs = len(pairs) | |
# sorted to traverse in the order on disk to minimize seeks | |
blt_idxs = np.sort(np.concatenate([ | |
self.blt_idxs_for_antpair(pair) for pair in pairs | |
])) | |
reals = vis_hdu.data.data[blt_idxs, 0, 0, :, :, 0].reshape( | |
(self.Ntimes, Npairs, self.Nfreqs, self.Npols)) | |
imags = vis_hdu.data.data[blt_idxs, 0, 0, :, :, 1].reshape( | |
(self.Ntimes, Npairs, self.Nfreqs, self.Npols)) | |
return reals + 1j * imags | |
def data_for_antpairs(self, pairs): | |
""" | |
dimensions: [time, bl, freq, pol] | |
""" | |
with fits.open(self.uvfits_path) as hdus: | |
vis_hdu = hdus['PRIMARY'] | |
result = self._data_for_antpairs(vis_hdu, pairs) | |
return result[:, 0, :, :] | |
def data_for_antpair(self, pair): | |
""" | |
dimensions: [time, freq, pol] | |
""" | |
with fits.open(self.uvfits_path) as hdus: | |
vis_hdu = hdus['PRIMARY'] | |
result = self._data_for_antpairs(vis_hdu, [pair]) | |
return result[:, 0, :, :] | |
def test(uvfits_path): | |
uvfits = UVFits(uvfits_path) | |
uvfits.debug() | |
auto_antpairs = uvfits.auto_antpairs() | |
print(auto_antpairs) | |
first_auto = uvfits.data_for_antpair(auto_antpairs[0]) | |
print(first_auto.shape) | |
print(first_auto) | |
all_autos = uvfits.data_for_antpairs(auto_antpairs) | |
print(all_autos.shape) | |
print(all_autos) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment