Skip to content

Instantly share code, notes, and snippets.

@patoorio
Forked from endolith/readme.md
Last active August 24, 2021 23:13
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save patoorio/a9f60ef16489639fbf20f23ac49ba24f to your computer and use it in GitHub Desktop.
Save patoorio/a9f60ef16489639fbf20f23ac49ba24f to your computer and use it in GitHub Desktop.
Continuous wavelet analysis by Roger Fearick

This is a fork of the gist https://gist.github.com/endolith/2783866

This fork adds:

  • Option for specifying a specific set of desired scales
  • Several minor fixes that add compatibility with Python3
  • New getnormpower() function, that normalizes the power by the square root of the scales
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 04 09:58:47 2016
@author: porio
"""
from __future__ import division
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
import wavelets
#%% Build and plot a signal to work with
dt=0.005
time=np.arange(0,50,dt)
fbasal1,fbasal2=25,15
amp1,amp2=1,2
bias=55
ampruido=0.2
chrpini,chrpstp=15,30
x=bias + amp1*np.sin(2*np.pi*fbasal1*time) + amp2*np.sin(2*np.pi*fbasal2*time)
x+=np.random.normal(size=len(x))*ampruido
x2=1.2*np.cos(2*np.pi*0.8*(time-chrpini)**2) * (time-chrpini)*np.exp(-(time-chrpini)/10)
x2[(time<chrpini)+(time>chrpstp)]=0
x+=x2
#%% Wavelet transfom - linearly spaced scales
desiredFreq=np.arange(1,40,0.2) # Frequencies we want to analyse
desiredPeriods=1/(desiredFreq*dt)
scales=desiredPeriods/wavelets.Morlet.fourierwl # scales
wavel1=wavelets.Morlet(x,scales=scales)
pwr1=np.sqrt(wavel1.getnormpower()) # Normalized power
fmin=min(desiredFreq)
fmax=max(desiredFreq)
#%% Plot the results - add a spectrogram for comparison
plt.figure(1,figsize=(12,4))
plt.clf()
plt.subplot(311)
plt.plot(time,x,alpha=1)
plt.xlabel('Time (s)')
plt.xlim((0,50))
plt.subplot(312)
plt.specgram(x-np.mean(x),Fs=1/dt,NFFT=128,noverlap=64);
plt.ylim((0,50))
ax1=plt.subplot(313)
plt.imshow(pwr1,cmap='RdBu',vmax=np.max(pwr1),vmin=-np.max(pwr1),
extent=(min(time),max(time),fmin,fmax),origin='lower',
interpolation='none',aspect='auto')
#ax1.set_yscale('log')
ax1.set_ylabel('Frequency (Hz)')
#%% Wavelet transfom - log-spaced scales
desiredFreq=np.logspace(0.5,2,60) # Frequencies we want to analyse - from ~3 to 100
desiredPeriods=1/(desiredFreq*dt)
scales=desiredPeriods/wavelets.Morlet.fourierwl # scales
wavel1=wavelets.Morlet(x,scales=scales)
pwr1=wavel1.getnormpower() # Normalized power
fmin=min(desiredFreq)
fmax=max(desiredFreq)
#%% Plot the results
plt.figure(2,figsize=(12,4))
plt.clf()
plt.subplot(211)
plt.plot(time,x,alpha=1)
plt.xlabel('Time (s)')
plt.xlim((0,50))
ax1=plt.subplot(212)
plt.imshow(pwr1,cmap='RdBu',vmax=np.max(pwr1),vmin=-np.max(pwr1),
extent=(min(time),max(time),fmin,fmax),origin='lower',
interpolation='none',aspect='auto')
ax1.set_yscale('log')
ax1.set_ylabel('Frequency (Hz)')
#%% Changing _omega
# *** ESTO NO FUNCIONA BIEN TODAVÍA ***
omega=5
wavelets.Morlet._omega0=omega
desiredFreq=np.arange(1,40,0.2) # Frequencies we want to analyse
desiredPeriods=1/(desiredFreq*dt)
fourierwl=4* np.pi/(omega + np.sqrt(2.0 + omega**2))
scales=desiredPeriods/fourierwl # scales
wavel1=wavelets.Morlet(x,scales=scales)
pwr1=np.sqrt(wavel1.getnormpower()) # Normalized power
fmin=min(desiredFreq)
fmax=max(desiredFreq)
#%% Plot the results - add a spectrogram for comparison
plt.figure(3,figsize=(12,4))
plt.clf()
plt.subplot(211)
plt.plot(time,x,alpha=1)
plt.xlabel('Time (s)')
plt.xlim((0,50))
ax1=plt.subplot(212)
plt.imshow(pwr1,cmap='RdBu',vmax=np.max(pwr1),vmin=-np.max(pwr1),
extent=(min(time),max(time),fmin,fmax),origin='lower',
interpolation='none',aspect='auto')
# ax1.set_yscale('log')
ax1.set_ylabel('Frequency (Hz)')
import numpy as NP
"""
A module which implements the continuous wavelet transform
Wavelet classes:
Morlet
MorletReal
MexicanHat
Paul2 : Paul order 2
Paul4 : Paul order 4
DOG1 : 1st Derivative Of Gaussian
DOG4 : 4th Derivative Of Gaussian
Haar : Unnormalised version of continuous Haar transform
HaarW : Normalised Haar
Usage e.g.
wavelet=Morlet(data, scales=None, largestscale=2, notes=0, order=2, scaling="log")
data: Numeric array of data (float), with length ndata.
Optimum length is a power of 2 (for FFT)
Worst-case length is a prime
scales: specific scales to convolve. If given, the remaining arguments
are ignored.
Smallest scale should be >= 2 for meaningful data
largestscale:
largest scale as inverse fraction of length
scale = len(data)/largestscale
smallest scale should be >= 2 for meaningful data
notes: number of scale intervals per octave
if notes == 0, scales are on a linear increment
order: order of wavelet for wavelets with variable order
[Paul, DOG, ..]
scaling: "linear" or "log" scaling of the wavelet scale.
Note that feature width in the scale direction
is constant on a log scale.
Attributes of instance:
wavelet.cwt: 2-d array of Wavelet coefficients, (nscales,ndata)
wavelet.nscale: Number of scale intervals
wavelet.scales: Array of scale values
Note that meaning of the scale will depend on the family
wavelet.fourierwl: Factor to multiply scale by to get scale
of equivalent FFT
Using this factor, different wavelet families will
have comparable scales
References:
A practical guide to wavelet analysis
C Torrance and GP Compo
Bull Amer Meteor Soc Vol 79 No 1 61-78 (1998)
naming below vaguely follows this.
find the latest version in https://gist.github.com/patoorio/a9f60ef16489639fbf20f23ac49ba24f
"""
class Cwt:
"""
Base class for continuous wavelet transforms.
Implements cwt via the Fourier transform
Used by subclass which provides the method wf(self,s_omega)
wf is the Fourier transform of the wavelet function.
Returns an instance.
"""
fourierwl=1.00
def _log2(self, x):
# utility function to return (integer) log2
return int( NP.log(float(x))/ NP.log(2.0)+0.0001 )
def __init__(self, data, scales=None, largestscale=1, notes=0, order=2, scaling='linear'):
"""
Continuous wavelet transform of data.
data: data in array to transform, length must be power of 2
scales: specific scales to convolve. If given, the remaining arguments
are ignored.
Smallest scale should be >= 2 for meaningful data
notes: number of scale intervals per octave
largestscale: largest scale as inverse fraction of length
of data array
scale = len(data)/largestscale
smallest scale should be >= 2 for meaningful data
order: Order of wavelet basis function for some families
scaling: Linear or log
"""
ndata = len(data)
self.order=order
self.scale=largestscale
if scales is not None:
self.scales=scales
self.nscale=len(self.scales)
else:
self._setscales(ndata,largestscale,notes,scaling)
self.cwt= NP.zeros((self.nscale,ndata), NP.complex64)
omega= NP.r_[NP.arange(0,ndata//2),NP.arange(-ndata//2,0)]*(2.0*NP.pi/ndata)
datahat=NP.fft.fft(data)
self.fftdata=datahat
#self.psihat0=self.wf(omega*self.scales[3*self.nscale/4])
# loop over scales and compute wavelet coeffiecients at each scale
# using the fft to do the convolution
for scaleindex in range(self.nscale):
currentscale=self.scales[scaleindex]
self.currentscale=currentscale # for internal use
s_omega = omega*currentscale
psihat=self.wf(s_omega)
psihat = psihat * NP.sqrt(2.0*NP.pi*currentscale)
convhat = psihat * datahat
W = NP.fft.ifft(convhat)
self.cwt[scaleindex,0:ndata] = W
return
def _setscales(self,ndata,largestscale,notes,scaling):
"""
Automatic generation of scales based on a largest value, number of notes and scaling.
If notes non-zero, returns a log scale based on notes per octave else a linear scale.
(25/07/08): fix notes!=0 case so smallest scale at [0]
"""
if scaling=="log":
if notes<=0: notes=1
# adjust nscale so smallest scale is 2
noctave=self._log2( ndata/largestscale/2 )
self.nscale=notes*noctave
self.scales=NP.zeros(self.nscale,float)
for j in range(self.nscale):
self.scales[j] = ndata/(self.scale*(2.0**(float(self.nscale-1-j)/notes)))
elif scaling=="linear":
nmax=ndata/largestscale/2
self.scales=NP.arange(float(2),float(nmax))
self.nscale=len(self.scales)
else: raise ValueError("scaling must be linear or log")
return
def getdata(self):
"""Return wavelet coefficient array."""
return self.cwt
def getcoefficients(self):
"""Return wavelet coefficient array (same as getdata)."""
return self.cwt
def getpower(self):
"""Return wavelet coefficient arrays squared."""
return (self.cwt* NP.conjugate(self.cwt)).real
def getnormpower(self):
"""Return square of wavelet coefficient array normalized by scale."""
cwt1=self.cwt / (NP.sqrt(self.scales[:,None]))
return (cwt1* NP.conjugate(cwt1)).real
def getscales(self):
"""Return array of scales used in transform."""
return self.scales
def getnscale(self):
"""Return number of scales."""
return self.nscale
# wavelet classes
class Morlet(Cwt):
"""Complex Morlet wavelet."""
_omega0=5.0
fourierwl=4* NP.pi/(_omega0 + NP.sqrt(2.0 + _omega0**2))
def wf(self, s_omega):
H= NP.ones(len(s_omega))
# n=len(s_omega)
for i in range(len(s_omega)):
if s_omega[i] < 0.0: H[i]=0.0
# note : was s_omega/8 before 17/6/03
xhat=0.75112554*( NP.exp(-(s_omega-self._omega0)**2/2.0))*H
return xhat
class MorletReal(Cwt):
"""Real Morlet wavelet."""
_omega0=5.0
fourierwl=4* NP.pi/(_omega0+ NP.sqrt(2.0+_omega0**2))
def wf(self, s_omega):
H= NP.ones(len(s_omega))
# n=len(s_omega)
for i in range(len(s_omega)):
if s_omega[i] < 0.0: H[i]=0.0
# note : was s_omega/8 before 17/6/03
xhat=0.75112554*( NP.exp(-(s_omega-self._omega0)**2/2.0)+ NP.exp(-(s_omega+self._omega0)**2/2.0)- NP.exp(-(self._omega0)**2/2.0)+ NP.exp(-(self._omega0)**2/2.0))
return xhat
class Paul4(Cwt):
"""Paul m=4 wavelet."""
fourierwl=4* NP.pi/(2.*4+1.)
def wf(self, s_omega):
n=len(s_omega)
xhat= NP.zeros(n)
xhat[0:n//2]=0.11268723*s_omega[0:n//2]**4* NP.exp(-s_omega[0:n//2])
#return 0.11268723*s_omega**2*exp(-s_omega)*H
return xhat
class Paul2(Cwt):
"""Paul m=2 wavelet."""
fourierwl=4* NP.pi/(2.*2+1.)
def wf(self, s_omega):
n=len(s_omega)
xhat= NP.zeros(n)
xhat[0:n//2]=1.1547005*s_omega[0:n//2]**2* NP.exp(-s_omega[0:n//2])
#return 0.11268723*s_omega**2*exp(-s_omega)*H
return xhat
class Paul(Cwt):
"""Paul order m wavelet."""
def wf(self, s_omega):
Cwt.fourierwl=4* NP.pi/(2.*self.order+1.)
m=self.order
n=len(s_omega)
normfactor=float(m)
for i in range(1,2*m):
normfactor=normfactor*i
normfactor=2.0**m/ NP.sqrt(normfactor)
xhat= NP.zeros(n)
xhat[0:n//2]=normfactor*s_omega[0:n//2]**m* NP.exp(-s_omega[0:n//2])
#return 0.11268723*s_omega**2*exp(-s_omega)*H
return xhat
class MexicanHat(Cwt):
"""2nd Derivative Gaussian (Mexican hat) wavelet."""
fourierwl=2.0* NP.pi/ NP.sqrt(2.5)
def wf(self, s_omega):
# should this number be 1/sqrt(3/4) (no pi)?
#s_omega = s_omega/self.fourierwl
#print max(s_omega)
a=s_omega**2
b=s_omega**2/2
return a* NP.exp(-b)/1.1529702
#return s_omega**2*exp(-s_omega**2/2.0)/1.1529702
class DOG4(Cwt):
"""
4th Derivative Gaussian wavelet.
see also T&C errata for - sign
but reconstruction seems to work best with +!
"""
fourierwl=2.0* NP.pi/ NP.sqrt(4.5)
def wf(self, s_omega):
return s_omega**4* NP.exp(-s_omega**2/2.0)/3.4105319
class DOG1(Cwt):
"""
1st Derivative Gaussian wavelet.
but reconstruction seems to work best with +!
"""
fourierwl=2.0* NP.pi/ NP.sqrt(1.5)
def wf(self, s_omega):
dog1= NP.zeros(len(s_omega),NP.complex64)
dog1.imag=s_omega* NP.exp(-s_omega**2/2.0)/NP.sqrt(NP.pi)
return dog1
class DOG(Cwt):
"""
Derivative Gaussian wavelet of order m.
but reconstruction seems to work best with +!
"""
def wf(self, s_omega):
try:
from scipy.special import gamma
except ImportError:
print("Requires scipy gamma function")
raise ImportError
Cwt.fourierwl=2* NP.pi/ NP.sqrt(self.order+0.5)
m=self.order
dog=1.0J**m*s_omega**m* NP.exp(-s_omega**2/2)/ NP.sqrt(gamma(self.order+0.5))
return dog
class Haar(Cwt):
"""Continuous version of Haar wavelet."""
# note: not orthogonal!
# note: s_omega/4 matches Lecroix scale defn.
# s_omega/2 matches orthogonal Haar
# 2/8/05 constants adjusted to match artem eim
fourierwl=1.0#1.83129 #2.0
def wf(self, s_omega):
haar= NP.zeros(len(s_omega),NP.complex64)
om = s_omega[:]/self.currentscale
om[0]=1.0 #prevent divide error
#haar.imag=4.0*sin(s_omega/2)**2/om
haar.imag=4.0* NP.sin(s_omega/4)**2/om
return haar
class HaarW(Cwt):
"""Continuous version of Haar wavelet (norm)."""
# note: not orthogonal!
# note: s_omega/4 matches Lecroix scale defn.
# s_omega/2 matches orthogonal Haar
# normalised to unit power
fourierwl=1.83129*1.2 #2.0
def wf(self, s_omega):
haar= NP.zeros(len(s_omega),NP.complex64)
om = s_omega[:]#/self.currentscale
om[0]=1.0 #prevent divide error
#haar.imag=4.0*sin(s_omega/2)**2/om
haar.imag=4.0* NP.sin(s_omega/2)**2/om
return haar
if __name__=="__main__":
import numpy as np
import pylab as mpl
wavelet=Morlet
maxscale=4
notes=16
scaling="log" #or "linear"
#scaling="linear"
plotpower2d=True #False
# set up some data
Ns=1024
#limits of analysis
Nlo=0
Nhi=Ns
# sinusoids of two periods, 128 and 32.
x=np.arange(0.0,1.0*Ns,1.0)
A=np.sin(2.0*np.pi*x/128.0)
B=np.sin(2.0*np.pi*x/32.0)
A[512:768]+=B[0:256]
# Wavelet transform the data
cw=wavelet(A,largestscale=maxscale,notes=notes,scaling=scaling)
scales=cw.getscales()
cwt=cw.getdata()
# power spectrum
pwr=cw.getpower()
scalespec=np.sum(pwr,axis=1)/scales # calculate scale spectrum
# scales
y=cw.fourierwl*scales
x=np.arange(Nlo*1.0,Nhi*1.0,1.0)
fig=mpl.figure(1)
# 2-d coefficient plot
ax=mpl.axes([0.4,0.1,0.55,0.4])
mpl.xlabel('Time [s]')
plotcwt=np.clip(np.fabs(cwt.real), 0., 1000.)
if plotpower2d: plotcwt=pwr
im=mpl.imshow(plotcwt,cmap=mpl.cm.jet,extent=[x[0],x[-1],y[-1],y[0]],aspect='auto')
#colorbar()
if scaling=="log": ax.set_yscale('log')
mpl.ylim(y[0],y[-1])
ax.xaxis.set_ticks(np.arange(Nlo*1.0,(Nhi+1)*1.0,100.0))
ax.yaxis.set_ticklabels(["",""])
theposition=mpl.gca().get_position()
# data plot
ax2=mpl.axes([0.4,0.54,0.55,0.3])
mpl.ylabel('Data')
pos=ax.get_position()
mpl.plot(x,A,'b-')
mpl.xlim(Nlo*1.0,Nhi*1.0)
ax2.xaxis.set_ticklabels(["",""])
mpl.text(0.5,0.9,"Wavelet example with extra panes",
fontsize=14,bbox=dict(facecolor='green',alpha=0.2),
transform = fig.transFigure,horizontalalignment='center')
# projected power spectrum
ax3=mpl.axes([0.08,0.1,0.29,0.4])
mpl.xlabel('Power')
mpl.ylabel('Period [s]')
vara=1.0
if scaling=="log":
mpl.loglog(scalespec/vara+0.01,y,'b-')
else:
mpl.semilogx(scalespec/vara+0.01,y,'b-')
mpl.ylim(y[0],y[-1])
mpl.xlim(1000.0,0.01)
mpl.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment