Created
May 14, 2015 02:32
-
-
Save ars096/f58e578d28266477f9f9 to your computer and use it in GitHub Desktop.
NRO45-SAM45 van Vleck test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
import sam45tool | |
ddir = '../../../download/' | |
d1 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428155923.A1.txt') | |
d2 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428163101.A1.txt') | |
lev1 = sam45tool.loadtxt_panda(ddir+'20150428155923.txt') | |
lev2 = sam45tool.loadtxt_panda(ddir+'20150428163101.txt') | |
#dd1 = sam45tool.marge(d1, lev1) | |
# -- | |
d1mask = (d1.array =='A1') | |
l1mask = (lev1.array == 1) | |
mode = numpy.zeros(len(d1)) | |
mode[d1.mode=='R'] = 1 | |
mode[d1.mode=='SKY'] = 2 | |
mode[d1.mode=='OFF'] = 3 | |
mode[d1.mode=='ON'] = 4 | |
# -- | |
xmin = lev1.time[l1mask].min() | |
xmax = lev1.time[l1mask].max() | |
import pylab | |
import matplotlib.dates | |
xtick = matplotlib.dates.MinuteLocator() | |
xfmt = matplotlib.dates.DateFormatter('%H:%M') | |
fig = pylab.figure() | |
ax1 = fig.add_subplot(311) | |
ax2 = fig.add_subplot(312) | |
ax3 = fig.add_subplot(313) | |
ax1.plot(lev1.time[l1mask], lev1.sig[l1mask]**2) | |
ax1.set_xlim(xmin, xmax) | |
ax1.set_ylim(0, 8) | |
ax1.set_yscale('linear') | |
ax1.xaxis.set_major_locator(xtick) | |
ax1.xaxis.set_major_formatter(xfmt) | |
ax1.grid(True) | |
ax1.set_ylabel('AD Power') | |
ax2.plot(lev1.time[l1mask], lev1.sig[l1mask]**2) | |
ax2.set_yscale('linear') | |
ax2.set_xlim(xmin, xmax) | |
ax2.set_ylim(0.9, 1.) | |
ax2.xaxis.set_major_locator(xtick) | |
ax2.xaxis.set_major_formatter(xfmt) | |
ax2.grid(True) | |
ax2.set_ylabel('Zoom') | |
ax3.step(d1.time, mode, 'r.', where='post') | |
ax3.set_xlim(xmin, xmax) | |
ax3.set_ylim(-1, 6) | |
ax3.set_yticks([0, 1, 2, 3, 4]) | |
ax3.set_yticklabels(['ZERO', 'R', 'SKY', 'OFF', 'ON']) | |
ax3.xaxis.set_major_locator(xtick) | |
ax3.xaxis.set_major_formatter(xfmt) | |
ax3.grid(True) | |
fig.subplots_adjust(hspace=0) | |
fig.suptitle(xmin.strftime('%Y/%m/%d %H:%M:%S')) | |
fig.savefig('datacheck1.png') | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import numpy | |
import sam45tool | |
ddir = '../../../download/' | |
d1 = sam45tool.load_sldump(ddir+'SAM45.BET5ori.nstest2.pjtest.20150428155923.A1.txt') | |
lev1 = sam45tool.loadtxt_panda(ddir+'20150428155923.txt') | |
# -- | |
d1mask = (d1.array =='A1') | |
l1mask = (lev1.array == 1) | |
mode = numpy.zeros(len(d1)) | |
mode[d1.mode=='R'] = 1 | |
mode[d1.mode=='SKY'] = 2 | |
mode[d1.mode=='OFF'] = 3 | |
mode[d1.mode=='ON'] = 4 | |
# -- | |
ind = numpy.array([int(numpy.where(lev1.time[l1mask] == (_d.time + datetime.timedelta(seconds=2)))[0]) | |
for _d in d1]) | |
lev = lev1[l1mask][ind] | |
# -- | |
xmin = lev1.time[l1mask].min() | |
xmax = lev1.time[l1mask].max() | |
import pylab | |
import matplotlib.dates | |
xtick = matplotlib.dates.MinuteLocator() | |
xfmt = matplotlib.dates.DateFormatter('%H:%M') | |
fig = pylab.figure(figsize=(30, 8)) | |
ax1 = [fig.add_subplot(4, 20, i+1) for i in range(20)] | |
ax2 = [fig.add_subplot(4, 20, i+21) for i in range(20)] | |
ax3 = [fig.add_subplot(4, 20, i+41) for i in range(20)] | |
ax4 = [fig.add_subplot(4, 20, i+61) for i in range(20)] | |
Thot = 290.0 | |
for i in range(len(ax1)): | |
sp = d1[i].spectrum | |
csp = lev[i].a0 + lev[i].a1 * sp + lev[i].a2 * sp**2. | |
ax1[i].plot(lev[i].level) | |
ax2[i].plot(sp) | |
ax3[i].plot(csp) | |
if d1[i].mode == 'R': | |
dhot = sp | |
cdhot = csp | |
pass | |
if d1[i].mode == 'OFF': | |
dsky = sp | |
cdsky = csp | |
pass | |
if d1[i].mode == 'ON': | |
ta = (sp - dsky)/(dhot - dsky) * Thot | |
cta = (csp - cdsky)/(cdhot - cdsky) * Thot | |
ax4[i].plot(ta, 'b') | |
ax4[i].plot(cta, 'r') | |
pass | |
continue | |
[a.set_title(m) for a, m in zip(ax1, d1.mode)] | |
[a.set_yticklabels('') for a in ax1[1:]] | |
[a.set_ylim(0, 40) for a in ax1] | |
ax2[0].set_ylabel('Original') | |
[a.set_xticklabels('') for a in ax2] | |
[a.set_yticklabels('') for a in ax2[1:]] | |
[a.set_ylim(0, 160000) for a in ax2] | |
ax3[0].set_ylabel('Corrected') | |
[a.set_xticklabels('') for a in ax3] | |
[a.set_yticklabels('') for a in ax3[1:]] | |
[a.set_ylim(0, 160000) for a in ax3] | |
ax4[0].set_ylabel('B:Orig, R:Corr.') | |
[a.set_xticklabels('') for a in ax4] | |
[a.set_yticklabels('') for a in ax4[1:]] | |
[a.set_ylim(-10, 400) for a in ax4] | |
fig.subplots_adjust(wspace=0.05) | |
fig.suptitle(xmin.strftime('%Y/%m/%d %H:%M:%S')) | |
fig.savefig('datacheck_sp1.png') | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# File I/O | |
# ======== | |
def loadtxt_panda(path): | |
import datetime | |
import numpy | |
original_data = open(path).readlines() | |
att = 0 | |
array_no = 0 | |
timestamp = '00000000000000' | |
data = [] | |
for line in original_data: | |
if line.split()==[]: | |
continue | |
_d = line.split() | |
timestamp = datetime.datetime.strptime(_d[0], '%Y%m%d%H%M%S') | |
be = _d[1] | |
array = int(_d[2][1:]) | |
power = map(float, _d[3:]) | |
data.append((timestamp, array, power)) | |
continue | |
log_dtype = [('time', object), ('array', int), ('level', float, 8)] | |
log_data = numpy.rec.array(data, dtype=log_dtype) | |
dtype = log_dtype + [('mu', float), ('sig', float), | |
('a0', float), ('a1', float), ('a2', float)] | |
ret = [] | |
x = numpy.arange(-3.5, 3.6, 1) | |
correct_factors = load_correct_factors() | |
for d in log_data: | |
fp = gauss_fit(x, d.level) | |
mu = fp[0][0] | |
sig = fp[0][1] | |
a0, a1, a2 = correct_factors(sig) | |
ret.append(tuple(list(d) + [mu, sig, a0, a1, a2])) | |
continue | |
#return ret | |
return numpy.rec.array(ret, dtype=dtype) | |
def load_sldump(path): | |
import datetime | |
import numpy | |
dtype = [('time', object), ('array', str, 3), ('mode', str, 4), ('spectrum', float, 4096)] | |
d = numpy.loadtxt(path, dtype=dtype) | |
d['time'] = [datetime.datetime.strptime(_d, '%Y%m%d%H%M%S') for _d in d['time']] | |
d = [tuple(_d) for _d in d] | |
dd = numpy.rec.array(d, dtype=dtype) | |
return dd | |
def load_correct_factors(): | |
import pickle | |
path = 'correct3bit_offset_0.0.pickle' | |
with open(path, 'rb') as f: | |
c0, c1, c2 = pickle.load(f) | |
pass | |
def func(x): | |
return float(c0(x)), float(c1(x)), float(c2(x)) | |
return func | |
# Gaussian Fitting | |
# ================ | |
def gauss(p, x): | |
from numpy import sqrt, exp, pi | |
return 1./sqrt(2*pi*p[1]) * exp(-(x-p[0])**2./(2*p[1])) | |
def cumgauss(p, x): | |
from scipy.special import erf | |
from numpy import sqrt | |
return 1/2. * (1. + erf((x-p[0])/sqrt(2*p[1]))) | |
def gauss_fit(x, y, full_output=False): | |
import numpy | |
import scipy.optimize | |
def fitfunc(p, x, y): | |
return y - cumgauss(p, x) | |
dx = abs(x[1] - x[0]) | |
cumy = numpy.cumsum(y) / 100. | |
cumx = x + dx/2. | |
cumx[-1] = 30 | |
p0 = [0, 0.5] | |
result = scipy.optimize.leastsq(fitfunc, p0, args=(cumx, cumy), full_output=full_output) | |
return result | |
# ADC sim. | |
# ======== | |
def adc(data, bin_centers, sigma=1e-10): | |
import numpy | |
noise = numpy.random.normal(0, sigma, data.size) | |
thresholds = bin_centers[:-1] + (bin_centers[1:] - bin_centers[:-1])/2. | |
digitized = numpy.digitize(data+noise, thresholds) | |
return bin_centers[digitized] | |
def powerspectrum(data): | |
import numpy | |
fft = numpy.fft.fft(data)[:data.size/2] | |
powerspectrum = numpy.real(fft * numpy.conj(fft)) | |
return powerspectrum | |
def generate_noise(sigma=2, mu=0, size=2048): | |
import numpy | |
noise = numpy.random.normal(mu, sigma, size) | |
return noise | |
def create_bandpassfilter(size=1024, skirt_width=0.3, base=1, params=[[0.8, 0.06], [1, 0.16], [3, 0.04], [6, 0.02]]): | |
import numpy | |
from numpy import sin, exp, pi | |
from scipy.special import erf | |
skirt = size * skirt_width | |
x = 2 * pi * numpy.arange(size) / size | |
curve = numpy.zeros(size) + base | |
for omega, scale in params: curve += scale * numpy.sin(x*omega) | |
curve[:skirt] = (1+erf(numpy.linspace(-2, 4, skirt)))/2. * curve[:skirt] | |
curve[-skirt:] = (1+erf(numpy.linspace(4, -2, skirt)))/2. * curve[-skirt:] | |
#curve[:skirt] = numpy.linspace(0, 1, skirt) * curve[:skirt] | |
#curve[-skirt:] = numpy.linspace(1, 0, skirt) * curve[-skirt:] | |
return curve | |
def create_lowpassfilter(size=1024, tau=0.5): | |
import numpy | |
f = numpy.linspace(0, 1, size) | |
curve = 1. / (1. + (f*tau)**2)**0.5 | |
return curve | |
def apply_filter(data, filter): | |
import numpy | |
d = data.reshape([-1, filter.size*2]) | |
sqfilter = filter**0.5 | |
spectrum = numpy.fft.fft(d, axis=0) | |
filtered_spectrum = spectrum * numpy.concatenate([sqfilter, sqfilter[::-1]]) | |
filtered_data = numpy.real(numpy.fft.ifft(filtered_spectrum, axis=0)) | |
return filtered_data.ravel(), filtered_spectrum | |
def powerspectrum(data, unit=2048): | |
import numpy | |
d = data.reshape([-1, unit]) | |
spectrum = numpy.fft.fft(d, axis=0) | |
return spectrum * spectrum.conj() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment