Skip to content

Instantly share code, notes, and snippets.

@ars096
Created May 14, 2015 02:32
Show Gist options
  • Save ars096/f58e578d28266477f9f9 to your computer and use it in GitHub Desktop.
Save ars096/f58e578d28266477f9f9 to your computer and use it in GitHub Desktop.
NRO45-SAM45 van Vleck test
import numpy
import sam45tool
ddir = '../../../download/'
d1 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428155923.A1.txt')
d2 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428163101.A1.txt')
lev1 = sam45tool.loadtxt_panda(ddir+'20150428155923.txt')
lev2 = sam45tool.loadtxt_panda(ddir+'20150428163101.txt')
#dd1 = sam45tool.marge(d1, lev1)
# --
d1mask = (d1.array =='A1')
l1mask = (lev1.array == 1)
mode = numpy.zeros(len(d1))
mode[d1.mode=='R'] = 1
mode[d1.mode=='SKY'] = 2
mode[d1.mode=='OFF'] = 3
mode[d1.mode=='ON'] = 4
# --
xmin = lev1.time[l1mask].min()
xmax = lev1.time[l1mask].max()
import pylab
import matplotlib.dates
xtick = matplotlib.dates.MinuteLocator()
xfmt = matplotlib.dates.DateFormatter('%H:%M')
fig = pylab.figure()
ax1 = fig.add_subplot(311)
ax2 = fig.add_subplot(312)
ax3 = fig.add_subplot(313)
ax1.plot(lev1.time[l1mask], lev1.sig[l1mask]**2)
ax1.set_xlim(xmin, xmax)
ax1.set_ylim(0, 8)
ax1.set_yscale('linear')
ax1.xaxis.set_major_locator(xtick)
ax1.xaxis.set_major_formatter(xfmt)
ax1.grid(True)
ax1.set_ylabel('AD Power')
ax2.plot(lev1.time[l1mask], lev1.sig[l1mask]**2)
ax2.set_yscale('linear')
ax2.set_xlim(xmin, xmax)
ax2.set_ylim(0.9, 1.)
ax2.xaxis.set_major_locator(xtick)
ax2.xaxis.set_major_formatter(xfmt)
ax2.grid(True)
ax2.set_ylabel('Zoom')
ax3.step(d1.time, mode, 'r.', where='post')
ax3.set_xlim(xmin, xmax)
ax3.set_ylim(-1, 6)
ax3.set_yticks([0, 1, 2, 3, 4])
ax3.set_yticklabels(['ZERO', 'R', 'SKY', 'OFF', 'ON'])
ax3.xaxis.set_major_locator(xtick)
ax3.xaxis.set_major_formatter(xfmt)
ax3.grid(True)
fig.subplots_adjust(hspace=0)
fig.suptitle(xmin.strftime('%Y/%m/%d %H:%M:%S'))
fig.savefig('datacheck1.png')
import datetime
import numpy
import sam45tool
ddir = '../../../download/'
d1 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428155923.A1.txt')
lev1 = sam45tool.loadtxt_panda(ddir+'20150428155923.txt')
# --
d1mask = (d1.array =='A1')
l1mask = (lev1.array == 1)
mode = numpy.zeros(len(d1))
mode[d1.mode=='R'] = 1
mode[d1.mode=='SKY'] = 2
mode[d1.mode=='OFF'] = 3
mode[d1.mode=='ON'] = 4
# --
ind = numpy.array([int(numpy.where(lev1.time[l1mask] == (_d.time + datetime.timedelta(seconds=2)))[0])
for _d in d1])
lev = lev1[l1mask][ind]
# --
xmin = lev1.time[l1mask].min()
xmax = lev1.time[l1mask].max()
import pylab
import matplotlib.dates
xtick = matplotlib.dates.MinuteLocator()
xfmt = matplotlib.dates.DateFormatter('%H:%M')
fig = pylab.figure(figsize=(30, 8))
ax1 = [fig.add_subplot(4, 20, i+1) for i in range(20)]
ax2 = [fig.add_subplot(4, 20, i+21) for i in range(20)]
ax3 = [fig.add_subplot(4, 20, i+41) for i in range(20)]
ax4 = [fig.add_subplot(4, 20, i+61) for i in range(20)]
Thot = 290.0
for i in range(len(ax1)):
sp = d1[i].spectrum
csp = lev[i].a0 + lev[i].a1 * sp + lev[i].a2 * sp**2.
ax1[i].plot(lev[i].level)
ax2[i].plot(sp)
ax3[i].plot(csp)
if d1[i].mode == 'R':
dhot = sp
cdhot = csp
pass
if d1[i].mode == 'OFF':
dsky = sp
cdsky = csp
pass
if d1[i].mode == 'ON':
ta = (sp - dsky)/(dhot - dsky) * Thot
cta = (csp - cdsky)/(cdhot - cdsky) * Thot
ax4[i].plot(ta, 'b')
ax4[i].plot(cta, 'r')
pass
continue
[a.set_title(m) for a, m in zip(ax1, d1.mode)]
[a.set_yticklabels('') for a in ax1[1:]]
[a.set_ylim(0, 40) for a in ax1]
ax2[0].set_ylabel('Original')
[a.set_xticklabels('') for a in ax2]
[a.set_yticklabels('') for a in ax2[1:]]
[a.set_ylim(0, 160000) for a in ax2]
ax3[0].set_ylabel('Corrected')
[a.set_xticklabels('') for a in ax3]
[a.set_yticklabels('') for a in ax3[1:]]
[a.set_ylim(0, 160000) for a in ax3]
ax4[0].set_ylabel('B:Orig, R:Corr.')
[a.set_xticklabels('') for a in ax4]
[a.set_yticklabels('') for a in ax4[1:]]
[a.set_ylim(-10, 400) for a in ax4]
fig.subplots_adjust(wspace=0.05)
fig.suptitle(xmin.strftime('%Y/%m/%d %H:%M:%S'))
fig.savefig('datacheck_sp1.png')
# File I/O
# ========
def loadtxt_panda(path):
import datetime
import numpy
original_data = open(path).readlines()
att = 0
array_no = 0
timestamp = '00000000000000'
data = []
for line in original_data:
if line.split()==[]:
continue
_d = line.split()
timestamp = datetime.datetime.strptime(_d[0], '%Y%m%d%H%M%S')
be = _d[1]
array = int(_d[2][1:])
power = map(float, _d[3:])
data.append((timestamp, array, power))
continue
log_dtype = [('time', object), ('array', int), ('level', float, 8)]
log_data = numpy.rec.array(data, dtype=log_dtype)
dtype = log_dtype + [('mu', float), ('sig', float),
('a0', float), ('a1', float), ('a2', float)]
ret = []
x = numpy.arange(-3.5, 3.6, 1)
correct_factors = load_correct_factors()
for d in log_data:
fp = gauss_fit(x, d.level)
mu = fp[0][0]
sig = fp[0][1]
a0, a1, a2 = correct_factors(sig)
ret.append(tuple(list(d) + [mu, sig, a0, a1, a2]))
continue
#return ret
return numpy.rec.array(ret, dtype=dtype)
def load_sldump(path):
import datetime
import numpy
dtype = [('time', object), ('array', str, 3), ('mode', str, 4), ('spectrum', float, 4096)]
d = numpy.loadtxt(path, dtype=dtype)
d['time'] = [datetime.datetime.strptime(_d, '%Y%m%d%H%M%S') for _d in d['time']]
d = [tuple(_d) for _d in d]
dd = numpy.rec.array(d, dtype=dtype)
return dd
def load_correct_factors():
import pickle
path = 'correct3bit_offset_0.0.pickle'
with open(path, 'rb') as f:
c0, c1, c2 = pickle.load(f)
pass
def func(x):
return float(c0(x)), float(c1(x)), float(c2(x))
return func
# Gaussian Fitting
# ================
def gauss(p, x):
from numpy import sqrt, exp, pi
return 1./sqrt(2*pi*p[1]) * exp(-(x-p[0])**2./(2*p[1]))
def cumgauss(p, x):
from scipy.special import erf
from numpy import sqrt
return 1/2. * (1. + erf((x-p[0])/sqrt(2*p[1])))
def gauss_fit(x, y, full_output=False):
import numpy
import scipy.optimize
def fitfunc(p, x, y):
return y - cumgauss(p, x)
dx = abs(x[1] - x[0])
cumy = numpy.cumsum(y) / 100.
cumx = x + dx/2.
cumx[-1] = 30
p0 = [0, 0.5]
result = scipy.optimize.leastsq(fitfunc, p0, args=(cumx, cumy), full_output=full_output)
return result
# ADC sim.
# ========
def adc(data, bin_centers, sigma=1e-10):
import numpy
noise = numpy.random.normal(0, sigma, data.size)
thresholds = bin_centers[:-1] + (bin_centers[1:] - bin_centers[:-1])/2.
digitized = numpy.digitize(data+noise, thresholds)
return bin_centers[digitized]
def powerspectrum(data):
import numpy
fft = numpy.fft.fft(data)[:data.size/2]
powerspectrum = numpy.real(fft * numpy.conj(fft))
return powerspectrum
def generate_noise(sigma=2, mu=0, size=2048):
import numpy
noise = numpy.random.normal(mu, sigma, size)
return noise
def create_bandpassfilter(size=1024, skirt_width=0.3, base=1, params=[[0.8, 0.06], [1, 0.16], [3, 0.04], [6, 0.02]]):
import numpy
from numpy import sin, exp, pi
from scipy.special import erf
skirt = size * skirt_width
x = 2 * pi * numpy.arange(size) / size
curve = numpy.zeros(size) + base
for omega, scale in params: curve += scale * numpy.sin(x*omega)
curve[:skirt] = (1+erf(numpy.linspace(-2, 4, skirt)))/2. * curve[:skirt]
curve[-skirt:] = (1+erf(numpy.linspace(4, -2, skirt)))/2. * curve[-skirt:]
#curve[:skirt] = numpy.linspace(0, 1, skirt) * curve[:skirt]
#curve[-skirt:] = numpy.linspace(1, 0, skirt) * curve[-skirt:]
return curve
def create_lowpassfilter(size=1024, tau=0.5):
import numpy
f = numpy.linspace(0, 1, size)
curve = 1. / (1. + (f*tau)**2)**0.5
return curve
def apply_filter(data, filter):
import numpy
d = data.reshape([-1, filter.size*2])
sqfilter = filter**0.5
spectrum = numpy.fft.fft(d, axis=0)
filtered_spectrum = spectrum * numpy.concatenate([sqfilter, sqfilter[::-1]])
filtered_data = numpy.real(numpy.fft.ifft(filtered_spectrum, axis=0))
return filtered_data.ravel(), filtered_spectrum
def powerspectrum(data, unit=2048):
import numpy
d = data.reshape([-1, unit])
spectrum = numpy.fft.fft(d, axis=0)
return spectrum * spectrum.conj()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment