Skip to content

Instantly share code, notes, and snippets.

@xmodar
Last active August 20, 2021 04:07
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xmodar/3cb2fd4e2618bbcf673282eff8c803f3 to your computer and use it in GitHub Desktop.
Save xmodar/3cb2fd4e2618bbcf673282eff8c803f3 to your computer and use it in GitHub Desktop.
Fitting Gaussian to Sampled Data Using PyTorch.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fitting Gaussian to Sampled Data Using PyTorch\n",
"\n",
"Author: [Modar Alfadly](https://github.com/ModarTensai/) (19th Septemper 2018)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import math\n",
"import torch\n",
"\n",
"\n",
"def gaussian(x, mean, std, pdf=True, cdf=True):\n",
" '''The PDF and CDF of the normal distribution.\n",
"\n",
" Args:\n",
" x: The point of evaluation (tensor).\n",
" mean: The mean of the distribution.\n",
" std: The standard deviation.\n",
" pdf: Whether to return the PDF.\n",
" cdf: Whether to return the CDF.\n",
"\n",
" Returns:\n",
" The the normal (PDF, CDF) at x.\n",
" '''\n",
" arg = math.sqrt(0.5) * (x - mean) / std\n",
" if pdf:\n",
" coef = 1.0 / (math.sqrt(2.0 * math.pi) * std)\n",
" rpdf = coef * torch.exp(-torch.pow(arg, 2.0))\n",
" if not cdf:\n",
" return rpdf\n",
" if cdf:\n",
" rcdf = 0.5 + 0.5 * torch.erf(arg)\n",
" if not pdf:\n",
" return rcdf\n",
" return rpdf, rcdf\n",
"\n",
"\n",
"def percentile(x, q, sort=True):\n",
" '''The percentile of samples of data.\n",
"\n",
" Args:\n",
" x: The 1D data tensor.\n",
" q: The percentage in [0, 100].\n",
" sort: Set to False only if x is sorted to save computation.\n",
"\n",
" Returns:\n",
" The `q`th percentile of the data.\n",
" '''\n",
" if sort:\n",
" x = x.sort()[0]\n",
" if not (0 <= q <= 100):\n",
" raise ValueError('`q` must be in between 0 and 100.')\n",
" value = (q / 100) * x.numel() - 1\n",
" if value < 0:\n",
" return 0\n",
" index = int(value)\n",
" frac = value - index\n",
" next_index = min(index + 1, x.numel() - 1)\n",
" return (1 - frac) * x[index] + frac * x[next_index]\n",
"\n",
"\n",
"def inter_quartile_range(x, sort=True):\n",
" '''IQR: the inter-quartile range of samples of data.\n",
"\n",
" Args:\n",
" x: The 1D data tensor.\n",
" sort: Set to False only if x is sorted to save computation.\n",
"\n",
" Returns:\n",
" The IQR of the data.\n",
" '''\n",
" if sort:\n",
" x = x.sort()[0]\n",
" return percentile(x, 75, False) - percentile(x, 25, False)\n",
"\n",
"\n",
"def num_hist_bins(x, min_bins=1, max_bins=1000, sort=True):\n",
" '''Freedman-Diaconis rule for the number of bins.\n",
"\n",
" Args:\n",
" x: The 1D data tensor.\n",
" sort: Set to False only if x is sorted to save computation.\n",
" min_bins: The minimum number of bins.\n",
" max_bins: The maximum number of bins.\n",
"\n",
" Returns:\n",
" The appropriate number of bins to histogram the given data.\n",
" '''\n",
" if sort:\n",
" x = x.sort()[0]\n",
" bin_width = 2 * x.numel()**(-1 / 3) * inter_quartile_range(x, False)\n",
" num_bins = ((x[-1] - x[0]) / bin_width).round().long()\n",
" l_bounded = torch.max(num_bins, torch.tensor(min_bins))\n",
" ul_bounded = torch.min(l_bounded, torch.tensor(max_bins))\n",
" return ul_bounded\n",
"\n",
"\n",
"def integrate_area(values, bounds=None, dx=1, _sum=None):\n",
" '''Integrate using the composite trapezoidal rule.\n",
"\n",
" Args:\n",
" values: A 1D tenosr of y-axis values.\n",
" bounds: A tuple (lower, upper) of the range of the x-axis.\n",
" dx: The distance between `values` if `bounds is None`.\n",
" _sum: The pre-computed sum of `values`.\n",
" \n",
" Returns:\n",
" The area under the curve using the compoite trapezoidal rule.\n",
" '''\n",
" if _sum is None:\n",
" _sum = values.sum()\n",
" if bounds is None:\n",
" return dx * (_sum - (values[0] + values[-1]) / 2)\n",
" else:\n",
" return _sum * (bounds[1] - bounds[0]) / (values.numel() - 1)\n",
"\n",
"\n",
"def normalized_histogram(x, bins=None, sort=True):\n",
" '''Estimates the PDF of samples of data as a histogram.\n",
" \n",
" In case that x was constant\n",
"\n",
" Args:\n",
" x: The 1D data tensor.\n",
" bins: The desired number of bins. If None, use num_hist_bins(x).\n",
" sort: Set to False only if x is sorted to save computation.\n",
"\n",
" Returns:\n",
" The normalized histogram of the data as 1D tensor and\n",
" the range bounds (lower, upper).\n",
" '''\n",
" if sort:\n",
" x = x.sort()[0]\n",
" if bins is None:\n",
" bins = num_hist_bins(x, sort=False).item()\n",
" hist = x.histc(bins)\n",
" bounds = (x[0], x[-1])\n",
" area = integrate_area(hist, bounds, _sum=x.numel())\n",
" if x[0] == x[-1]:\n",
" area = hist.clone()\n",
" area[area == 0] = 1\n",
" return hist / area, bounds\n",
"\n",
"\n",
"def hist_as_func(hist, bounds, filler=float('nan')):\n",
" '''Converts a discrete histogram into a continuous function.\n",
"\n",
" One can discretize a function f in a certain domain `bounds = [lower, upper]`\n",
" using a histogram hist of n bins, i.e. n functions values at uniformily\n",
" distributed samples in the range [lower, upper].\n",
" In other words, we can say that f(x) = hist[index]\n",
" where index = (x - lower) * (n - 1) / (upper - lower)\n",
" assuming that index is an integer in the range [lower, upper].\n",
" If index is not in the range, the default value is `filler`.\n",
" If index is not integer, we linearly interpolate the adjacent elements.\n",
"\n",
" Args:\n",
" hist: A sorted list of values that represent a histogram.\n",
" bounds: A tuple (lower, upper) of the range of the x-axis.\n",
" filler: The default value for out of range samples.\n",
"\n",
" Returns:\n",
" A function `func` that evaluates hist[x] at any given x.\n",
" If x < lower or x > upper, the default hist value will be `filler`.\n",
" This function will have all the parameters {hist, lower, upper, filler}\n",
" as attributes, e.g. `func.hist is hist` and so on.\n",
" '''\n",
" lower, upper = bounds\n",
" if upper == lower:\n",
" hist_sum = hist.sum()\n",
" else:\n",
" factor = (hist.size(0) - 1) / (upper - lower)\n",
" def func(x):\n",
" out = torch.empty_like(x)\n",
" in_bound = (lower <= x) * (x <= upper)\n",
" out[~in_bound] = filler\n",
" if upper == lower:\n",
" out[in_bound] = hist_sum\n",
" else:\n",
" pos = (x[in_bound] - lower) * factor\n",
" index, frac = pos.long(), pos % 1\n",
" next_index = (index + 1).clamp(max=hist.size(0) - 1)\n",
" out[in_bound] = (1 - frac) * hist[index] + frac * hist[next_index]\n",
" return out\n",
" func.hist = hist\n",
" func.bounds = bounds\n",
" func.filler = filler\n",
" return func\n",
" \n",
"\n",
"def estimate_pdf(x, sort=True):\n",
" '''Fit the PDF of samples of data.\n",
"\n",
" Args:\n",
" x: A 1D data tensor.\n",
" sort: Set to False only if x is sorted to save computation.\n",
"\n",
" Returns:\n",
" The PDF function (output of `hist_as_func()`).\n",
" '''\n",
" hist, bounds = normalized_histogram(x, sort=sort)\n",
" return hist_as_func(hist, bounds, filler=0)\n",
"\n",
"\n",
"def pdf_similarity(hist1, hist2, bounds):\n",
" '''Similarity between to PDFs using histogram kernel intersection.\n",
"\n",
" Args:\n",
" hist1: The first PDF as a normalized histogram (1D tensor).\n",
" hist2: The second histogram.\n",
" bounds: A tuple (lower, upper) range of the x-axis.\n",
"\n",
" Returns:\n",
" The similarity measure [0, 1].\n",
" '''\n",
" if hist1.numel() != hist2.numel():\n",
" raise ValueError('The two PDFs should have the same length.')\n",
" return integrate_area(torch.min(hist1, hist2), bounds)\n",
"\n",
"\n",
"def fit_normal(x, sigmas=5, correct=True, sort=True):\n",
" '''Estimate the PDF from samples and fit a Gaussian.\n",
"\n",
" Args:\n",
" x: A 1D data tensor.\n",
" sigmas: How many standard deviations away from the mean to consider.\n",
" correct: Whether to normalize `pdf_fit` to account for sampling errors.\n",
" sort: Set to False only if x is sorted to save computation.\n",
"\n",
" Returns:\n",
" {xs: A linspace for the fitted PDFs\n",
" pdf: The PDF of the data as 1D histogram\n",
" fit: The histogram of the best Guassian fit\n",
" mean: The mean of the Gaussian fit\n",
" std: The standard deviation of the Gaussian fit\n",
" similarity: The simialrity between `pdf` and `fit`}\n",
" '''\n",
" std = x.std()\n",
" mean = x.mean()\n",
" pdf = estimate_pdf(x, sort=sort)\n",
" lower = min(pdf.bounds[0], mean - sigmas * std)\n",
" upper = max(pdf.bounds[1], mean + sigmas * std)\n",
" bounds = (lower, upper)\n",
" xs = torch.linspace(*bounds, max(pdf.hist.size(0), 100),\n",
" device=x.device, dtype=x.dtype)\n",
" pdf_hist = pdf(xs) # normalize to correct sampling errors\n",
" if correct:\n",
" pdf_hist /= integrate_area(pdf_hist, bounds)\n",
" gaussian_fit = gaussian(xs, mean, std, cdf=False)\n",
" similarity = pdf_similarity(pdf_hist, gaussian_fit, bounds)\n",
" if lower == upper: # handle the degenerate case\n",
" pdf_hist[0] = gaussian_fit[0] = similarity = 1\n",
" \n",
" results = {\n",
" 'xs': xs,\n",
" 'pdf': pdf_hist,\n",
" 'fit': gaussian_fit,\n",
" 'mean': mean, \n",
" 'std': std,\n",
" 'similarity': similarity,\n",
" }\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"# plt.style.use('dark_background')\n",
"\n",
"def fit_normal_and_display(x):\n",
" fit = fit_normal(x)\n",
" label = 'PDF fits~{:.2f}%'.format(100 * fit['similarity'])\n",
" plt.plot(fit['xs'].numpy(), fit['pdf'].numpy(), 'c', label=label)\n",
" label = 'N({:.2f}, {:.2f}^2)'.format(fit['mean'], fit['std'])\n",
" plt.plot(fit['xs'].numpy(), fit['fit'].numpy(), 'g', label=label)\n",
" plt.legend()\n",
" plt.show()\n",
"\n",
"\n",
"n = 1000000\n",
"\n",
"x = torch.randn(n)\n",
"fit_normal_and_display(x)\n",
"\n",
"x = torch.linspace(0, 100, n) ** 0.2\n",
"fit_normal_and_display(x)\n",
"\n",
"x = torch.randn(n)/5 - 1000 + torch.rand(n)*3\n",
"fit_normal_and_display(x)\n",
"\n",
"x = torch.rand(n)*0 + 10 # a single point\n",
"fit_normal_and_display(x)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [default]",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment