Last active
April 6, 2016 18:52
-
-
Save squirrelo/761b0cb4bdca787fd512 to your computer and use it in GitHub Desktop.
This runs permanova or anosim on all catgories in your qiime mapping file. Simmilar to the compare_category.py script in QIIME, but runs over all categories at once.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import click | |
from pandas.io.parsers import read_table | |
from pandas import DataFrame | |
from skbio.stats.distance import DistanceMatrix, permanova, anosim | |
from statsmodels.sandbox.stats.multicomp import multipletests | |
DIST_TESTS = { | |
'permanova': permanova, | |
'anosim': anosim | |
} | |
@click.command() | |
@click.option('--input_dm', '-i', required=True, type=click.Path( | |
resolve_path=True, readable=True, exists=True, dir_okay=False), | |
help='Path to the distance matrix file.') | |
@click.option('--input_map', '-m', required=True, type=click.Path( | |
resolve_path=True, readable=True, exists=True, dir_okay=False), | |
help='Path to the qiime mapping file.') | |
@click.option('--f_out', '-o', required=True, type=click.Path( | |
resolve_path=True, writable=True, dir_okay=False), | |
help='Path to output results file.') | |
@click.option('--categories', '-c', default=None, | |
help='Comma seperated list of categories to use from mapping' | |
'file. Default use all columns') | |
@click.option('--permutations', '-p', type=int, default=999, | |
help='Number of permutations (default 999)') | |
@click.option('--max_vals', '-n', type=int, default=12, | |
help='Maximum number of valuse in a category (default 12)') | |
@click.option('--min_size', '-s', type=int, default=1, | |
help='Minimum number of samples required per category ' | |
'(default 1)') | |
@click.option('--verbose', '-v', is_flag=True, | |
help='Run in verbose mode (Default False)') | |
@click.option('--test', '-t', type=click.Choice(DIST_TESTS.keys()), | |
default='permanova', | |
help='Test to use (choose from %s) Default: permanova' | |
% ', '.join(DIST_TESTS.keys())) | |
def main(input_dm, input_map, f_out, test, permutations=999, max_vals=12, | |
min_size=1, verbose=False, categories=None): | |
map_df = read_table(input_map, header=0, index_col=0) | |
dm = DistanceMatrix.read(input_dm) | |
if categories is not None: | |
cats =categories.split(',') | |
else: | |
cats = map_df.columns | |
stats, skipped = compare_categories_all_cats( | |
dm, map_df, test, max_vals=max_vals, min_size=min_size, | |
permutations=permutations, categories=cats, verbose=verbose) | |
stats.to_csv(f_out, sep='\t') | |
# Append skipped columns information to the end of the file | |
tabs = "\t".join([''] * (len(stats.columns)-1)) | |
with open(f_out, 'a') as f: | |
f.write("\n\nSkipped\tReason%s\n" % tabs) | |
for s, r in skipped: | |
f.write("%s\t%s%s\n" % (s, r, tabs)) | |
def compare_categories_all_cats(input_dm, input_map, test='permanova', | |
categories=None, permutations=999, max_vals=12, | |
min_size=1, verbose=False): | |
"""Compares all or given categories for given metadata and distance matrix | |
Parameters | |
---------- | |
input_dm : DistanceMatrix object | |
Distance matrix to use for testing | |
input_map : pandas DataFrame | |
Dataframe with all metadata, indexed on samples | |
test : {'permanova', 'anosim'}, optional | |
Test type to use | |
categories : list of str, optional | |
If given, the categories to run tests on. Default all cols in dataframe | |
permutations : int, optional | |
Number of permutations. Default 999 | |
max_vals : int, optional | |
Max number of distinct values to allow in a single column. Default 12 | |
min_size : int, optional | |
Minimum number of samples required per category value. Default 1 | |
verbose : bool, optional | |
Whether to print information during run. Default False | |
Returns | |
------- | |
df : DataFrame | |
Test run information and p values, indexed on column | |
skipped : List of tuples | |
skipped: List of tuples, skipped column and reason | |
""" | |
out_stats = [] | |
skipped = [] | |
# Sanity check that the mapping file contains all samples in dist matrix | |
if not set(input_dm.ids).issubset(input_map.index): | |
raise RuntimeError( | |
"Mapping file missing the following samples: %s" % ', '.join( | |
set(input_dm.ids).difference(input_map.index))) | |
# filter the mapping file to only samples in dist matrix | |
map_df = input_map.loc[list(input_dm.ids)] | |
# if needed, filter columns to list given | |
if categories is not None: | |
input_map = input_map[categories] | |
for col in input_map.columns: | |
uniques_list = list(map_df[col].dropna()) | |
uniques = set(uniques_list) | |
counts = None | |
reason = "" | |
# ignore columns without non-null values | |
if len(uniques) == 0: | |
reason = "No values in category" | |
# ignore columns with all same value | |
if len(uniques) == 1: | |
reason = "All values the same" | |
# ignore columns with all different values | |
if len(uniques_list) == len(uniques): | |
reason = "All values different" | |
# ignore columns with too many categories (useful for continuous cats) | |
if len(uniques) > max_vals: | |
reason = "%d exceeds max %d categories" % (len(uniques), max_vals) | |
if not reason: | |
# create filtered distance matrix and use to group categories | |
filtered_dm = input_dm.filter([i for i, x in | |
map_df[col].notnull().iteritems() if x]) | |
counts = [(g, len(s)) for g, s in input_map.groupby(col)] | |
counts.sort(key=lambda x: x[0]) | |
# Make sure we have at least 2 groups and min num samples | |
if len(counts) == 1: | |
reason = "Category with one group" | |
elif any(x[1] < min_size for x in counts): | |
reason = "At least one group smaller than %d" % min_size | |
if reason: | |
skipped.append((col, reason)) | |
if verbose: | |
print("Skipping %s: %s" % (col, reason)) | |
if counts is not None: | |
print(counts) | |
continue | |
try: | |
res = DIST_TESTS[test](filtered_dm, map_df, col, permutations) | |
except Exception as e: | |
print("ERROR ON %s" % col) | |
raise e | |
if verbose: | |
print(col) | |
print(counts) | |
print(res) | |
res['column'] = col | |
sizes = [x[1] for x in counts] | |
res['min group size'] = min(sizes) | |
res['max group size'] = max(sizes) | |
res['groups'] = str(counts) | |
out_stats.append(res) | |
out_stats.sort(key=lambda x: x['p-value']) | |
df = DataFrame(out_stats, columns=out_stats[0].keys()) | |
_, corrected, _, _ = multipletests(df['p-value'], | |
method='bonferroni') | |
df['p-bonferroni'] = list(corrected) | |
_, corrected, _, _ = multipletests(df['p-value'], method='fdr_bh') | |
df['p-fdr_bh'] = list(corrected) | |
df.set_index('column', inplace=True) | |
return df, skipped | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment