Skip to content

Instantly share code, notes, and snippets.

@astropenguin
Last active December 9, 2022 11:08
Show Gist options
  • Save astropenguin/4ba0e032cb02cd8549393b2ebb10f8f0 to your computer and use it in GitHub Desktop.
Save astropenguin/4ba0e032cb02cd8549393b2ebb10f8f0 to your computer and use it in GitHub Desktop.
Pipeline and ancillary module for data reduction of NRO45m/ASTE position-switching data (*nqm) with CASA (<= 5.0.0)
# coding: utf-8
import sdpl
import numpy as np
import matplotlib.pyplot as plt
"""CASA pipeline for NRO45m/SAM45 PSW data.
This script uses the ATNF Spectral Analysis Package (ASAP),
moduled as `sd` in the Common Astronomy Software Packages (CASA).
So user must execute the script on a CASA prompt:
CASA <2>: execfile('pipeline.py')
Array IDs and corresponding IF numbers are:
+ TZ1V: 0, 1
+ TZ1H: 2, 3
+ TZ2V: 4, 5
+ TZ2H: 6, 7
"""
# data selection: put paths of nqm files
nqmnames = [
"path-to-nqm-01.nqm",
"path-to-nqm-02.nqm",
"path-to-nqm-03.nqm",
]
# data selection: select IF numbers to use (see above)
ifs = [0, 2, 4, 6]
# baseline fit: put number of edge channels to be masked
edgech = 500
# baseline fit: put frequency range (GHz) of line emission to be ignored
linerange = [110.0, 111.0]
# baseline fit: put maximum order of polynomial baseline fit.
polyorder = 1
# baseline fit: put S/N threshold of sinusoid baseline fit.
fftthresh = "7sigma" # or None if you do not apply it.
# data flagging: Maximum acceptable wiggle relative to noise level.
wiggle_limit = 1.1
# data flagging: Maximu acceptable wind speed (m/s).
wind_limit = 5.0
# spectral maiking: Number of channels to be binned.
nbin = 64
# applying baseline fit and flagging to each nqm data
list_scan = []
list_tsys = []
list_tint = []
n_used = 0
n_flagged = 0
for nqmname in nqmnames:
print("Now loading: {}".format(nqmname))
st = sdpl.load_nqm(nqmname, ifs)
mask = sdpl.create_mask(st, edgech, linerange)
stbl = sdpl.apply_blfit(st, mask, polyorder, fftthresh)
flag = sdpl.create_flag(stbl, mask, wiggle_limit, wind_limit)
output = sdpl.apply_flag(stbl, flag)
list_scan.append(output["scan"])
list_tsys.append(output["tsys"])
list_tint.append(output["tint"])
n_used += float(output["n_used"])
n_flagged += float(output["n_flagged"])
freq = output["freq"]
scan = np.concatenate(list_scan)
tsys = np.concatenate(list_tsys)
tint = np.concatenate(list_tint)
# create integrated spectrum
spec = sdpl.create_spectrum(scan, tsys, nbin)
noise = sdpl.create_noisespectrum(scan, nbin)
# print results
rms = np.median(noise)
sn_max = np.max(spec / noise)
on_src = np.sum(tint) / len(ifs)
print("Achieved noise level: {:.2f} mK".format(1e3 * rms))
print("Maximum S/N of line: {:.1f} sigma".format(sn_max))
print("Total on-source time: {:.2f} hr".format(on_src / 3600))
print("Data usage rate: {:.1%}".format(n_used / (n_used + n_flagged)))
# plotting
plt.figure(figsize=(12, 4))
plt.plot(freq, spec)
plt.fill_between(freq, noise, -noise, alpha=0.5)
plt.xlim([freq.min() + 0.25, freq.max() - 0.25])
plt.ylim([-2, 5])
plt.xlabel("Observed frequency (GHz)")
plt.ylabel("Ta* (K)")
# coding: utf-8
"""Module for CASA Single Dish Pipeline."""
# standard library
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
# dependent packages
import numpy as np
try:
import asap as sd
sd.rcParams["insitu"] = False
except ImportError:
message = "CASA<=5.0.0 is required to use this module"
raise ImportError(message)
# module constants for SAM45
N_CH = 4096
BANDWIDTH = 2e9
# functions for user
def load_nqm(nqmname, ifs=None):
"""Load nqm file and return scantable containing selected IFs.
Args:
nqmname (str): Name of NRO45m/SAM45 SD data (*.nqm).
ifs (list of int, optional): IF numbers to be selected.
Returns:
scantable (asap.scantable.scantable): Scantable containing selected IFs.
"""
if ifs is None:
ifs = []
st = sd.scantable(nqmname, average=False)
st.set_selection(ifs=ifs)
return st
def create_mask(st, edgech=500, linerange=None, lineunit="GHz"):
"""Create channel mask not to be used for baseline fit."""
mask = np.zeros(N_CH, bool)
if edgech > 1:
with use_unit(st, "channel"):
regions = [0, edgech], [N_CH - edgech, N_CH]
mask += np.array(st.create_mask(*regions))
if linerange is not None:
with use_unit(st, lineunit):
mask += np.array(st.create_mask(linerange))
return mask
def apply_blfit(st, mask, polyorder=1, fftthresh=None):
"""Apply baseline fit to scantable with given mask.
Args:
scantable (asap.scantable.scantable): Input scantable.
mask (numpy.ndarray): Mask array created by `create_mask`.
fftthresh: S/N threshold of
"""
st = st.auto_poly_baseline(mask=~mask, order=polyorder)
if fftthresh is not None:
st = st.auto_sinusoid_baseline(mask=~mask, fftthresh=fftthresh)
return st
def create_flag(st, mask, wiggle_limit=1.0, wind_limit=5.0):
tsys = np.array(st.get_tsys())
tint = np.array(st.get_inttime())
rms = np.array(st.stats(stat="rms", mask=list(~mask)))
noi = np.sqrt(2) * tsys / np.sqrt(tint * BANDWIDTH / N_CH)
wigl = rms / noi
wind = np.array([w["windspeed"] for w in st.get_weather()])
flag = (wigl > wiggle_limit) | (wind > wind_limit)
return flag
def apply_flag(st, flag):
rows = np.where(~flag)[0]
n_used = np.sum(~flag)
n_flagged = len(flag) - n_used
with use_unit(st, "GHz"):
freq = np.array(st.get_abcissa()[0])
tsys = np.array(st.get_tsys())[rows]
tint = np.array(st.get_inttime())[rows]
scan = np.array([st.get_spectrum(r) for r in rows])
return {
"freq": freq,
"tsys": tsys,
"tint": tint,
"scan": scan,
"n_used": n_used,
"n_flagged": n_flagged,
}
def create_spectrum(scan, tsys, nbin=64):
spec = np.average(scan, 0, weights=tsys**-2)
spec = np.repeat(np.mean(spec.reshape((N_CH / nbin, nbin)), 1), repeats=nbin)
return spec
def create_noisespectrum(scan, nbin=64):
spec = np.std(scan, axis=0) / np.sqrt(len(scan) * nbin)
spec = np.repeat(np.mean(spec.reshape((N_CH / nbin, nbin)), 1), repeats=nbin)
return spec
# functions for internal
@contextmanager
def use_unit(st, unit):
oldunit = st.get_unit()
try:
st.set_unit(unit)
yield
finally:
st.set_unit(oldunit)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment