Skip to content

Instantly share code, notes, and snippets.

@pkgw
Last active December 25, 2015 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pkgw/fdf4c48573a0d2631c04 to your computer and use it in GitHub Desktop.
Save pkgw/fdf4c48573a0d2631c04 to your computer and use it in GitHub Desktop.
Simple pwkit CLEAN implementation, never really used so who knows if it works
# Copyright 2015 Peter Williams
# Licensed under the MIT License.
import numpy as np
from scipy.signal import lombscargle
from pwkit import lsqmdl
twopi = 2 * np.pi
ncoarse = 512
def sin_model (pars, fx):
a, ph = pars
return a * np.sin (fx + ph)
class LSClean (object):
nfine = 256
gain = 0.2
def __init__ (self, x, y, u):
self.x = np.asarray (x)
self.orig_y = np.asarray (y)
self.orig_y -= self.orig_y.mean () # would disappear on first iteration
self.cur_y = self.orig_y.copy ()
self.invu = 1. / np.asarray (u) # this should evolve as we fit, I think?
self.components = []
self.fmax = twopi / (0.5 * (self.x.max () - self.x.min ()))
self.fmin = twopi / (2 * np.abs (self.x[1:] - self.x[:-1]).min ())
self.coarse_freqs = np.logspace (np.log10 (self.fmin),
np.log10 (self.fmax),
ncoarse)
self.model = lsqmdl.Model (None, self.cur_y, self.invu)
def iterate (self, n):
for _ in xrange (n):
ls = lombscargle (self.x, self.cur_y, self.coarse_freqs)
fcoarse = self.coarse_freqs[ls.argmax ()]
fine_freqs = np.linspace (0.8 * fcoarse, 1.2 * fcoarse, self.nfine)
ls = lombscargle (self.x, self.cur_y, fine_freqs)
imax = ls.argmax ()
ffine = fine_freqs[imax]
afine = ls[imax]
fx = ffine * self.x
self.model.set_data (self.cur_y, self.invu)
self.model.set_func (sin_model, ['amp', 'ph'], args=(fx, ))
self.model.solve ([np.abs (self.cur_y).max (), 0.5])
# Apply CLEAN downweighting and normalize parameters
amp = self.gain * self.model.params[0]
pha = self.model.params[1]
if amp < 0:
amp = -amp
pha += np.pi
pha = ((pha + np.pi) % twopi) - np.pi
self.cur_y -= amp * np.sin (fx + pha)
self.components.append ([twopi / ffine, amp, pha])
def sample (self, x, ntrunc=None):
x = np.asarray (x)
y = np.zeros (x.shape)
if ntrunc is None:
complist = self.components
else:
complist = sorted (self.components, key=lambda c: c[1], reverse=True)[:ntrunc]
for prd, amp, pha in complist:
y += amp * np.sin (twopi * x / prd + pha)
return y
def engridden (df, xcol, ycol, padding=0):
# we assume that we're sorted in X.
import pandas as pd
x = df[xcol]
y = df[ycol]
xmax, xmin = x.max (), x.min ()
span = xmax - xmin
dx = x.diff ()
assert (dx > 0)[1:].all () # [1:] because dx[0]=NaN
delta = dx.min ()
n = int (np.ceil (span / delta)) + 1
if n % 2 == 0:
n += 1 # there are advantages to using an odd nr of elements
print ('gridding to', n, 'elements')
# Laziness! We use an optimizer to figure out the "best" coefficients
# to map the measured X values to a grid, where "best" means the smallest
# differences between the gridded and actual locations. We could do
# better by increasing n, of course.
def deltas (m, b):
idx = np.round ((x - b) / m)
xmdl = m * idx + b
return xmdl
mdl = lsqmdl.Model (deltas, x)
mdl.solve ([span / (n - 1), xmin])
m, b = mdl.params
# Allow for padding so that the FFTClean can convolve the points
# on the grid.
b -= m * padding
n += 2 * padding
# Put everything on the grid
idx = np.round ((x - b) / m).astype (np.int)
assert (idx >= 0).all ()
assert (idx < n).all ()
x = m * np.arange (n) + b
res = pd.DataFrame ({'x': x})
res['y'] = np.nan
res.y[idx] = y
res.idx2x_m, res.idx2x_b = m, b
return res
class FFTClean (object):
gain = 0.2
def __init__ (self, df, xcol, ycol):
self.components = []
self.gridded = engridden (df, xcol, ycol, padding=8)
self.freqs = np.fft.rfftfreq (self.gridded.shape[0], self.gridded.idx2x_m)
self.periods = 1. / np.maximum (self.freqs, self.freqs[1]) # avoid div-by-0
self.n2 = self.gridded.shape[0] // 2
# Convolve onto the grid to smooth out ringing, etc.
conv = np.blackman (5)
self.gridded['sampling'] = 1. * np.isfinite (self.gridded.y)
self.gridded.y.fillna (value=0, inplace=True)
self.gridded['y'] = np.convolve (self.gridded.y, conv, mode='same')
self.gridded['sampling'] = np.convolve (self.gridded.sampling, conv, mode='same')
win = np.abs (np.fft.rfft (self.gridded.sampling))
win = np.concatenate ((win[:0:-1], win))
win /= win.max ()
self.window = win
self.orig_dirty = np.abs (np.fft.rfft (self.gridded.y))
self.cur = self.orig_dirty.copy ()
def iterate (self, n):
for _ in xrange (n):
idx = np.abs (self.cur).argmax () # find negative components too!
ampl = self.gain * self.cur[idx]
# xxx document magic:
ofs = self.n2 - idx
shifted = self.window[ofs:ofs + self.n2 + 1]
self.components.append ((self.freqs[idx], self.periods[idx], idx, ampl))
self.cur -= ampl * shifted
print (self.components[-1])
def get_components (self):
import pandas as pd
a = np.asarray (self.components).T
return pd.DataFrame (dict (zip ('freq period idx ampl'.split (), a)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment