Skip to content

Instantly share code, notes, and snippets.

@pavlin-policar
Created May 10, 2020 13:01
Show Gist options
  • Save pavlin-policar/a090d87f5dba1a98abd3d152794613e7 to your computer and use it in GitHub Desktop.
Save pavlin-policar/a090d87f5dba1a98abd3d152794613e7 to your computer and use it in GitHub Desktop.
from collections import OrderedDict
from typing import Iterable
import numpy as np
import scipy.sparse as sp
import scipy.stats as stats
import pandas as pd
def FDR(p_values: Iterable, dependent=False, m=None, ordered=False) -> Iterable:
"""False Discovery Rate correction for multiple comparisons.
Parameters
----------
p_values: Union[list, np.ndarray]
p-values.
dependent: bool
use correction for dependent hypotheses (default False).
m: int
number of hypotheses tested (default ``len(p_values)``).
ordered: bool
prevent sorting of p-values if they are already sorted (default False).
Returns
-------
Union[list, np.ndarray]
Same as the input
"""
if p_values is None or len(p_values) == 0 or (m is not None and m <= 0):
return None
is_list = isinstance(p_values, list)
p_values = np.array(p_values)
if m is None:
m = len(p_values)
if not ordered:
ordered = (np.diff(p_values) >= 0).all()
if not ordered:
indices = np.argsort(p_values)
p_values = p_values[indices]
if dependent: # correct q for dependent tests
m *= sum(1 / np.arange(1, m + 1))
fdrs = (p_values * m / np.arange(1, len(p_values) + 1))[::-1]
fdrs = np.array(np.minimum.accumulate(fdrs)[::-1])
if not ordered:
fdrs[indices] = fdrs.copy()
return fdrs if not is_list else list(fdrs)
def mannwhitneyu(*args, **kwargs):
try:
return stats.mannwhitneyu(*args, **kwargs).pvalue
except ValueError:
return 1
def differential_expression(
adata, group_by="ClusterID", layer=None, alternative="greater", test_type="wilcox",
lfc_thresh=0.25, fdr_thresh=0.05, min_pct=0.1, n_jobs=1,
):
if test_type not in ("wilcox",):
raise ValueError()
if alternative not in ("two-sided", "less", "greater"):
raise ValueError()
if layer is None:
X = adata.X
else:
X = adata.layers[layer]
results = []
n_groups = len(adata.obs[group_by].unique())
n_genes = len(adata.var_names)
for group_idx, group_id in enumerate(sorted(adata.obs[group_by].unique())):
mask = adata.obs[group_by].values == group_id
print(
f"Testing {n_genes} genes for group {group_id}, "
f"{mask.sum()} cells ({group_idx + 1} / {n_groups}) {test_type}"
)
# Filter out genes that are expressed in too few cells
pct_expressed_group = np.ravel(np.mean(X[ mask] > 0, axis=0))
pct_expressed_other = np.ravel(np.mean(X[~mask] > 0, axis=0))
mask_pct_expressed = pct_expressed_group >= min_pct
# Filter out genes with too low log-fold change
means_group = np.ravel(np.mean(X[mask], axis=0))
means_other = np.ravel(np.mean(X[~mask], axis=0))
log_fold_change = np.log((means_group + 1e-5) / (means_other + 1e-5)) / np.log(2)
mask_lfc = np.abs(log_fold_change) >= lfc_thresh
def compare_groups(gene_idx):
x0 = X[ mask, gene_idx]
x1 = X[~mask, gene_idx]
if sp.issparse(x0):
x0 = x0.toarray()
if sp.issparse(x1):
x1 = x1.toarray()
x0 = np.ravel(x0)
x1 = np.ravel(x1)
if test_type == "wilcox":
pval = mannwhitneyu(x0, x1, alternative=alternative)
return OrderedDict([
("gene_id", adata.var_names[gene_idx]),
("group_id", group_id),
("n_group", x0.shape[0]),
("n_other", x1.shape[0]),
("n_expressed_group", np.sum(x0 > 0)),
("n_expressed_other", np.sum(x1 > 0)),
("avg_expression_group", means_group[gene_idx]),
("avg_expression_other", means_other[gene_idx]),
("pct_expressed_group", pct_expressed_group[gene_idx]),
("pct_expressed_other", pct_expressed_other[gene_idx]),
("log_fold_change", log_fold_change[gene_idx]),
("test_type", test_type),
("pvalue", pval),
])
final_gene_mask = mask_pct_expressed & mask_lfc
with ThreadPoolExecutor(max_workers=n_jobs) as executor:
results.extend(executor.map(compare_groups, np.argwhere(final_gene_mask).ravel()))
df = pd.DataFrame(results)
df["fdr"] = FDR(df["pvalue"].values, m=n_genes)
df = df.loc[df["fdr"] <= fdr_thresh]
return df
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment