@calum-chamberlain
Created March 1, 2017 01:01
A re-write of match_filter internals to use scipy rather than openCV
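For orientation, the script computes the sample-by-sample normalized cross-correlation of each template with the continuous data. A naive reference implementation of that quantity is sketched below (the function name is illustrative, not part of the gist); it is slow, but handy for checking the FFT-based multi_normxcorr on small inputs:

import numpy as np

def naive_normxcorr(template, search_window):
    # Direct O(n * m) normalized cross-correlation; ground truth for the
    # FFT-based implementation in the script below.
    n = len(template)
    norm_t = (template - template.mean()) / (template.std() * n)
    out = np.empty(len(search_window) - n + 1, np.float32)
    for k in range(len(out)):
        window = search_window[k:k + n]
        # A zero-std window divides by zero here, as in the fast version
        out[k] = (norm_t * (window - window.mean())).sum() / window.std()
    return out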
from __future__ import division, print_function, absolute_import
import bottleneck
import threading
import numpy as np
import time
from scipy._lib._version import NumpyVersion
from scipy.signal.signaltools import _centered
from scipy import fftpack
from eqcorrscan.utils.timer import Timer
from eqcorrscan.core import match_filter

_rfft_mt_safe = (NumpyVersion(np.__version__) >= '1.9.0.dev-e24486e')
_rfft_lock = threading.Lock()


def multi_normxcorr(templates, search_window):
    """
    Compute the normalized cross-correlation of multiple templates with one
    continuous search window.

    :param templates: 2D array of templates, one template per row.
    :param search_window: 1D array of continuous data to scan over.
    :return: 2D array of normalized cross-correlations, one row per template.
    """
    search_window = search_window.astype(np.float32)
    templates = np.asarray(templates)
    if templates.ndim == 1:
        # Cope with a single template passed as a 1D array
        templates = templates[np.newaxis, :]
    n_templates, n = templates.shape
    slen = len(search_window)
    fftshape = n + slen - 1
    # Initialize output array
    results = np.array(
        [np.empty(slen - n + 1, np.float32)] * n_templates)
    # Set up normalizers: rolling mean and std of the search window
    mean_array = bottleneck.move_mean(search_window, n)[n - 1:]
    std_array = bottleneck.move_std(search_window, n)[n - 1:]
    # Issue: a zero std gives a divide-by-zero below, and the output then
    # doesn't match other algorithms.
    fshape = [fftpack.helper.next_fast_len(int(fftshape))]
    fslice = tuple([slice(0, fftshape)])
    complex_result = (np.issubdtype(search_window.dtype, complex) or
                      np.issubdtype(templates.dtype, complex))
    if not complex_result and (_rfft_mt_safe or _rfft_lock.acquire(False)):
        try:
            sp1 = np.fft.rfftn(search_window, fshape)
        finally:
            if not _rfft_mt_safe:
                _rfft_lock.release()
    else:
        # Use the (threadsafe but slower) SciPy complex-FFT routines instead.
        sp1 = fftpack.fftn(search_window, fshape)
    for i in range(n_templates):
        # Normalize and reverse the template so that the FFT convolution
        # below computes a correlation
        nt = (templates[i] - np.mean(templates[i])) / (np.std(templates[i]) * n)
        nt = nt[::-1]
        if not complex_result and (_rfft_mt_safe or _rfft_lock.acquire(False)):
            try:
                sp2 = np.fft.rfftn(nt, fshape)
                ret = (np.fft.irfftn(sp1 * sp2, fshape)[fslice].copy())
            finally:
                if not _rfft_mt_safe:
                    _rfft_lock.release()
        else:
            sp2 = fftpack.fftn(nt, fshape)
            ret = np.real(fftpack.ifftn(sp1 * sp2))[fslice].copy()
        results[i] = (_centered(ret, slen - n + 1) - nt.sum() *
                      mean_array) / std_array
    return results
def _channel_loop(templates, stream, cores=1, debug=0):
    """
    Internal loop for parallel processing.

    Loops over the channels, generating cross-channel correlation sums for a
    series of templates and handing the actual correlations off to a sister
    function which can be run in parallel.

    :type templates: list
    :param templates:
        A list of templates, where each one should be an obspy.Stream object
        containing multiple traces of seismic data and the relevant header
        information.
    :type stream: obspy.core.stream.Stream
    :param stream:
        A single Stream object to be correlated with the templates. This is
        in effect the image in normxcorr2 and cv2.
    :type cores: int
    :param cores: Number of cores to loop over
    :type debug: int
    :param debug: Debug level.

    :returns:
        New list of :class:`numpy.ndarray` objects. These will contain
        the correlation sums for each template for this day of data.
    :rtype: list
    :returns:
        list of ints as number of channels used for each cross-correlation.
    :rtype: list
    :returns:
        list of list of tuples of station, channel for all cross-correlations.
    :rtype: list

    .. Note::
        Each template must contain the same channels as every other template,
        and the stream must also contain the same channels (note that if
        there are duplicate channels in the template you do not need
        duplicate channels in the stream).
    """
    num_cores = cores
    if len(templates) < num_cores:
        num_cores = len(templates)
    # Note: num_cores is currently unused; this version runs serially
    # Initialize cccs_matrix, which will be two arrays of len(templates)
    # arrays, where cccs_matrix[0][:] will hold the cross-channel sum for
    # each template.
    # Note: This requires all templates to be the same length, and all
    # channels to be the same length
    temp_len = len(templates[0][0].data)
    cccs_matrix = np.array(
        [np.array([np.array([0.0] * (len(stream[0].data) - temp_len + 1))] *
                  len(templates))] * 2, dtype=np.float16)
    # Initialize number of channels array
    no_chans = np.array([0] * len(templates))
    chans = [[] for _ in range(len(templates))]
    # Match-filter enforces that each template is the same length...
    for stream_ind in range(len(templates[0])):
        station = templates[0][stream_ind].stats.station
        channel = templates[0][stream_ind].stats.channel
        print('Running %s.%s' % (station, channel))
        tr = stream.select(station=station, channel=channel)[0].data
        template_array = np.array(
            [template[stream_ind].data for template in templates])
        chan_corrs = multi_normxcorr(
            templates=template_array, search_window=tr)
        for i, template in enumerate(templates):
            # Shift each correlation back by this channel's delay relative
            # to the earliest channel in the template
            delay = template[stream_ind].stats.starttime - \
                template.sort(['starttime'])[0].stats.starttime
            pad = np.array([0] * int(
                round(delay * template[stream_ind].stats.sampling_rate)))
            chan_corrs[i] = np.append(chan_corrs[i], pad)[len(pad):]
        cccs_matrix[1] = chan_corrs
        if debug >= 2:
            print('cccs_matrix shaped: ' + str(np.shape(cccs_matrix)))
            print('cccs_matrix is using ' +
                  str(cccs_matrix.nbytes / 1000000) + ' MB of memory')
        # Now we have an array of arrays with the first dimensional index
        # giving the channel, the second dimensional index giving the
        # template and the third dimensional index giving the position
        # in the ccc, e.g.:
        # np.shape(cccsums)=(len(stream), len(templates), len(ccc))
        if debug >= 2:
            print('cccs_matrix as a np.array is shaped: ' +
                  str(np.shape(cccs_matrix)))
        # First work out how many channels were used
        for i in range(0, len(templates)):
            if not np.all(cccs_matrix[1][i] == 0):
                # Check that there are some real numbers in the vector rather
                # than being all 0, which is the default case for no match
                # of image and template names
                no_chans[i] += 1
                chans[i].append((station, channel))
        # Now sum along the channel axis for each template to give the
        # cccsum values for each template for each day
        with Timer() as t:
            cccs_matrix[0] = cccs_matrix.sum(axis=0).astype(np.float16)
        if debug >= 1:
            print("--------- TIMER: Summing took %s s" % t.secs)
        if debug >= 2:
            print('cccs_matrix is shaped: ' + str(np.shape(cccs_matrix)))
    return cccs_matrix[0], no_chans, chans
if __name__ == '__main__':
    from obspy import read
    import glob

    templates = [read(t) for t in glob.glob('templates/*')]
    print('Running %i templates' % len(templates))
    st = read('data/*')
    statchans = ['ANWZ.EHZ', 'PRHZ.EHZ', 'PXZ.EHZ']
    # Trim templates and stream to a common set of station.channel ids
    for template in templates:
        for tr in template:
            if '.'.join([tr.stats.station, tr.stats.channel]) not in statchans:
                template.remove(tr)
    for tr in st:
        if '.'.join([tr.stats.station, tr.stats.channel]) not in statchans:
            st.remove(tr)
    tic = time.time()
    cccsums, no_chans, chans = _channel_loop(templates, st)
    toc = time.time()
    print('Scipy serial loop took %f' % (toc - tic))
    tic = time.time()
    eqcorr_cccsums, eqcorr_no_chans, eqcorr_chans = match_filter._channel_loop(
        templates, st, cores=4)
    toc = time.time()
    print('EQcorrscan parallel loop took %f' % (toc - tic))
    assert np.allclose(cccsums, eqcorr_cccsums, atol=0.0001)
d-chambers commented Mar 2, 2017

I just found some time to spend a few minutes with this.

I didn't realize before that you had based this function on scipy's fftconvolve. The rlock and such make a lot more sense now.

I didn't like the template looping in lines 58 to 72 so I rewrote part of it to just do it all at once with numpy broadcasting. Also, I found that, on my machine, scipy.fftpack.fft is actually faster than np.fft.rfftn. scipy.fftpack.rfft should be even faster but I couldn't get it to work with the convolution (will need to dig into it more).
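For what it's worth, a minimal timing sketch along those lines (the array sizes here are made up, and results will vary with machine and library build):

import time
import numpy as np
import scipy.fftpack

# Hypothetical sizes: 8 templates x 3 channels x 1 day of 1 Hz samples
data = np.random.randn(8, 3, 86400)
nfft = scipy.fftpack.next_fast_len(2 * data.shape[-1] - 1)

tic = time.time()
np.fft.rfftn(data, s=[nfft], axes=[-1])
print('np.fft.rfftn took %f s' % (time.time() - tic))

tic = time.time()
scipy.fftpack.fft(data, nfft, axis=-1)
print('scipy.fftpack.fft took %f s' % (time.time() - tic))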

Here is what I have; it's a bit cleaner and should take the same inputs. I will try to do more testing and profiling on all the functions later this week.

import bottleneck
import numpy as np
import scipy.fftpack


def time_cross_correlate(a1, a2, time_axis=-1):
    """
    apply cross correlation to two ndarrays or DataArrays along last axis

    Parameters
    ----------
    a1
        The templates with the following dimensions:
        (id, channel, time)
    a2
        The search window, with the same dimension layout as a1
    time_axis
        The axis number that represents the time dimension

    Returns
    -------
    np.ndarray
        The cross correlation values for each channel and event
    """
    ta = time_axis
    if a1.shape[ta] > a2.shape[ta]:  # make sure template (a1) is shorter
        a1, a2 = a2, a1
    tem_len = a1.shape[ta]  # length of template
    # normalize the template
    numer = a1 - a1.mean(axis=ta, keepdims=True)
    denom = a1.std(axis=ta, keepdims=True) * tem_len
    norm_tem = numer / denom
    # get the sum of the search window, rolling mean, rolling std
    sum_norm_tem = norm_tem.sum(axis=ta, keepdims=True)
    r_mean = bottleneck.move_mean(a2, tem_len, axis=ta)[..., tem_len - 1:]
    r_std = bottleneck.move_std(a2, tem_len, axis=ta)[..., tem_len - 1:]
    # find the next fast sample, transform to freq. domain
    next_fast = scipy.fftpack.next_fast_len(a1.shape[-1] + a2.shape[-1] - 1)
    fft1 = scipy.fftpack.fft(np.flip(norm_tem, axis=ta), next_fast, axis=ta)
    fft2 = scipy.fftpack.fft(a2, next_fast, axis=ta)
    # multiply the freq. domain, take inverse
    ifft = np.real(scipy.fftpack.ifft(fft1 * fft2, next_fast, axis=ta))
    ifft = ifft[..., : a2.shape[-1] + a1.shape[-1] - 1]
    out = _centered(ifft, abs(a1.shape[-1] - a2.shape[-1]) + 1)
    result = (out - sum_norm_tem * r_mean) / r_std
    return result


def _centered(arr, new_len):
    # Return the centered new_len portion of the array along the last axis.
    startind = (arr.shape[-1] - new_len) // 2
    endind = startind + new_len
    return arr[..., startind: endind]

Edit: removed the type annotation in case you are running 2.7
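A quick usage sketch of the function above, with made-up shapes (not from the gist), stacking templates as (id, channel, time) and the search window as (channel, time):

import numpy as np

# 5 templates x 3 channels x 200 samples against 3 channels x 86400 samples
templates = np.random.randn(5, 3, 200)
search = np.random.randn(3, 86400)

ccs = time_cross_correlate(templates, search)
print(ccs.shape)  # (5, 3, 86201): one correlation trace per id and channel

The broadcasting in fft1 * fft2 is what removes the per-template loop: the (id, channel, freq) template spectra multiply against the (channel, freq) search-window spectrum in one shot.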
