Skip to content

Instantly share code, notes, and snippets.

@ecwheele
Created March 17, 2017 16:43
Show Gist options
  • Save ecwheele/85eb16ae71c01323a9ca83e1ce40f49c to your computer and use it in GitHub Desktop.
Save ecwheele/85eb16ae71c01323a9ca83e1ce40f49c to your computer and use it in GitHub Desktop.
#!/bin/env python
from __future__ import print_function
from __future__ import division
from future.utils import raise_with_traceback
import kvector
from argparse import ArgumentParser
import pandas as pd
import numpy as np
import os
#This is not currently used in the function
#class IllegalFileTypeError(ValueError):
# """raised when an incorrect input file is given, should be either bed or fasta"""
#raise_with_traceback(IllegalFileTypeError(foreground_fasta_or_bed))
def generate_kmerdiff_delta_score_from_fasta(foreground_fasta,
background_fasta,
kmer_length,
output_file):
print("counting kmers in foreground")
foreground = count_kmers_from_fasta(foreground_fasta,
kmer_length=kmer_length)
print("counting kmers in background")
background = count_kmers_from_fasta(background_fasta,
kmer_length=kmer_length)
all_compare_functions(foreground=foreground,
background=background,
output_file=output_file)
def generate_kmerdiff_delta_score_from_bed(foreground_bed,
background_bed,
genome_fasta,
kmer_length,
threads,
output_file):
print("counting kmers in foreground")
foreground = count_kmers_from_bed(foreground_bed,
genome_fasta=genome_fasta,
kmer_length=kmer_length,
threads=threads)
print("counting kmers in background")
background = count_kmers_from_bed(background_bed,
genome_fasta=genome_fasta,
kmer_length=kmer_length,
threads=threads)
all_compare_functions(foreground=foreground,
background=background,
output_file=output_file)
def count_kmers_from_bed(bed_file, kmer_length, genome_fasta, threads):
"""
:param bed_file: bed_file of regions to count kmers
:param kmer_length: length of kmers to count (integer)
:param genome_fasta: genome fasta file with fasta sequence of all chromosomes
:param threads: number of threads to use for processing
:return: a dataframe where columns are every kmer and rows are counts for each entry in bed file
"""
if os.path.isfile(bed_file.replace(".bed", "_kmers.csv")):
kmers = pd.read_csv(bed_file.replace(".bed", "_kmers.csv"), index_col=0)
else:
if genome_fasta is None:
print("Your input filetype is bed, need to provide a genome fasta")
kmers = kvector.per_interval_kmers(bed_file,
genome_fasta=genome_fasta,
threads=threads,
kmer_lengths=kmer_length)
kmers.to_csv(bed_file.replace(".bed", "_kmers.csv"))
return kmers
def count_kmers_from_fasta(fasta_file, kmer_length):
"""
:param fasta_file: input fasta file to count kmers from
:param kmer_length: length of kmer to count (integer)
:return: a dataframe where columns are every kmer and rows are counts for each entry in fasta file
"""
if os.path.isfile(fasta_file.replace(".fasta", "_kmers.csv")):
kmers = pd.read_csv(fasta_file.replace(".fasta", "_kmers.csv"), index_col=0)
else:
kmers = kvector.count_kmers(fasta_file, kmer_lengths=kmer_length)
kmers.to_csv(fasta_file.replace(".fasta", "_kmers.csv"))
return kmers
def combine_foreground_and_background_kmers(foreground, background):
"""
:param foreground: dataframe from foreground where columns are every kmer and rows are counts for each entry in fasta file
:param background: dataframe from background where columns are every kmer and rows are counts for each entry in fasta file
:return: combined dataframe where the index is all kmers and the columns are foreground_count and background_count
"""
kmers = pd.DataFrame(foreground.sum(axis=0), columns=['foreground_count'])
kmers = kmers.join(pd.DataFrame(background.sum(axis=0), columns=['background_count']))
return kmers
def calculate_difference_metric(combined_dataframe):
"""
:param combined_dataframe: dataframe with kmers as the index, foreground_count and background_count as columns
:return: dataframe with new column appended with statistic value for each kmer
"""
foreground_total = combined_dataframe['foreground_count'].sum()
background_total = combined_dataframe['background_count'].sum()
combined_dataframe['statistic'] = ztest(combined_dataframe['foreground_count'],
combined_dataframe['background_count'],
foreground_total,
background_total)
combined_dataframe_with_metric = combined_dataframe
return combined_dataframe_with_metric
def comparison_matrix_to_csv(matrix, output_file):
"""
:param matrix: Dataframe to save
:param output_file: Name of file to save as
:return: Nothing
"""
matrix.to_csv(output_file)
def ztest(x1, x2, n1, n2):
"""
parametric Z-test for comparison of two proportions
Args:
x1: number of individuals from sample Population 1 with characteristic
Can be either a single number or a numpy array
x2: number of individuals from sample Population 2 with characteristic
Can be either a single number or a numpy array
n1: total number of individuals in sample population 1
n2: total number of individuals in sample population 2
Returns: z-test score
>>> ztest(50, 100, 20, 200)
18.309508328682536
>>> ztest(50, 100, 100, 200)
0.0
"""
numerator = (x1/n1) - (x2/n2)
g = (x1+x2) / (n1+n2)
denominator = np.sqrt(g * (1-g) * (1/n1 + 1/n2))
delta = numerator / denominator
return delta
def all_compare_functions(foreground, background, output_file):
kmers_combined = combine_foreground_and_background_kmers(foreground=foreground,
background=background)
kmers_combine_with_metric = calculate_difference_metric(combined_dataframe=kmers_combined)
comparison_matrix_to_csv(matrix=kmers_combine_with_metric, output_file=output_file)
return kmers_combine_with_metric
def main():
#usage = "something here"
parser = ArgumentParser()
parser.add_argument(
"--foreground_fasta",
dest="foreground_fasta",
help="foreground file to count kmers on (fasta), not used with foreground_bed",
default=None
)
parser.add_argument(
"--background_fasta",
dest="background_fasta",
help="background file to count kmers on (fasta), not used with background_bed",
default=None
)
parser.add_argument(
"--foreground_bed",
dest="foreground_bed",
help="foreground file to count kmers on (bed), not used with foreground_fasta",
default=None
)
parser.add_argument(
"--background_bed",
dest="background_bed",
help="background file to count kmers on (bed), not used with background_fasta",
default=None
)
parser.add_argument(
"-k",
"--kmer_length",
dest="kmer_length",
help="Length of kmer to count",
type=int
)
parser.add_argument(
"--genome_fasta",
dest="genome_fasta",
help="fasta file of reference genome, required if input is bed file"
)
parser.add_argument(
"--threads",
dest="threads",
help="number of threads to use, only applies when using bed",
type=int
)
parser.add_argument(
"--output_file",
dest="output_file",
help="file to save the results"
)
opts = parser.parse_args()
#if (opts.foreground_fatsa is not None) & (opts.background_fasta is not None):
# generate_kmerdiff_delta_score_from_fasta(foreground_fasta=opts.foreground_fatsa,
# background_fasta=opts.background_fasta,
# kmer_length=opts.kmer_length,
# output_file=opts.output_file)
#else:
generate_kmerdiff_delta_score_from_bed(foreground_bed=opts.foreground_bed,
background_bed=opts.background_bed,
kmer_length=opts.kmer_length,
genome_fasta=opts.genome_fasta,
threads=opts.threads,
output_file=opts.output_file)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment