Skip to content

Instantly share code, notes, and snippets.

@mdhk
Created August 25, 2018 19:07
Show Gist options
  • Save mdhk/ad0725cf494385d699aef6d6c40131be to your computer and use it in GitHub Desktop.
Save mdhk/ad0725cf494385d699aef6d6c40131be to your computer and use it in GitHub Desktop.
Cross-correlation signal alignment
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Signal alignment using cross-correlation maximum"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# libraries for computations\n",
"from scipy import signal\n",
"import numpy as np\n",
"\n",
"# libraries for visualization & interaction\n",
"import matplotlib.pyplot as plt\n",
"% matplotlib inline\n",
"from ipywidgets import interact, interactive, fixed, interact_manual\n",
"import ipywidgets as widgets\n",
"from IPython.display import display"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def correlation_plots(b_offset):\n",
" \"\"\"\n",
" Plots one cosine in red, and another cosine in blue that is offset from the red one by \n",
" a value specified through the given b_offset parameter. (Cosines from t=0 to t=5)\n",
" Returns the found lag for the maximum cross-correlation between the two cosines.\n",
" \"\"\"\n",
" \n",
" # define the standard values for plotting the cosines\n",
" timestep = 0.01\n",
" start = 0\n",
" end = 5\n",
" signal_size = abs(end-start)\n",
" \n",
" # the possible lag values are anything from -signal_size+timestep (smallest overlap on left side) \n",
" # to signal_size-timestep (smallest overlap on right side)\n",
" lag_values = np.arange(-signal_size+timestep, signal_size, timestep)\n",
"\n",
" # compute values for t and the two cosines\n",
" t = np.linspace(start, end, int(end/timestep))\n",
" a = np.cos(2*np.pi*t)\n",
" b = np.cos(2*np.pi*t+b_offset)\n",
"\n",
" # find the cross-correlation values and the index of the maximum cross-correlation\n",
" crosscorr = signal.correlate(a, b)\n",
" max_crosscorr_idx = np.argmax(crosscorr)\n",
"\n",
" # find the lag at the cross-correlation maximum (t-value) and the number of timesteps corresponding to this lag\n",
" lag = lag_values[max_crosscorr_idx]\n",
" lag_timesteps = int(round(lag_values[max_crosscorr_idx]/timestep))\n",
" \n",
" # compute new values for t, a & b for plotting\n",
" if lag > 0:\n",
" new_a = list(a) + [np.nan]*lag_timesteps\n",
" new_b = [np.nan]*lag_timesteps + list(b)\n",
" new_t = np.linspace(start, end+lag, int(round((end+lag)/timestep)))\n",
" else:\n",
" new_a = [np.nan]*abs(lag_timesteps) + list(a)\n",
" new_b = list(b) + [np.nan]*abs(lag_timesteps)\n",
" new_t = np.linspace(start+lag, end, int(round((end-(start+lag))/timestep)))\n",
"\n",
" # plot the original, unaligned signals\n",
" fig = plt.figure(figsize=(15,12))\n",
" ax1 = plt.subplot(311)\n",
" ax1.plot(t, a, 'r-', label='a', linewidth=5, alpha=0.5)\n",
" ax1.plot(t, b, 'b-', label='b', linewidth=5, alpha=0.5)\n",
" ax1.set_xlabel('t')\n",
" ax1.set_title('unaligned signals')\n",
" ax1.legend(loc='lower left')\n",
"\n",
" # plot the cross-correlation curve, with a vertical line through the maximum\n",
" ax2 = plt.subplot(312)\n",
" ax2.plot(lag_values, crosscorr, 'g-')\n",
" ax2.set_xlabel('lag values')\n",
" ax2.set_xlim(-end,end)\n",
" ax2.set_xticks(np.arange(-end, end))\n",
" ax2.set_ylabel('correlation')\n",
" ax2.axvline(x=lag_values[max_crosscorr_idx])\n",
" ax2.set_title('cross-correlation')\n",
"\n",
" # plot the aligned signals (b adjusted with found lag for the maximum cross-correlation)\n",
" ax3 = plt.subplot(313)\n",
" ax3.plot(new_t, new_a, 'r-', label='a', linewidth=5, alpha=0.5)\n",
" ax3.plot(new_t, new_b, 'b-', label='b', linewidth=5, alpha=0.5)\n",
" ax3.set_xlabel('t')\n",
" ax3.set_title('aligned signals')\n",
" ax3.legend(loc='lower left')\n",
"\n",
" # show the pretty plots\n",
" plt.tight_layout()\n",
" plt.show()\n",
" \n",
" # print and return the found lag\n",
" print('lag:', round(lag, 2))\n",
" return lag"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "92fd06175f0143e8aed9ff1dfb33a475",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"interactive(children=(FloatSlider(value=0.0, description='b_offset', max=6.283185307179586, min=-6.28318530717…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# interactive widget for playing with the offset and seeing the cross-correlation peak and aligned signals. \n",
"# slider minimum = -2 pi, maximum = 2 pi, step size = 1/4 pi\n",
"widget = interactive(correlation_plots, b_offset=(-2*np.pi, 2*np.pi, (1/4)*np.pi))\n",
"display(widget)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
ipywidgets==7.4.0
matplotlib==2.2.2
numpy==1.14.5
scipy==1.1.0
@mdhk
Copy link
Author

mdhk commented Aug 25, 2018

A little script + interactive widget for playing with time-alignment of two cosines with different delay values. The optimal lag value is determined by finding the maximum cross-correlation peak between the two signals.

animated

Binder link (for running the notebook & trying out the widget): https://mybinder.org/v2/gist/mdhk/ad0725cf494385d699aef6d6c40131be/master?filepath=align_signals_crosscorr.ipynb

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment