Created April 10, 2012 13:19
Multi-hypergeometric distribution
"""Calculates the distribution of a weighted sum of the components of
a multivariate hypergeometric random variable, for the special case of
three components with weights -1, 0 and +1 - although the generating
function can handle any weights and number of components. Given a
value t, it also calculates the p value of t under the null hypothesis
that it was generated as the weighted sum of the multivariate
hypergeometric variable.
python f_1 f_2 f_3 n t
f_1, f_2, f_3 = Population size for each of the three components,
across all of the data.
n = Make n observations without replacement, resulting in x_1, x_2
and x_3 observations of the three outcomes, having weights w_i of -1,
0 and +1.
t = The weighted sum of the n observations: t = -1*x_1 + 0*x_2 + 1*x_3,
whose p-value is to be calculated.
If an urn has s types of balls in it, with populations f_1,...,f_s,
and n balls are drawn from the urn without replacement, then random
variable for the number of each type of ball drawn, X(f,n) = (X_1,
X_2, ... X_s), is multivariate hypergeometrically distributed.
In this case, the urn has s=3 different types of balls in it, with
weights w_i of -1, 0, +1. There are f_1, f_2 and f_3 of each type
present in the urn. There are m=f_1+f_2+f_3 balls in total. n balls
are drawn from the urn without replacement, and the result is
multivariate hypergeometrically distributed X(f,n) =(X_1,X_2,X_2). The
weighted sum of the draws is the random variable Y = -1*X_1 +
0*X_2 + 1*X_3.
The distribution of Y is calculated by enumerating P(X=x) over all
possible outcomes x=(x_1,x_2,x_3). For each outcome we calculate
y=-1*x_1+0*x_2+1*x_3, and contribute P(X=x) to the total for P(Y=y).
The p value of some value t under the null hypothesis that it was
generated by the random variable Y is the smaller of P(Y >= t) and
P(Y <= t) - evaluated by summing over the discrete probabilities in
the calculated distribution of Y.
from __future__ import division
from collections import defaultdict
import sys
def factorial(N):
"""Factorial of N."""
if N == 0: return 1.0
r = 1.0
# Perform the multiplications 1 * 2 * 3 * 4 * ... * N
for i in range(2, N+1):
r *= i
return r
def C(n,r):
"""Number of ways to choose r out of n items."""
nCr = 1
# Because C(n,r) == C(n,n-r), and loop is over r, so we want r to be small
if (r*2) > n:
r = n-r
for i in range(r-1, -1, -1): # r-1 ... 0
nCr = (nCr*(n-i)) // (r-i)
return nCr
def weighted_sum_of_multivariate_hypergeometric(w, f, n):
"""Construct the the sum-of-multivariate-hypergeometrics distribution
@param w: Weights w=(w_1,w_2,...w_s) for observations in each
component of the random variable X=(X_1,X_2,...,X_s). The
weighted sum random variable Y is the dot product Y = w*X.
@param f: Population size of the components (f_1,f_2,...,f_s). The
total population size is m=f_1+f_2+...+f_s
@param n: Make n observations without replacement.
@return: (values,mass) corresponding to the values and probability masses
of the discrete distribution Y=d*X, being the weighted sum of X.
s = len(w)
assert len(f) == s
# m is the size of the population (f_1+f_2+...+f_s)
m = sum(f)
# Y[y] is P[Y=y], the probability of observing Y=y
Y = defaultdict(float)
# maxfree[i] is the maximum number of balls that can
# be drawn in total from components >= i
maxfree = list(f)
for i in range(s-2, -1, -1): # s-2 ... 0
maxfree[i] += maxfree[i+1]
maxfree += [0]
def enumerate_probabilities(i, free, y, g):
"""Recursively enumerate all of the possible outcomes. Add each outcomes
probability to its weighted sum.
@param i: Component of population to enumerate.
@param free: Observations (out of original n) remaining to be made.
@param y: Weighted sum of the observations.
@param g: Probability of the observed outcome.
if free == 0:
# Add the probability of this outcome to total probability of
# observing the weighted sum y
Y[y] += g
# Constraints should ensure that we use up all the observations in time.
assert i < s
# k = number of balls drawn from this component
# free-maxfree[i+1] = minimum number of balls that could be drawn from this component.
# min(f[i],free) = maximum number of balls that could be drawn from this component.
# w[i]*k = weighted contribution of balls drawn from this component
# C(f[i],k) = number of ways to choose k balls from f[i] in the component.
for k in range(max(0, free-maxfree[i+1]), min(f[i], free)+1):
enumerate_probabilities(i+1, free-k, y+(w[i]*k), g*C(f[i],k))
enumerate_probabilities(i=0, free=n, y=0, g=1)
values = sorted(Y.keys())
# Finish the probability using the shared C(m,n) denominator
mass = [Y[k]/C(m,n) for k in values]
return (values, mass)
def discrete_pvalue(values, mass, t):
"""Calculate the p-value of a test value t in a discrete
distribution. That is, the sum of the probability mass that
corresponds to values at least as extreme as t.
@param values: Values of the discrete distribution.
@param mass: Distribution mass associated with each value.
@param t: Value to test for significance.
print "The discrete distribution is: "
for (v,d) in zip(values,mass):
print v, d
print "Sum of probability mass (should be 1.0): %f" % sum(mass)
print ("Mean (expected value): %f" %
sum(v*d for (v,d) in zip(values,mass)))
print "Calculating the p-value of %s" % str(t)
i = values.index(t)
pval_left = sum(mass[j] for j in range(0, i+1))
pval_right = sum(mass[j] for j in range(i, len(values)))
pval = min(pval_right, pval_left)
print "1-tailed p-value: ", pval
except ValueError, e:
print "Test value %d is not one of the values in the distribution." % t
def plot_density(values, mass):
"""For a discrete distribtion, plut probability mass on Y axis against
values of the distribution on the X axis."""
from matplotlib import pylab
pylab.plot(values, mass)
print "Press any key to exit the program."
def main(args):
"""Calculate p-values under the null hypothesis of a weighted sum
of a multivariate hypergeometric random variable.
args = [int(v) for v in args[1:]]
f = args[0:3]
n = args[3]
t = args[4]
except Exception:
print __doc__
# Weights for occurences in each component
w = [-1, 0, 1]
(values,mass) = weighted_sum_of_multivariate_hypergeometric(w, f, n)
discrete_pvalue(values, mass, t)
#plot_density(values, mass)
if __name__ == "__main__":
# Let X be multivariate hypergeometrically distributed with s
# categories having assigned values x_1,...x_s having f_1,...f_s
# instances in the urn. m is the sum of all the f_i.
# Let Y be the sum of of n observations from X. Y is discrete random
# variable of s' values y_1,...y_s' with frequency g_1,...g_s'.
# Y will range from n*x_0 to n*x_s, and s' is given by
# | Outer Sum from 1..n of the set { x_0,...,x_s } |
from __future__ import division
import sys
# Factorial of N
def factorial( N ):
if N == 0:
return 1.0
r = 1.0
for i in range( 2, N+1 ):
r *= i
return r
# Number of combinations of r items taken from n
def combinations( n, r ):
nCr = 1
if (r*2) > n:
r = n-r
for i in range( r-1, -1, -1 ):
nCr = ( nCr * ( n - i ) ) // ( r - i )
return nCr
# Enumerate probability mass pieces from the sum-of-multivariate-hypergeometrics distribution
# Y, g, s, m, n, f, x are described at the top
# i is the number of the category from X that we are processing
# prob[Y] is the probability of observing sum Y
# free is the number of observations (out of n) that remain to be used
# maxfree[i] is the maximum allowed value of free at point i
# k is the number of observations to use from category i
def enumerate_mhyper_sum( i, prob, free, maxfree, Y, g, s, m, n, f, x ):
if free == 0:
if Y not in prob: prob[Y] = 0
prob[Y] += g
for k in range( max( 0, free-maxfree[i+1] ), min( f[i], free ) + 1 ):
enumerate_mhyper_sum( i+1, prob, free-k, maxfree, Y+(x[i]*k), g*combinations(f[i],k), s, m, n, f, x )
# Construct the sum-of-multivariate-hypergeometrics distribution
# Return a discrete distribution such that values[i] has a frequency
# of mass[i]
def construct_mhyper_sum( x, f, n ):
s = len(x)
m = sum(f)
prob = dict()
maxfree = list( f )
for i in range( s-2, -1, -1 ):
maxfree[i] += maxfree[i+1]
maxfree += [ 0 ]
enumerate_mhyper_sum( 0, prob, n, maxfree, 0, 1, s, m, n, f, x )
values = []
mass = []
for k in sorted( prob.keys() ):
values.append( k )
mass.append( prob[k] / combinations(m,n) )
return ( values, mass )
n = int(sys.argv[4])
x = [ -1, 0, 1 ]
f = [ int(sys.argv[1]),int(sys.argv[2]) , int(sys.argv[3]) ]
t = int(sys.argv[5])
#print f
(values,mass) = construct_mhyper_sum( x, f, n )
#for (v,d) in zip(values,mass):
# print v, d
#print "Probability Mass Sums to: %f" % sum(mass)
#print "Population Mean (expected sum of %d observations) is: %f" % (n, sum( v*d for (v,d) in zip(values,mass) ) )
pval = 0
#print t #debug
for num in range(len(values)):
if t == values[num]:
if t >= 0:
for calc in range( num, len(values) ):
# print mass[calc] #debug
pval = pval + mass[calc]
for calc in range( 0, num+1 ):
pval = pval + mass[calc]
#print mass[calc] #debug
print "1-tailed: ", pval
print "2-tailed: ", pval*2
#from matplotlib import pylab as p
#p.plot( vals, density )
