Skip to content

Instantly share code, notes, and snippets.

@j-faria
Created February 11, 2019 22:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save j-faria/32c752dc90dae928a1641b59a60f4eaa to your computer and use it in GitHub Desktop.
Save j-faria/32c752dc90dae928a1641b59a60f4eaa to your computer and use it in GitHub Desktop.
Nightly bin a dataset of radial velocities
import numpy as np
###############################################################################
# the following is mostly a copy of the scipy implementation of
# binned_statistic and binned_statistic_dd
# but allowing for a weights parameter
from scipy._lib.six import callable, xrange
from scipy._lib._numpy_compat import suppress_warnings
## careful here!
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)
def binned_statistic(x, values, statistic='mean', bins=10, range=None,
weights=None):
try:
N = len(bins)
except TypeError:
N = 1
if N != 1:
bins = [np.asarray(bins, float)]
if range is not None:
if len(range) == 2:
range = [range]
medians, edges, binnumbers = binned_statistic_dd(
[x], values, statistic, bins, range, weights)
return medians, edges[0], binnumbers
def binned_statistic_dd(sample, values, statistic='mean', bins=10, range=None,
weights=None, expand_binnumbers=False):
known_stats = ['mean', 'median', 'count', 'sum', 'std', 'min', 'max']
if not callable(statistic) and statistic not in known_stats:
raise ValueError('invalid statistic %r' % (statistic, ))
# `Ndim` is the number of dimensions (e.g. `2` for `binned_statistic_2d`)
# `Dlen` is the length of elements along each dimension.
# This code is based on np.histogramdd
try:
# `sample` is an ND-array.
Dlen, Ndim = sample.shape
except (AttributeError, ValueError):
# `sample` is a sequence of 1D arrays.
sample = np.atleast_2d(sample).T
Dlen, Ndim = sample.shape
# Store initial shape of `values` to preserve it in the output
values = np.asarray(values)
input_shape = list(values.shape)
# Make sure that `values` is 2D to iterate over rows
values = np.atleast_2d(values)
if weights is not None:
weights = np.atleast_2d(weights)
Vdim, Vlen = values.shape
# Make sure `values` match `sample`
if (statistic != 'count' and Vlen != Dlen):
raise AttributeError('The number of `values` elements must match the '
'length of each `sample` dimension.')
nbin = np.empty(Ndim, int) # Number of bins in each dimension
edges = Ndim * [None] # Bin edges for each dim (will be 2D array)
dedges = Ndim * [None] # Spacing between edges (will be 2D array)
try:
M = len(bins)
if M != Ndim:
raise AttributeError('The dimension of bins must be equal '
'to the dimension of the sample x.')
except TypeError:
bins = Ndim * [bins]
# Select range for each dimension
# Used only if number of bins is given.
if range is None:
smin = np.atleast_1d(np.array(sample.min(axis=0), float))
smax = np.atleast_1d(np.array(sample.max(axis=0), float))
else:
smin = np.zeros(Ndim)
smax = np.zeros(Ndim)
for i in xrange(Ndim):
smin[i], smax[i] = range[i]
# Make sure the bins have a finite width.
for i in xrange(len(smin)):
if smin[i] == smax[i]:
smin[i] = smin[i] - .5
smax[i] = smax[i] + .5
# Create edge arrays
for i in xrange(Ndim):
if np.isscalar(bins[i]):
nbin[i] = bins[i] + 2 # +2 for outlier bins
edges[i] = np.linspace(smin[i], smax[i], nbin[i] - 1)
else:
edges[i] = np.asarray(bins[i], float)
nbin[i] = len(edges[i]) + 1 # +1 for outlier bins
dedges[i] = np.diff(edges[i])
nbin = np.asarray(nbin)
# Compute the bin number each sample falls into, in each dimension
sampBin = [np.digitize(sample[:, i], edges[i]) for i in xrange(Ndim)]
# Using `digitize`, values that fall on an edge are put in the right bin.
# For the rightmost bin, we want values equal to the right
# edge to be counted in the last bin, and not as an outlier.
for i in xrange(Ndim):
# Find the rounding precision
decimal = int(-np.log10(dedges[i].min())) + 6
# Find which points are on the rightmost edge.
on_edge = np.where(
np.around(sample[:, i], decimal) == np.around(
edges[i][-1], decimal))[0]
# Shift these points one bin to the left.
sampBin[i][on_edge] -= 1
# Compute the sample indices in the flattened statistic matrix.
binnumbers = np.ravel_multi_index(sampBin, nbin)
result = np.empty([Vdim, nbin.prod()], float)
if statistic == 'mean':
result.fill(np.nan)
flatcount = np.bincount(binnumbers, None)
a = flatcount.nonzero()
for vv in xrange(Vdim):
flatsum = np.bincount(binnumbers, values[vv])
result[vv, a] = flatsum[a] / flatcount[a]
elif statistic == 'std':
result.fill(0)
flatcount = np.bincount(binnumbers, None)
a = flatcount.nonzero()
for vv in xrange(Vdim):
flatsum = np.bincount(binnumbers, values[vv])
flatsum2 = np.bincount(binnumbers, values[vv]**2)
result[vv, a] = np.sqrt(flatsum2[a] / flatcount[a] -
(flatsum[a] / flatcount[a])**2)
elif statistic == 'count':
result.fill(0)
flatcount = np.bincount(binnumbers, None)
a = np.arange(len(flatcount))
result[:, a] = flatcount[np.newaxis, :]
elif statistic == 'sum':
result.fill(0)
for vv in xrange(Vdim):
flatsum = np.bincount(binnumbers, values[vv])
a = np.arange(len(flatsum))
result[vv, a] = flatsum
elif statistic == 'median':
result.fill(np.nan)
for i in np.unique(binnumbers):
for vv in xrange(Vdim):
result[vv, i] = np.median(values[vv, binnumbers == i])
elif statistic == 'min':
result.fill(np.nan)
for i in np.unique(binnumbers):
for vv in xrange(Vdim):
result[vv, i] = np.min(values[vv, binnumbers == i])
elif statistic == 'max':
result.fill(np.nan)
for i in np.unique(binnumbers):
for vv in xrange(Vdim):
result[vv, i] = np.max(values[vv, binnumbers == i])
elif callable(statistic):
with np.errstate(invalid='ignore'), suppress_warnings() as sup:
sup.filter(RuntimeWarning)
try:
null = statistic([], weights=[])
except:
null = np.nan
result.fill(null)
for i in np.unique(binnumbers):
for vv in xrange(Vdim):
if weights is None:
result[vv, i] = statistic(values[vv, binnumbers == i])
else:
result[vv, i] = statistic(values[vv, binnumbers == i],
weights[vv, binnumbers == i])
# Shape into a proper matrix
result = result.reshape(np.append(Vdim, nbin))
# Remove outliers (indices 0 and -1 for each bin-dimension).
core = [slice(None)] + Ndim * [slice(1, -1)]
result = result[core]
# Unravel binnumbers into an ndarray, each row the bins for each dimension
if (expand_binnumbers and Ndim > 1):
binnumbers = np.asarray(np.unravel_index(binnumbers, nbin))
if np.any(result.shape[1:] != nbin - 2):
raise RuntimeError('Internal Shape Error')
# Reshape to have output (`reulst`) match input (`values`) shape
result = result.reshape(input_shape[:-1] + list(nbin - 2))
return result, edges, binnumbers
# put back the documentation
from scipy.stats import binned_statistic as old_binned_statistic,\
binned_statistic_dd as old_binned_statistic_dd
doc1 = old_binned_statistic.__doc__
doc2 = old_binned_statistic_dd.__doc__
binned_statistic.__doc__ = doc1
binned_statistic_dd.__doc__ = doc2
###############################################################################
def binRV(time, rv, err=None, stat='wmean', tstat='wmean', estat='addquad'):
""" Bin a dataset of radial-velocity observations.
Parameters
----------
time : array
The array of times where the radial velocity is measured. This function
does nightly binning, based on the integer part of this array.
rv : array
The radial-velocity values.
err : array (optional)
The uncertainty on the radial-velocity values.
stat : string or callable (optional)
The statistic to compute for the radial velocities. By default, this is
'mean' if the `err` array is not provided and weighted mean ('wmean')
if `err` is provided. In that case, the routine does inverse variance
averaging, using 1/`err`**2 as weights. See other available statistics
on the `binned_statistic` function.
tstat : string or callable (optional)
The statistic to compute for the times. The default is the same as for
the radial velocities. See other available statistics on the
`binned_statistic` function.
estat : string or callable (optional)
The statistic to compute for the errors. The default is to add them
quadratically and divide by the number. See other available statistics
on the `binned_statistic` function.
Notes
-----
Arguably, the most justified way to perform binning is to use the defaults,
with times, radial velocities and errors as input. This will lead to a
weighted average of the times, a weighted average of the radial velocities
and quadratically added errors.
"""
# "nightly" binning, based on the integer part of the time array
intt = time.astype(int)
_, indices = np.unique(intt, return_index=True)
# include the last time in the array of bins, so we don't miss a point
# but be careful: if it is alone, perturb the last time by a small ammount
# so that the difference time[-1] - bins[-1] is not zero
if indices[-1] == time.size - 1:
bins = np.r_[time[indices], time[-1] + 1e-10]
else:
bins = np.r_[time[indices], time[-1]]
# weighted mean
wmean = lambda a, e: np.average(a, weights=1 / e**2)
# bin the RVs
if (err is not None) and (stat == 'wmean'):
stat = wmean # default is weighted mean
brv = binned_statistic(time, rv, statistic=stat, bins=bins, range=None,
weights=err)
# bin the times
if (err is not None) and (tstat == 'wmean'):
tstat = wmean # default is weighted mean
times = binned_statistic(time, time, statistic=tstat, bins=bins,
range=None, weights=err)
# if there are errors, bin them too
if err is not None:
if estat == 'addquad':
# this function adds the elements of `a` quadratically
add_quadratically = lambda a: np.sqrt(np.add.reduce(a**2))
# this function then divides by the number of elements
mean_quadratically = lambda a: add_quadratically(a) / a.size
# the two previous functions are only separated for clarity
# bin
estat = mean_quadratically
errors = binned_statistic(time, err, statistic=estat, bins=bins)
else:
errors = binned_statistic(time, err, statistic=estat, bins=bins)
return times[0], brv[0], errors[0]
else:
return times[0], brv[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment