Skip to content

Instantly share code, notes, and snippets.

@mretegan
Created July 12, 2021 07:44
Show Gist options
  • Save mretegan/5ec039a216449f5a9bbf81854f7919a2 to your computer and use it in GitHub Desktop.
Save mretegan/5ec039a216449f5a9bbf81854f7919a2 to your computer and use it in GitHub Desktop.
Hampel filter
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "2d243c46",
"metadata": {},
"outputs": [],
"source": [
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "079b5629",
"metadata": {},
"outputs": [],
"source": [
"x = np.arange(0, 2 * np.pi, 2 * np.pi / 100)\n",
"y = np.sin(x)\n",
"y[15] = 120"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "a1b21e32",
"metadata": {},
"outputs": [],
"source": [
"def hampel(x, k, t0=3, exclude_crrt_point=True, nan_substitution=True):\n",
" '''Perform hampel filtering, returning both filtered series and\n",
" mask of filtered points.\n",
" Input:\n",
" x: 1-d numpy array of numbers to be filtered\n",
" k: number of items in (window-1)/2, i.e. using k elems before and after\n",
" in addition to the current point\n",
" t0: number of standard deviations to use; 3 is default\n",
" exclude_crrt_point: if the current point being inspected should be\n",
" ignored when calculating variance.\n",
" nan_substitution: if True then invalid points should be replaced by a\n",
" NaN value. If False, interpolate with the median value.\n",
" Output:\n",
" y: 1-d numpy array obtained by filtering (this is a np.copy of x),\n",
" NaN where discarded\n",
" mask_modified: boolean mask, True if point has been discarded or\n",
" modified by the filter\n",
" '''\n",
"\n",
" # NOTE: adapted from hampel function in R package pracma\n",
" # NOTE: this is adapted from: https://stackoverflow.com/questions/46819260/\n",
" # filtering-outliers-how-to-make-median-based-hampel-function-faster\n",
" # NOTE: adapted from the issue by Jean Rabault jean.rblt@gmail.com 09-2020\n",
"\n",
" if not isinstance(x, np.ndarray):\n",
" raise ValueError(\"x should be a numpy array\")\n",
"\n",
" if not len(np.squeeze(x).shape) == 1:\n",
" raise ValueError(\"x should be 1-dimensional\")\n",
"\n",
" if not isinstance(k, int):\n",
" raise ValueError(\"k should be an int\")\n",
"\n",
" if not isinstance(t0, int):\n",
" raise ValueError(\"t0 should be an int\")\n",
"\n",
" y = np.copy(x) # y is the corrected series\n",
" y = np.squeeze(y)\n",
"\n",
" mask_modified = np.zeros((y.shape[0]), dtype=bool)\n",
"\n",
" n = y.shape[0]\n",
" L = 1.4826\n",
"\n",
" # cannot apply the filter too close to the edges, as we need k points\n",
" # before and after\n",
" for i in range((k), (n - k)):\n",
"\n",
" # follow user preference on using or not the current point for\n",
" # estimating statistical properties\n",
" if exclude_crrt_point:\n",
" array_neighborhood = np.concatenate((\n",
" y[(i - k):(i)],\n",
" y[(i + 1):(i + k + 1)]\n",
" ))\n",
" else:\n",
" array_neighborhood = y[i - k: i + k + 1]\n",
"\n",
" # if all points around are already nans, cannot trust local point\n",
" if np.isnan(array_neighborhood).all():\n",
" if not np.isnan(y[i]):\n",
" y[i] = np.nan\n",
" mask_modified[i] = True\n",
" continue\n",
"\n",
" # if current point is already a nan, keep it so\n",
" if np.isnan(y[i]):\n",
" continue\n",
"\n",
" # otherwise, should perform the filtering\n",
" x0 = np.nanmedian(array_neighborhood)\n",
" S0 = L * np.nanmedian(np.abs(array_neighborhood - x0))\n",
"\n",
" if (np.abs(y[i] - x0) > t0 * S0):\n",
" if nan_substitution:\n",
" y[i] = np.nan\n",
" else:\n",
" y[i] = x0\n",
" mask_modified[i] = True\n",
"\n",
" return (y, mask_modified)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "927ce7e4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"10.8 ms ± 284 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"hampel(y, 5, 3)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a0d4dd20",
"metadata": {},
"outputs": [],
"source": [
"from numpy.lib.stride_tricks import sliding_window_view\n",
"\n",
"def hampel(data, window_size=None, threshold=3.5, axis=0):\n",
" \"\"\"Outliers detection using the Hampel filter.\n",
"\n",
" More details about the filter can be found here:\n",
"\n",
" - https://fr.mathworks.com/help/dsp/ref/hampelfilter.html\n",
" - https://dsp.stackexchange.com/questions/26552\n",
" - https://stackoverflow.com/questions/22354094\n",
"\n",
" Parameters\n",
" ----------\n",
" data : numpy.array\n",
" Input data.\n",
" window_size : Union[None,int]\n",
" Size of the sliding window.\n",
" threshold : int\n",
" Threshold for outlier detection expressed in number of standard\n",
" deviations. Iglewicz and Hoaglin [1]_ suggest using a value of 3.5, but\n",
" larger values are often needed to avoid removing data points from a\n",
" noisy signal.\n",
" axis : int\n",
" Axis along which the detection is performed.\n",
"\n",
" Returns\n",
" -------\n",
" numpy.ndarray, numpy.array\n",
" Mask identifying the outliers, and rolling window median.\n",
"\n",
" References\n",
" ----------\n",
" .. [1] Boris Iglewicz and David Hoaglin (1993) \"Volume 16: How to Detect\n",
" and Handle Outliers\", The ASQC Basic References in Quality Control:\n",
" Statistical Techniques, Edward F. Mykytka, Ph.D., Editor.\n",
" \"\"\"\n",
" if window_size is None:\n",
" SECTIONS = 10\n",
" window_size = data.shape[axis] // SECTIONS\n",
" # Make the size of the window an odd number.\n",
" if window_size % 2 == 0:\n",
" window_size += 1\n",
" LOWER_LIMIT = 3\n",
" window_size = max(window_size, LOWER_LIMIT)\n",
"\n",
" # Pad the data along the direction used for the outlier detection.\n",
" pad_size = np.zeros((len(data.shape), 2), dtype=int)\n",
" pad_size[axis] = np.array((window_size // 2, window_size // 2))\n",
" padded_data = np.pad(data, pad_size, mode=\"constant\")\n",
"\n",
" # Create a sliding window view of the padded data.\n",
" padded_data_views = sliding_window_view(padded_data, window_size, axis)\n",
"\n",
" median = np.median(padded_data_views, axis=-1)\n",
" abs_diff = np.abs(data - median)\n",
"\n",
" mask = np.full(data.shape, False, dtype=bool)\n",
"\n",
" # Put the median in a suitable shape.\n",
" median_views = np.repeat(median, window_size).reshape(padded_data_views.shape)\n",
"\n",
" # Calculate the median absolute deviation for each view.\n",
" mad = np.median(np.abs(padded_data_views - median_views), axis=-1)\n",
"\n",
" # Calculate the standard deviation. We assume that the data is normally distributed.\n",
" # https://en.wikipedia.org/wiki/Median_absolute_deviation\n",
" k = 1.4826\n",
" sigma = k * mad\n",
"\n",
" score = threshold * sigma\n",
" mask[(abs_diff > score) & (sigma != 0)] = True\n",
"\n",
" return mask, median"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "c3c48daf",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"256 µs ± 14 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"hampel(y, 11, 3)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment