Compute the Pearson p-value using permutations and/or resampling, rather than analytically.
# -*- coding: utf-8 -*-
"""
Module contents:
# pearsonr_permutations
# pearsonr_sign_bootstrap
# permute_multi <-- used by pearsonr_permutations
DM, Feb 2015.
"""
import numpy as np

def pearsonr_permutations(x, y, k=100, tails=2):
    """
    For matching 1D vectors `x` and `y`, this returns `(r, p)` exactly like
    `sp.stats.pearsonr`, except that here `p` is obtained using the
    "permutation test", see:
    http://en.wikipedia.org/wiki/Pearson_product-moment_correlation_coefficient

    This means we permute `y` with respect to `x`, `k` times, compute
    the correlation each time, and then see what fraction of these permuted `r`
    values exceed the original `r` value. When `tails=2` we use `abs(r)` to
    perform this comparison; when `tails=1`, we check whether `r` is positive
    or negative and count only the number of larger or smaller permuted `r`s
    respectively.

    See `permute_multi` in this module for notes on cached random values.
    It is this caching that makes this function so fast (once it has warmed up).
    """
    if x.ndim != 1 or x.shape != y.shape:
        raise ValueError("Expected x and y to be a pair of matching 1d arrays.")
    k = int(k)
    n = len(x)

    # we must request sufficient precision for what lies ahead
    x, y = x.astype(np.float64), y.astype(np.float64)

    # compute the quantities that are invariant under permutations of y
    Sx = np.sum(x)
    Sy = np.sum(y)
    Sxx = np.dot(x, x)
    Syy = np.dot(y, y)
    denom = np.sqrt(n*Sxx - Sx**2) * np.sqrt(n*Syy - Sy**2)

    # compute the unshuffled Sxy
    Sxy_original = np.dot(x, y)
    # for k permutations of y, compute Sxy
    Sxy_shuffles = np.dot(x, permute_multi(y, k).T)

    # compute the r value for the original and the shuffles
    r_original = (n*Sxy_original - Sx*Sy) / denom
    r_shuffles = (n*Sxy_shuffles - Sx*Sy) / denom

    if tails == 2:
        p = np.sum(abs(r_shuffles) > abs(r_original))
    elif tails == 1 and r_original > 0:
        p = np.sum(r_shuffles > r_original)
    elif tails == 1 and r_original < 0:
        p = np.sum(r_shuffles < r_original)
    else:
        raise ValueError("tails should be 1 or 2, and r must be non-zero")

    return r_original, float(p)/float(k)
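
# A minimal usage sketch (it assumes `scipy` is installed, and uses it only to
# compare against the analytic p-value); the sample size, effect size, and `k`
# below are arbitrary illustrative choices.
def _example_pearsonr_permutations():
    from scipy import stats  # assumption: scipy available for comparison only
    n = 50                           # arbitrary illustrative sample size
    x = np.random.randn(n)
    y = 0.3*x + np.random.randn(n)   # weak linear relationship
    r_perm, p_perm = pearsonr_permutations(x, y, k=10000, tails=2)
    r_exact, p_exact = stats.pearsonr(x, y)
    print("permutation: r=%.3f p=%.4f" % (r_perm, p_perm))
    print("analytic:    r=%.3f p=%.4f" % (r_exact, p_exact))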

def pearsonr_sign_bootstrap(x, y, k=1000):
    """
    For matching 1D vectors `x` and `y`, this returns `(s, p)` much like
    `sp.stats.pearsonr`, except that here `s` is just the sign of `r`, rather
    than its exact value, and `p` is obtained as follows:

    Pick `n` samples from the list of `n` `(x, y)` pairs (i.e. with replacement)
    and compute `s`. Repeat this `k` times, and define `p` as the fraction of
    `s` values which have the opposite sign to the original (un-resampled) `s`.

    Note that we don't need to compute the two standard deviations in the
    denominator because they are both positive (well, if we ignore the
    possibility of their equaling zero) and thus do not affect the sign of the
    Pearson correlation as a whole.

    TODO: perhaps we should deal properly with the case where the resampled x
    or y has 0 standard deviation. In such cases we ought to throw a
    divide-by-zero error, or something.

    DM, Feb 2015
    """
    if x.ndim != 1 or x.shape != y.shape:
        raise ValueError("Expected x and y to be a pair of matching 1d arrays.")
    k = int(k)
    n = len(x)

    # prepare a matrix with 3 columns: [x y x*y]
    x_y_xy = np.hstack((x[:, np.newaxis],
                        y[:, np.newaxis],
                        (x*y)[:, np.newaxis]))
    # compute the original [Sx Sy Sxy]
    S_x_y_xy_original = np.sum(x_y_xy, axis=0)

    # for k resamplings, compute [Sx Sy Sxy]: a bincount of the resampled
    # indices gives the multiplicity of each pair, so one dot product yields
    # the resampled sums
    S_x_y_xy_shuffles = np.empty(shape=(k, 3))
    for i in range(k):
        S_x_y_xy_shuffles[i, :] = np.dot(
            np.bincount(np.random.randint(n, size=n), minlength=n), x_y_xy)

    # compute the s value for the original and the shuffles
    s_original = np.sign(n*S_x_y_xy_original[2]
                         - S_x_y_xy_original[0] * S_x_y_xy_original[1])
    s_shuffles = np.sign(n*S_x_y_xy_shuffles[:, 2]
                         - S_x_y_xy_shuffles[:, 0] * S_x_y_xy_shuffles[:, 1])

    # work out what fraction of the shuffles have the opposite sign to the original
    p = np.sum(s_shuffles != s_original)
    return int(s_original), float(p)/float(k)
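
# A minimal usage sketch for the sign bootstrap, using the same kind of toy
# data as above; the sample size and `k` are arbitrary illustrative choices.
def _example_pearsonr_sign_bootstrap():
    n = 50
    x = np.random.randn(n)
    y = 0.3*x + np.random.randn(n)
    s, p = pearsonr_sign_bootstrap(x, y, k=10000)
    # s is the sign of the Pearson r (+1 or -1); p is the fraction of
    # resamples whose sign disagreed with s
    print("sign=%+d  p=%.4f" % (s, p))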

def randint_(n, shape, _cache={}, cache_cmd=None):
    """
    Cached variant of `np.random.randint(n, size=shape)`: random integers in
    `[0, n)` are generated once and reused on subsequent calls, being
    re-filtered when a smaller `n` is requested. Not currently used by the
    other functions in this module.
    """
    k = np.prod(shape)
    if _cache.get('n', 0) < n:
        _cache['inds'] = cached_inds = np.random.randint(n, size=k)
        _cache['n'] = n
    else:
        cached_inds = _cache['inds']

    if _cache.get('n', 0) == n:
        inds = cached_inds
    elif len(cached_inds) > k:
        inds = cached_inds.compress(cached_inds < n)
    else:
        inds = []

    if len(inds) < k:
        raise NotImplementedError(
            "this can easily happen but it's difficult to know what the best "
            "caching behaviour is here... ideally we should store the full "
            "0-RAND_MAX numbers and then convert to 0-n as needed. See "
            "https://github.com/numpy/numpy/blob/4cbced22a8306d7214a4e18d12e651c034143922/numpy/random/mtrand/randomkit.c#L260")
    else:
        inds = inds[:k]
    return inds.reshape(shape)

def permute_multi(X, k, _cache={}, cache_cmd=None):
    """For 1D input `X` of length `n`, this generates a `(k, n)` array
    giving `k` permutations of `X`.

    When used repeatedly with similar `n` values, this function will be
    fast as it caches the permutation indices. In (almost?) all cases
    this will be fine; however, we take the precautionary measure of
    pre-permuting the data before applying the cached permutations,
    where the extra permutation is unique each time. Note that strictly
    speaking, this may still not be truly sufficient: if the correlation
    between rows in the cached indices happens to be quite a long way
    from the expectation, then you are effectively using less than `k`
    permutations, which would be fine if you did it once, but if you use
    that many times, then any meta-analysis will possibly have significantly
    less power than it appears to... note that without caching this could also
    happen, but the probabilities grow combinatorially, so very quickly become
    infinitesimal... thus this discussion really is only relevant for small `k`
    and `n`. For small `k`, you could partly guard against this by artificially
    using a larger `k` and then randomly subsampling rows.

    If you set `cache_cmd=None`, the cache will be used.
    For `cache_cmd=-1` the cache will not be used.
    For `cache_cmd=+1` the cache will be reset and then used.

    TODO: we possibly want to apply some heuristic to shrink the cached
    array if it appears to be too big in the vast majority of cases.

    Ignoring the caching, this function is just doing the following:

        def permute_multi(X, k):
            n = len(X)
            ret = np.empty(shape=(k, n), dtype=X.dtype)
            for i in range(k):
                ret[i, :] = np.random.permutation(X)
            return ret
    """
    if cache_cmd == -1:
        _cache = {}  # this local dict will go out of scope at the end of the function
    elif cache_cmd == +1:
        _cache['inds'] = np.array([[]])  # reset the cached inds

    cached_inds = _cache.get('inds', np.array([[]]))
    n = len(X)

    # make sure that cached_inds has shape >= (k, n)
    if cached_inds.shape[1] < n:
        _cache['inds'] = cached_inds = np.empty(shape=(k, n), dtype=int)
        for i in range(k):
            cached_inds[i, :] = np.random.permutation(n)
    elif cached_inds.shape[0] < k:
        raise NotImplementedError("TODO: need to generate more rows")

    inds = cached_inds[:k, :]  # dispose of excess rows
    if n < cached_inds.shape[1]:
        # dispose of high indices
        inds = inds.compress(inds.ravel() < n).reshape((k, n))

    # To prevent spurious patterns in the inds mapping back to the data in a
    # consistent way, we compose all the cached permutations with a novel, unique
    # permutation, i.e. we perform the novel permutation first and then apply
    # the other permutations to that.
    X = np.random.permutation(X)
    return X[inds]
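
# A minimal sanity-check sketch for `permute_multi`: every row of the output
# should be a permutation of the input (the toy sizes here are arbitrary
# illustrative choices).
def _example_permute_multi():
    X = np.arange(10)
    P = permute_multi(X, 5)
    assert P.shape == (5, 10)
    # each row contains exactly the original elements, in some order
    assert all(np.array_equal(np.sort(row), X) for row in P)
    print(P)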