Skip to content

Instantly share code, notes, and snippets.

@fedarko
Forked from mortonjt/poisson-cat.py
Created April 17, 2019 22:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fedarko/36a6a9c1cf93e15320bb1a89c97877aa to your computer and use it in GitHub Desktop.
Save fedarko/36a6a9c1cf93e15320bb1a89c97877aa to your computer and use it in GitHub Desktop.
Closed form categorical Poisson regression
import numpy as np
import patsy
import pandas as pd
from biom import Table
# This is the main function
def poisson_cat(table, metadata, category, ref=None):
""" Poisson differential abundance.
Parameters
----------
table : biom.Table
Sparse matrix of counts.
metadata : pd.DataFrame
Sample metadata file.
category : str
Metadata category of interest.
ref : str
Reference category. If not specified, the first
category will be picked.
Returns
-------
differential : pd.DataFrame
Differentials of all of the variables of interest.
Notes
-----
This only works on a single categorical variable. If you have continuous
variables, stop it.
This also assumes that the metadata and table have already been matched and
aligned.
TODO
----
Extend this to multiple metadata categories, so that multiple columns
can be analyzed simultaneously
"""
if ref is None:
ref = metadata[category].unique()[0]
idx = metadata[category] == ref
x = idx.astype(np.int64).reshape(-1, 1)
y = table.matrix_data
A = y @ ~x # reference microbe counts
B = table.sum(axis='observation')
N = table.sum(axis='sample')
C = N[idx].sum()
D = N[~idx].sum()
diff = pd.Series(np.log((A * B * C) / (B * B * D - A)),
table.ids(axis='observation'))
return diff
# This is for testing
def random_block_table(reps, n_species,
species_mean=0,
species_var=1.,
effect_size=1,
library_size=10000,
microbe_total=100000, microbe_kappa=0.3,
microbe_tau=0.1, sigma=0.5, seed=None):
""" Differential abundance analysis benchmarks.
The simulation here consists of 3 parts
Step 1: generate class probabilities using logistic distribution
Step 2: generate coefficients from normal distributions
Step 3: generate counts from species distributions
Parameters
----------
reps : int
Number of replicate samples per test.
n_species : int
Number of species.
species_loc : float
Mean of the species prior.
species_variance : float
Variance of species log-fold differences
effect_size : int
The effect size difference between the feature abundances.
n_contaminants : int
Number of contaminant species.
sigma: float
Logistic error variance for class probabilities
library_size : np.array
A vector specifying the library sizes per sample.
template : np.array
A vector specifying feature abundances or relative proportions.
Returns
-------
generator of
pd.DataFrame
Ground truth tables.
pd.DataFrame
Metadata group categories, n_diff and effect_size
pd.Series
Species actually differentially abundant.
"""
state = check_random_state(seed)
data = []
n = reps * 2
k = 2
labels = np.array([-effect_size] * (n // 2) + [effect_size] * (n // 2))
eps = np.random.logistic(loc=0, scale=sigma, size=n)
class_probs = labels + eps
X = np.hstack((np.ones((n, 1)), class_probs.reshape(-1, 1)))
B = np.random.normal(loc=species_mean, scale=species_var, size=(k, n_species))
## Helper functions
# Convert microbial abundances to counts
def to_counts_f(x):
n = state.lognormal(np.log(library_size), microbe_tau)
p = x / x.sum()
return state.poisson(state.lognormal(np.log(n*p), microbe_kappa))
o_ids = ['F%d' % i for i in range(n_species)]
s_ids = ['S%d' % i for i in range(n)]
abs_table = pd.DataFrame(np.exp(X @ B) * microbe_total,
index=s_ids,
columns=o_ids)
rel_data = np.vstack(abs_table.apply(to_counts_f, axis=1))
rel_table = pd.DataFrame(rel_data,
index=s_ids,
columns=o_ids)
metadata = pd.DataFrame({'labels': labels})
metadata['effect_size'] = effect_size
metadata['microbe_total'] = microbe_total
metadata['class_logits'] = class_probs
metadata['intercept'] = 1
metadata.index = s_ids
ground_truth = pd.DataFrame({
'intercept': B[0, :],
'categorical': B[1, :]
}, index=o_ids)
return abs_table, rel_table, metadata, ground_truth
if __name__ == "__main__":
import unittest
class TestPoissonCat(unittest.TestCase):
def setUp(self):
num_samples = 100
reps = 50
n_species = 200
np.random.seed(2)
self.res = random_block_table(
reps, n_species,
species_mean=0,
species_var=1,
microbe_kappa=0.7,
microbe_tau=0.7,
library_size=10000,
microbe_total=100000,
effect_size=1)
def test_poisson_cat(self):
abs_table, rel_table, metadata, ground_truth = self.res
table = Table(rel_table.T, rel_table.columns, rel_table.index)
res_diff = poisson_cat(table, metadata, category='label')
exp_diff = ground_truth['categorical']
# This may not work -- prepare to debug
from scipy.stats import pearsonr
r, p = pearsonr(res_diff, exp_diff)
self.assertGreater(r, 0.5)
self.assertLess(p, 0.01)
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment