Last active
December 9, 2022 11:08
-
-
Save astropenguin/4ba0e032cb02cd8549393b2ebb10f8f0 to your computer and use it in GitHub Desktop.
Pipeline and ancillary module for data reduction of NRO45m/ASTE position-switching data (*nqm) with CASA (<= 5.0.0)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
import sdpl | |
import numpy as np | |
import matplotlib.pyplot as plt | |
"""CASA pipeline for NRO45m/SAM45 PSW data. | |
This script uses the ATNF Spectral Analysis Package (ASAP), | |
moduled as `sd` in the Common Astronomy Software Packages (CASA). | |
So user must execute the script on a CASA prompt: | |
CASA <2>: execfile('pipeline.py') | |
Array IDs and corresponding IF numbers are: | |
+ TZ1V: 0, 1 | |
+ TZ1H: 2, 3 | |
+ TZ2V: 4, 5 | |
+ TZ2H: 6, 7 | |
""" | |
# data selection: put paths of nqm files | |
nqmnames = [ | |
"path-to-nqm-01.nqm", | |
"path-to-nqm-02.nqm", | |
"path-to-nqm-03.nqm", | |
] | |
# data selection: select IF numbers to use (see above) | |
ifs = [0, 2, 4, 6] | |
# baseline fit: put number of edge channels to be masked | |
edgech = 500 | |
# baseline fit: put frequency range (GHz) of line emission to be ignored | |
linerange = [110.0, 111.0] | |
# baseline fit: put maximum order of polynomial baseline fit. | |
polyorder = 1 | |
# baseline fit: put S/N threshold of sinusoid baseline fit. | |
fftthresh = "7sigma" # or None if you do not apply it. | |
# data flagging: Maximum acceptable wiggle relative to noise level. | |
wiggle_limit = 1.1 | |
# data flagging: Maximu acceptable wind speed (m/s). | |
wind_limit = 5.0 | |
# spectral maiking: Number of channels to be binned. | |
nbin = 64 | |
# applying baseline fit and flagging to each nqm data | |
list_scan = [] | |
list_tsys = [] | |
list_tint = [] | |
n_used = 0 | |
n_flagged = 0 | |
for nqmname in nqmnames: | |
print("Now loading: {}".format(nqmname)) | |
st = sdpl.load_nqm(nqmname, ifs) | |
mask = sdpl.create_mask(st, edgech, linerange) | |
stbl = sdpl.apply_blfit(st, mask, polyorder, fftthresh) | |
flag = sdpl.create_flag(stbl, mask, wiggle_limit, wind_limit) | |
output = sdpl.apply_flag(stbl, flag) | |
list_scan.append(output["scan"]) | |
list_tsys.append(output["tsys"]) | |
list_tint.append(output["tint"]) | |
n_used += float(output["n_used"]) | |
n_flagged += float(output["n_flagged"]) | |
freq = output["freq"] | |
scan = np.concatenate(list_scan) | |
tsys = np.concatenate(list_tsys) | |
tint = np.concatenate(list_tint) | |
# create integrated spectrum | |
spec = sdpl.create_spectrum(scan, tsys, nbin) | |
noise = sdpl.create_noisespectrum(scan, nbin) | |
# print results | |
rms = np.median(noise) | |
sn_max = np.max(spec / noise) | |
on_src = np.sum(tint) / len(ifs) | |
print("Achieved noise level: {:.2f} mK".format(1e3 * rms)) | |
print("Maximum S/N of line: {:.1f} sigma".format(sn_max)) | |
print("Total on-source time: {:.2f} hr".format(on_src / 3600)) | |
print("Data usage rate: {:.1%}".format(n_used / (n_used + n_flagged))) | |
# plotting | |
plt.figure(figsize=(12, 4)) | |
plt.plot(freq, spec) | |
plt.fill_between(freq, noise, -noise, alpha=0.5) | |
plt.xlim([freq.min() + 0.25, freq.max() - 0.25]) | |
plt.ylim([-2, 5]) | |
plt.xlabel("Observed frequency (GHz)") | |
plt.ylabel("Ta* (K)") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
"""Module for CASA Single Dish Pipeline.""" | |
# standard library | |
import logging | |
from contextlib import contextmanager | |
logger = logging.getLogger(__name__) | |
# dependent packages | |
import numpy as np | |
try: | |
import asap as sd | |
sd.rcParams["insitu"] = False | |
except ImportError: | |
message = "CASA<=5.0.0 is required to use this module" | |
raise ImportError(message) | |
# module constants for SAM45 | |
N_CH = 4096 | |
BANDWIDTH = 2e9 | |
# functions for user | |
def load_nqm(nqmname, ifs=None): | |
"""Load nqm file and return scantable containing selected IFs. | |
Args: | |
nqmname (str): Name of NRO45m/SAM45 SD data (*.nqm). | |
ifs (list of int, optional): IF numbers to be selected. | |
Returns: | |
scantable (asap.scantable.scantable): Scantable containing selected IFs. | |
""" | |
if ifs is None: | |
ifs = [] | |
st = sd.scantable(nqmname, average=False) | |
st.set_selection(ifs=ifs) | |
return st | |
def create_mask(st, edgech=500, linerange=None, lineunit="GHz"): | |
"""Create channel mask not to be used for baseline fit.""" | |
mask = np.zeros(N_CH, bool) | |
if edgech > 1: | |
with use_unit(st, "channel"): | |
regions = [0, edgech], [N_CH - edgech, N_CH] | |
mask += np.array(st.create_mask(*regions)) | |
if linerange is not None: | |
with use_unit(st, lineunit): | |
mask += np.array(st.create_mask(linerange)) | |
return mask | |
def apply_blfit(st, mask, polyorder=1, fftthresh=None): | |
"""Apply baseline fit to scantable with given mask. | |
Args: | |
scantable (asap.scantable.scantable): Input scantable. | |
mask (numpy.ndarray): Mask array created by `create_mask`. | |
fftthresh: S/N threshold of | |
""" | |
st = st.auto_poly_baseline(mask=~mask, order=polyorder) | |
if fftthresh is not None: | |
st = st.auto_sinusoid_baseline(mask=~mask, fftthresh=fftthresh) | |
return st | |
def create_flag(st, mask, wiggle_limit=1.0, wind_limit=5.0): | |
tsys = np.array(st.get_tsys()) | |
tint = np.array(st.get_inttime()) | |
rms = np.array(st.stats(stat="rms", mask=list(~mask))) | |
noi = np.sqrt(2) * tsys / np.sqrt(tint * BANDWIDTH / N_CH) | |
wigl = rms / noi | |
wind = np.array([w["windspeed"] for w in st.get_weather()]) | |
flag = (wigl > wiggle_limit) | (wind > wind_limit) | |
return flag | |
def apply_flag(st, flag): | |
rows = np.where(~flag)[0] | |
n_used = np.sum(~flag) | |
n_flagged = len(flag) - n_used | |
with use_unit(st, "GHz"): | |
freq = np.array(st.get_abcissa()[0]) | |
tsys = np.array(st.get_tsys())[rows] | |
tint = np.array(st.get_inttime())[rows] | |
scan = np.array([st.get_spectrum(r) for r in rows]) | |
return { | |
"freq": freq, | |
"tsys": tsys, | |
"tint": tint, | |
"scan": scan, | |
"n_used": n_used, | |
"n_flagged": n_flagged, | |
} | |
def create_spectrum(scan, tsys, nbin=64): | |
spec = np.average(scan, 0, weights=tsys**-2) | |
spec = np.repeat(np.mean(spec.reshape((N_CH / nbin, nbin)), 1), repeats=nbin) | |
return spec | |
def create_noisespectrum(scan, nbin=64): | |
spec = np.std(scan, axis=0) / np.sqrt(len(scan) * nbin) | |
spec = np.repeat(np.mean(spec.reshape((N_CH / nbin, nbin)), 1), repeats=nbin) | |
return spec | |
# functions for internal | |
@contextmanager | |
def use_unit(st, unit): | |
oldunit = st.get_unit() | |
try: | |
st.set_unit(unit) | |
yield | |
finally: | |
st.set_unit(oldunit) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment