Skip to content

Instantly share code, notes, and snippets.

@Astro-Lee
Forked from cgobat/powerlaws.ipynb
Created June 3, 2024 07:47
Show Gist options
  • Save Astro-Lee/4f867c48d400a3d6bf2e5c529b81b520 to your computer and use it in GitHub Desktop.
Save Astro-Lee/4f867c48d400a3d6bf2e5c529b81b520 to your computer and use it in GitHub Desktop.
Overview of broken power law functions and fitting them to data
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Fitting power laws\n",
"\n",
"**by Caden Gobat<sup>1</sup>**\n",
"\n",
"<sup>1</sup> Department of Physics, the George Washington University, 725 21st Street NW, Washington, DC 20052, USA"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np, pandas as pd, matplotlib.pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Broken power law functions\n",
"\n",
"A broken power law is a piecewise function given by a sequence of conjoined power laws ($x^\\alpha$) where each section has its own power (index) and is defined by bounding \"breaks\".\n",
"\n",
"Mathematically, the function looks something like\n",
"$f(x) \\propto \\begin{cases} x^{\\alpha_0} & \\mathrm{if}\\ x \\leq b_1 \\\\\n",
"x^{\\alpha_1} & \\mathrm{if}\\ b_1 \\leq x \\leq b_2 \\\\\n",
"... \\\\\n",
"x^{\\alpha_n} & \\mathrm{if}\\ b_n \\leq x \\leq b_{n+1} \\\\\n",
"... \\end{cases}$, where $b_n$ is the $x$-location of the $n$<sup>th</sup> break point.\n",
"\n",
"Alternatively, a single-break power law can be described by a single algebraic expression using a `smoothness' parameter, $\\Delta$, that governs the behavior of the transition between the two slopes:\n",
"\n",
"$f(x) \\propto \\left(\\frac{x}{x_b}\\right)^{-\\alpha_1}\\left\\{\\frac{1}{2}\\left[1+\\left(\\frac{x}{x_b}\\right)^{1/\\Delta}\\right]\\right\\}^{(\\alpha_1-\\alpha_2)\\Delta}$, where $x_b$ is the break location and $\\alpha_1$ and $\\alpha_2$ are the power law indices below and above the break, respectively.\n",
"\n",
"The function below is a Python implementation of a generalizable broken power law and takes as input the independent variable, an arbitrarily-long list of break locations, and indices for each section."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def bkn_pow(xvals,breaks,alphas):\n",
" try:\n",
" if len(breaks) != len(alphas) - 1:\n",
" raise ValueError(\"Dimensional mismatch. There should be one more alpha than there are breaks.\")\n",
" except TypeError:\n",
" raise TypeError(\"Breaks and alphas should be array-like.\")\n",
" if any(breaks < np.min(xvals)) or any(breaks > np.max(xvals)):\n",
" raise ValueError(\"One or more break points fall outside given x bounds.\")\n",
" \n",
" breakpoints = [np.min(xvals)] + breaks + [np.max(xvals)] # create a list of all the bounding x-values\n",
" chunks = [np.array([x for x in xvals if x >= breakpoints[i] and x <= breakpoints[i+1]]) for i in range(len(breakpoints)-1)]\n",
" \n",
" all_y = []\n",
" \n",
" #alpha = pd.cut(pd.Series(xvals),breakpoints,labels=alphas,include_lowest=True).to_numpy()\n",
"\n",
" for idx,xchunk in enumerate(chunks):\n",
" yvals = xchunk**alphas[idx]\n",
" all_y.append(yvals) # add this piece to the output\n",
" \n",
" for i in range(1,len(all_y)):\n",
" all_y[i] *= np.abs(all_y[i-1][-1]/all_y[i][0]) # scale the beginning of each piece to the end of the last so it is continuous\n",
" \n",
" return(np.array([y for ychunk in all_y for y in ychunk])) # return flattened list\n",
"\n",
"def bkn_pow_smooth(x, A, x_b, a_1, a_2, delta=1):\n",
" a_1 *= -1\n",
" a_2 *= -1\n",
" return A*(x/x_b)**(-a_1) * (0.5*(1+(x/x_b)**(1/delta)))**((a_1-a_2)*delta)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"xbounds = (8,18) # log of x bounds\n",
"num_points = 200\n",
"x = np.logspace(*xbounds,num_points)\n",
"breaks = [7e9,1e11,5e12]\n",
"alphas = [2,1/3,-1/2,-1]\n",
"\n",
"y = bkn_pow(x,breaks,alphas)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"noisy = y * np.random.normal(1,0.1,len(y)) # add Gaussian noise to the data\n",
"labels = iter([r'$\\nu_a$',r'$\\nu_m$',r'$\\nu_c$'])\n",
"colors = iter('rgb')\n",
"\n",
"plt.loglog(x,noisy/1e16,marker='.',linestyle=\"\",color='grey')\n",
"plt.loglog(x,y/1e16,'k--',label=\"Absolute break model\")\n",
"plt.loglog(x,bkn_pow_smooth(x,2.2e4,4e10,2,-1,delta=1.1),label=\"Smoothly broken\")\n",
"\n",
"for br in breaks:\n",
" plt.axvline(br,linestyle=\"--\",lw=1,color=next(colors),label=next(labels))\n",
"\n",
"plt.legend()\n",
"plt.gca().tick_params(axis='both',which='both',direction='in',top=True,right=True)\n",
"plt.xlabel(r\"$\\nu$ (Hz)\")\n",
"plt.ylabel(r\"Flux ($\\mu$Jy)\")\n",
"plt.title(\"Demonstration of broken power law function\")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Fitting a broken power law\n",
"If we have data that we suspect might be described by a broken PL, the first step is to identify where the function breaks. Then, each piece of the data can be fit with its own index, $\\alpha$.\n",
"\n",
"To find the breaks in a dataset that we didn't construct, **we can look for spikes in the second derivative**.\n",
"\n",
"### Central finite difference method for numerical differentiation\n",
"\n",
"Finite differences can be used to get a [numerical approximation of a function's derivative](https://en.wikipedia.org/wiki/Finite_differences#Relation_with_derivatives) where the analytical derivative is unattainable:\n",
"\n",
"\\begin{equation}\n",
"\\frac{\\delta_h \\lbrack f \\rbrack (x)}{h} - f^\\prime(x) = O(h^2)\n",
"\\end{equation}\n",
"\n",
"![finitediff.png](https://images.deepai.org/glossary-terms/3b64fd4ae00945c28f9bb777ed60b0a8/finitediff.png)\n",
"\n",
"A similar thing can be done for second-order derivatives:\n",
"\\begin{equation}\n",
"f^{\\prime\\prime}(x) \\approx \\frac{\\delta_h^2 \\lbrack f\\rbrack (x)}{h^2} = \\frac{f(x+h)-2f(x)+f(x-h)}{h^2}\n",
"\\end{equation}\n",
"\n",
"When the function is unknown and the data is in the form of $(x_i,y_i)$ coordinate pairs, we can use the following as an approximation for the second derivative:\n",
"\n",
"\\begin{equation}\n",
"f^{\\prime\\prime}(x_i) \\approx \\frac{y_{i+1} - 2y_i + y_{i-1}}{\\left(\\frac{x_{i+1}-x_{i-1}}{2}\\right)^2}\n",
"\\end{equation}"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def fin_diff_deriv(x,y,order,num_breaks=3):\n",
" from scipy.optimize import curve_fit\n",
" \n",
" logx, logy = np.log10(x), np.log10(y)\n",
" \n",
" first_derivative = []\n",
" second_derivative = []\n",
" \n",
" for idx in range(1,len(logx)-1):\n",
" first_derivative.append((logy[idx+1] - logy[idx-1])/(logx[idx+1]-logx[idx-1]))\n",
" second_derivative.append((logy[idx+1] - 2*logy[idx] + logy[idx-1])/((logx[idx+1]-logx[idx-1])/2)**2)\n",
" \n",
" return [logy,first_derivative,second_derivative][order] # placeholder for debugging: just return numerical derivative rather than anything real, for the moment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Signal processing and smoothing\n",
"\n",
"Noisy and non-smooth data present a big problem for numerical differentiation techniques. [Chartrand (2011)](http://downloads.hindawi.com/archive/2011/164564.pdf) presents some signal-processing-type solutions. A naïve quick-fix would be to use a rolling mean to smooth the sample. We can also implement a low-pass filter to try to remove some of the noise while preserving the shape of the data."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"from scipy.signal import savgol_filter, fftconvolve\n",
"\n",
"smoothed = savgol_filter(noisy, 15, 5) # see § 'Savitzky-Golay filtering' below\n",
"smoothed = savgol_filter(smoothed,11,3) # second pass\n",
"\n",
"plt.loglog(x,noisy)\n",
"plt.loglog(x,smoothed)\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"labels = iter([r'$\\nu_a$',r'$\\nu_m$',r'$\\nu_c$'])\n",
"colors = iter('rgb')\n",
"\n",
"plt.plot(np.log10(x[1:-1]),fit_bkn_pow(x,noisy,1),\".\")\n",
"plt.plot(np.log10(x[1:-1]),fit_bkn_pow(x,y,1),\"--\")\n",
"plt.plot(np.log10(x[1:-1]),fit_bkn_pow(x,smoothed,1))\n",
"#plt.plot(np.log10(x),np.log10(savgol_filter(noisy,25,4,deriv=1)))\n",
"\n",
"for br in breaks:\n",
" plt.axvline(np.log10(br),linestyle=\"--\",lw=1,color=next(colors),label=next(labels))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Savitzky-Golay filtering\n",
"\n",
"Savitzky-Golay is a type of low-pass filter, particularly suited for smoothing data with high-frequency noise. The main idea behind this approach is to make for each point a least-square fit with a polynomial of high order over a odd-sized window centered at the point. It has the advantage of preserving the original shape and features of the signal better than other types of filtering approaches, such as moving average techniques.\n",
"\n",
"There is actually already a SciPy function that implements this filter: [`scipy.signal.savgol_filter`](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.savgol_filter.html) (as already seen above). I've rewritten it below because it's not that complicated and it's worth seeing how it works.\n",
"\n",
"As input, it takes the noisy data, the filter window size (odd positive integer), the polynomial order to use when filtering, and the order of the derivative to compute (default $0$ will denoise original function only). It outputs smoothed $y$-points.\n",
"\n",
"<u>Example:</u>\n",
"\n",
"```python\n",
"t = np.linspace(-4, 4, 500)\n",
"y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape)\n",
"ysg = savitzky_golay(y, window_size=31, order=4)\n",
"plt.plot(t, y, label='Noisy signal')\n",
"plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal')\n",
"plt.plot(t, ysg, 'r', label='Filtered signal')\n",
"plt.legend()\n",
"plt.show()\n",
"```\n",
"\n",
"For more, see [Savitzky & Golay (1964)](https://doi.org/10.1021/ac60214a047) and of course, [Numerical Recipes](http://www.cambridge.org/9780521880688)."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def savitzky_golay(y, window_size, order, deriv=0, rate=1):\n",
"\n",
" try:\n",
" window_size = np.abs(np.int(window_size))\n",
" order = np.abs(np.int(order))\n",
" except ValueError:\n",
" raise ValueError(\"window_size and order should be integers\")\n",
" if window_size % 2 != 1 or window_size < 1:\n",
" raise TypeError(\"window_size size must be positive and odd\")\n",
" if window_size < order + 2:\n",
" raise TypeError(\"window_size is too small for the polynomial's order\")\n",
" \n",
" half_window = (window_size-1) // 2\n",
" \n",
" # precompute coefficients\n",
" b = np.mat([[k**i for i in range(order+1)] for k in range(-half_window, half_window+1)])\n",
" m = np.linalg.pinv(b).A[deriv] * rate**deriv * np.math.factorial(deriv)\n",
" \n",
" # pad the signal at the extremes with values taken from the signal itself\n",
" firstvals = y[0] - np.abs( y[1:half_window+1][::-1] - y[0] )\n",
" lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1])\n",
" y = np.concatenate((firstvals, y, lastvals))\n",
" \n",
" return np.convolve( m[::-1], y, mode='valid')"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"def sgolay2d(z, window_size, order, derivative=None):\n",
" \n",
" # number of terms in the polynomial expression\n",
" n_terms = ( order + 1 ) * ( order + 2) / 2.0\n",
"\n",
" if window_size % 2 == 0:\n",
" raise ValueError('window_size must be odd')\n",
"\n",
" if window_size**2 < n_terms:\n",
" raise ValueError('order is too high for the window size')\n",
"\n",
" half_size = window_size // 2\n",
"\n",
" # exponents of the polynomial. \n",
" # p(x,y) = a0 + a1*x + a2*y + a3*x^2 + a4*y^2 + a5*x*y + ... \n",
" # this line gives a list of two item tuple. Each tuple contains \n",
" # the exponents of the k-th term. First element of tuple is for x\n",
" # second element for y.\n",
" # Ex. exps = [(0,0), (1,0), (0,1), (2,0), (1,1), (0,2), ...]\n",
" exps = [ (k-n, n) for k in range(order+1) for n in range(k+1) ]\n",
"\n",
" # coordinates of points\n",
" ind = np.arange(-half_size, half_size+1, dtype=np.float64)\n",
" dx = np.repeat( ind, window_size )\n",
" dy = np.tile( ind, [window_size, 1]).reshape(window_size**2, )\n",
"\n",
" # build matrix of system of equation\n",
" A = np.empty( (window_size**2, len(exps)) )\n",
" for i, exp in enumerate( exps ):\n",
" A[:,i] = (dx**exp[0]) * (dy**exp[1])\n",
"\n",
" # pad input array with appropriate values at the four borders\n",
" new_shape = z.shape[0] + 2*half_size, z.shape[1] + 2*half_size\n",
" Z = np.zeros( (new_shape) )\n",
" # top band\n",
" band = z[0, :]\n",
" Z[:half_size, half_size:-half_size] = band - np.abs( np.flipud( z[1:half_size+1, :] ) - band )\n",
" # bottom band\n",
" band = z[-1, :]\n",
" Z[-half_size:, half_size:-half_size] = band + np.abs( np.flipud( z[-half_size-1:-1, :] ) -band )\n",
" # left band\n",
" band = np.tile( z[:,0].reshape(-1,1), [1,half_size])\n",
" Z[half_size:-half_size, :half_size] = band - np.abs( np.fliplr( z[:, 1:half_size+1] ) - band )\n",
" # right band\n",
" band = np.tile( z[:,-1].reshape(-1,1), [1,half_size] )\n",
" Z[half_size:-half_size, -half_size:] = band + np.abs( np.fliplr( z[:, -half_size-1:-1] ) - band )\n",
" # central band\n",
" Z[half_size:-half_size, half_size:-half_size] = z\n",
"\n",
" # top left corner\n",
" band = z[0,0]\n",
" Z[:half_size,:half_size] = band - np.abs( np.flipud(np.fliplr(z[1:half_size+1,1:half_size+1]) ) - band )\n",
" # bottom right corner\n",
" band = z[-1,-1]\n",
" Z[-half_size:,-half_size:] = band + np.abs( np.flipud(np.fliplr(z[-half_size-1:-1,-half_size-1:-1]) ) - band )\n",
"\n",
" # top right corner\n",
" band = Z[half_size,-half_size:]\n",
" Z[:half_size,-half_size:] = band - np.abs( np.flipud(Z[half_size+1:2*half_size+1,-half_size:]) - band )\n",
" # bottom left corner\n",
" band = Z[-half_size:,half_size].reshape(-1,1)\n",
" Z[-half_size:,:half_size] = band - np.abs( np.fliplr(Z[-half_size:, half_size+1:2*half_size+1]) - band )\n",
"\n",
" # solve system and convolve\n",
" if derivative == None:\n",
" m = np.linalg.pinv(A)[0].reshape((window_size, -1))\n",
" return fftconvolve(Z, m, mode='valid')\n",
" elif derivative == 'col':\n",
" c = np.linalg.pinv(A)[1].reshape((window_size, -1))\n",
" return fftconvolve(Z, -c, mode='valid')\n",
" elif derivative == 'row':\n",
" r = np.linalg.pinv(A)[2].reshape((window_size, -1))\n",
" return fftconvolve(Z, -r, mode='valid')\n",
" elif derivative == 'both':\n",
" c = np.linalg.pinv(A)[1].reshape((window_size, -1))\n",
" r = np.linalg.pinv(A)[2].reshape((window_size, -1))\n",
" return fftconvolve(Z, -r, mode='valid'), fftconvolve(Z, -c, mode='valid')"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 864x288 with 3 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# sample 2D data\n",
"x = np.linspace(-3,3,100)\n",
"y = np.linspace(-3,3,100)\n",
"X, Y = np.meshgrid(x,y)\n",
"\n",
"Z = np.exp( -(X**2+Y**2)) # pure function\n",
"Zn = Z + np.random.normal( 0, 0.2, Z.shape ) # added noise\n",
"Zf = sgolay2d( Zn, window_size=29, order=4) # re-filtered\n",
"\n",
"fig = plt.figure(figsize=(12,4))\n",
"axs = [fig.add_subplot(arr) for arr in [131,132,133]]\n",
"\n",
"for ax,mat in zip(axs,[Z,Zn,Zf]):\n",
" ax.matshow(mat)\n",
" ax.tick_params(axis='both',which='both',bottom=False,top=False,left=False,labeltop=False,labelleft=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.13 64-bit (windows store)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
},
"varInspector": {
"cols": {
"lenName": 16,
"lenType": 16,
"lenVar": 40
},
"kernels_config": {
"python": {
"delete_cmd_postfix": "",
"delete_cmd_prefix": "del ",
"library": "var_list.py",
"varRefreshCmd": "print(var_dic_list())"
},
"r": {
"delete_cmd_postfix": ") ",
"delete_cmd_prefix": "rm(",
"library": "var_list.r",
"varRefreshCmd": "cat(var_dic_list()) "
}
},
"types_to_exclude": [
"module",
"function",
"builtin_function_or_method",
"instance",
"_Feature"
],
"window_display": false
},
"vscode": {
"interpreter": {
"hash": "ab71228b3e09297cc4e0c1b6adea629ded25da184e6754fb14ae42947e6aab48"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment