Skip to content

Instantly share code, notes, and snippets.

@squirrelo
Last active April 6, 2016 18:52
Show Gist options
  • Save squirrelo/761b0cb4bdca787fd512 to your computer and use it in GitHub Desktop.
Save squirrelo/761b0cb4bdca787fd512 to your computer and use it in GitHub Desktop.
This runs permanova or anosim on all catgories in your qiime mapping file. Simmilar to the compare_category.py script in QIIME, but runs over all categories at once.
#!/usr/bin/env python
import click
from pandas.io.parsers import read_table
from pandas import DataFrame
from skbio.stats.distance import DistanceMatrix, permanova, anosim
from statsmodels.sandbox.stats.multicomp import multipletests
DIST_TESTS = {
'permanova': permanova,
'anosim': anosim
}
@click.command()
@click.option('--input_dm', '-i', required=True, type=click.Path(
resolve_path=True, readable=True, exists=True, dir_okay=False),
help='Path to the distance matrix file.')
@click.option('--input_map', '-m', required=True, type=click.Path(
resolve_path=True, readable=True, exists=True, dir_okay=False),
help='Path to the qiime mapping file.')
@click.option('--f_out', '-o', required=True, type=click.Path(
resolve_path=True, writable=True, dir_okay=False),
help='Path to output results file.')
@click.option('--categories', '-c', default=None,
help='Comma seperated list of categories to use from mapping'
'file. Default use all columns')
@click.option('--permutations', '-p', type=int, default=999,
help='Number of permutations (default 999)')
@click.option('--max_vals', '-n', type=int, default=12,
help='Maximum number of valuse in a category (default 12)')
@click.option('--min_size', '-s', type=int, default=1,
help='Minimum number of samples required per category '
'(default 1)')
@click.option('--verbose', '-v', is_flag=True,
help='Run in verbose mode (Default False)')
@click.option('--test', '-t', type=click.Choice(DIST_TESTS.keys()),
default='permanova',
help='Test to use (choose from %s) Default: permanova'
% ', '.join(DIST_TESTS.keys()))
def main(input_dm, input_map, f_out, test, permutations=999, max_vals=12,
min_size=1, verbose=False, categories=None):
map_df = read_table(input_map, header=0, index_col=0)
dm = DistanceMatrix.read(input_dm)
if categories is not None:
cats =categories.split(',')
else:
cats = map_df.columns
stats, skipped = compare_categories_all_cats(
dm, map_df, test, max_vals=max_vals, min_size=min_size,
permutations=permutations, categories=cats, verbose=verbose)
stats.to_csv(f_out, sep='\t')
# Append skipped columns information to the end of the file
tabs = "\t".join([''] * (len(stats.columns)-1))
with open(f_out, 'a') as f:
f.write("\n\nSkipped\tReason%s\n" % tabs)
for s, r in skipped:
f.write("%s\t%s%s\n" % (s, r, tabs))
def compare_categories_all_cats(input_dm, input_map, test='permanova',
categories=None, permutations=999, max_vals=12,
min_size=1, verbose=False):
"""Compares all or given categories for given metadata and distance matrix
Parameters
----------
input_dm : DistanceMatrix object
Distance matrix to use for testing
input_map : pandas DataFrame
Dataframe with all metadata, indexed on samples
test : {'permanova', 'anosim'}, optional
Test type to use
categories : list of str, optional
If given, the categories to run tests on. Default all cols in dataframe
permutations : int, optional
Number of permutations. Default 999
max_vals : int, optional
Max number of distinct values to allow in a single column. Default 12
min_size : int, optional
Minimum number of samples required per category value. Default 1
verbose : bool, optional
Whether to print information during run. Default False
Returns
-------
df : DataFrame
Test run information and p values, indexed on column
skipped : List of tuples
skipped: List of tuples, skipped column and reason
"""
out_stats = []
skipped = []
# Sanity check that the mapping file contains all samples in dist matrix
if not set(input_dm.ids).issubset(input_map.index):
raise RuntimeError(
"Mapping file missing the following samples: %s" % ', '.join(
set(input_dm.ids).difference(input_map.index)))
# filter the mapping file to only samples in dist matrix
map_df = input_map.loc[list(input_dm.ids)]
# if needed, filter columns to list given
if categories is not None:
input_map = input_map[categories]
for col in input_map.columns:
uniques_list = list(map_df[col].dropna())
uniques = set(uniques_list)
counts = None
reason = ""
# ignore columns without non-null values
if len(uniques) == 0:
reason = "No values in category"
# ignore columns with all same value
if len(uniques) == 1:
reason = "All values the same"
# ignore columns with all different values
if len(uniques_list) == len(uniques):
reason = "All values different"
# ignore columns with too many categories (useful for continuous cats)
if len(uniques) > max_vals:
reason = "%d exceeds max %d categories" % (len(uniques), max_vals)
if not reason:
# create filtered distance matrix and use to group categories
filtered_dm = input_dm.filter([i for i, x in
map_df[col].notnull().iteritems() if x])
counts = [(g, len(s)) for g, s in input_map.groupby(col)]
counts.sort(key=lambda x: x[0])
# Make sure we have at least 2 groups and min num samples
if len(counts) == 1:
reason = "Category with one group"
elif any(x[1] < min_size for x in counts):
reason = "At least one group smaller than %d" % min_size
if reason:
skipped.append((col, reason))
if verbose:
print("Skipping %s: %s" % (col, reason))
if counts is not None:
print(counts)
continue
try:
res = DIST_TESTS[test](filtered_dm, map_df, col, permutations)
except Exception as e:
print("ERROR ON %s" % col)
raise e
if verbose:
print(col)
print(counts)
print(res)
res['column'] = col
sizes = [x[1] for x in counts]
res['min group size'] = min(sizes)
res['max group size'] = max(sizes)
res['groups'] = str(counts)
out_stats.append(res)
out_stats.sort(key=lambda x: x['p-value'])
df = DataFrame(out_stats, columns=out_stats[0].keys())
_, corrected, _, _ = multipletests(df['p-value'],
method='bonferroni')
df['p-bonferroni'] = list(corrected)
_, corrected, _, _ = multipletests(df['p-value'], method='fdr_bh')
df['p-fdr_bh'] = list(corrected)
df.set_index('column', inplace=True)
return df, skipped
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment