Created
May 25, 2012 22:33
-
-
Save superbobry/2790940 to your computer and use it in GitHub Desktop.
Bioinf. Algorithms Homework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# from __future__ import print_function | |
import csv | |
import itertools | |
import errno | |
import os | |
import os.path | |
import sys | |
import time | |
from collections import defaultdict | |
import numpy as np | |
from Bio import SeqIO | |
from Bio.Data import IUPACData | |
from Bio.SeqUtils.ProtParam import ProteinAnalysis | |
from sklearn import ensemble | |
from sklearn.cross_validation import train_test_split | |
from sklearn.externals import joblib | |
from sklearn.utils import fixes | |
def GC_skew(seq, window=100): | |
"""Calculates GC skew (G-C)/(G+C) for multuple windows along the sequence. | |
Returns a list of ratios (floats), controlled by the length of the sequence | |
and the size of the window. | |
Does NOT look at any ambiguous nucleotides. | |
""" | |
values = [] | |
for i in range(0, len(seq), window): | |
s = seq[i: i + window] | |
g = s.count('G') + s.count('g') | |
c = s.count('C') + s.count('c') | |
# HACK(Sergei): default implementation raises 'ZeroDivisionError'' | |
# on non-GC sequences. | |
if not g + c: | |
values.append(0.) | |
else: | |
skew = (g - c) / float(g + c) | |
values.append(skew) | |
return values or [0.] | |
def molecular_weight(seq): | |
"""Calculate the molecular weight of a DNA sequence.""" | |
weight_table = IUPACData.unambiguous_dna_weights | |
# HACK(Sergei): once again, fails with KeyError on 'N's. | |
return np.sum(weight_table.get(x, 0.0) for x in seq) | |
chr_map = map(chr, xrange(256)) | |
chr_map[ord("A")] = "0" | |
chr_map[ord("C")] = "1" | |
chr_map[ord("G")] = "2" | |
chr_map[ord("T")] = "3" | |
chr_map = "".join(chr_map) | |
def kmers(seq, k): | |
seen = defaultdict(int) | |
seq = str(seq) | |
for i in xrange(0, len(seq) - k): | |
chunk = seq[i:i + k] | |
if "N" not in chunk: | |
seen[int(chunk.translate(chr_map), base=4)] += 1 | |
q25 = np.percentile(seen.values(), .25) | |
q75 = np.percentile(seen.values(), .75) | |
result = [] | |
for kmer, count in seen.iteritems(): | |
if count < q25 or count > q75: | |
continue | |
result.append(kmer * count) | |
else: | |
return np.array(result) | |
# 4-element bins | |
bins = sorted(set("".join(bin) for bin in | |
itertools.product("ACGT", repeat=2))) | |
def hashed_kmers(seq, k=2): | |
seen = dict.fromkeys(bins, 0) | |
seq = str(seq) | |
for i in xrange(0, len(seq) - k): | |
seq_chunk = seq[i:i + k] | |
if "N" not in seq_chunk: | |
seen[seq_chunk] += 1 | |
kmers = np.zeros(len(bins)) | |
for idx, count in enumerate(seen.itervalues()): | |
kmers[idx] = count | |
return kmers | |
memory = joblib.Memory(cachedir="/tmp", verbose=0) | |
@memory.cache | |
def generate_interesting_repeats(k): | |
result = [] | |
if k % 2: | |
chunks = generate_interesting_repeats(k // 2) | |
else: | |
chunks = [] | |
for n in ["A", "C", "G", "T"]: | |
result.append("".join(itertools.repeat(n, k))) # ex.: AAAAAA | |
for chunk in chunks: | |
result.append(chunk + n + chunk) # ex.: AAACAAA | |
return result | |
def nucleotide_repeats(seq, k=8): | |
return np.fromiter( | |
(seq.count(repeat) for repeat in generate_interesting_repeats(k)), | |
np.int64) | |
class Genome(object): | |
cache = {} | |
def __init__(self, base_dir): | |
self.base_dir = base_dir | |
def features(self, chrom, start, end): | |
if chrom not in self.cache: | |
path = os.path.join(self.base_dir, chrom + ".fasta") | |
self.cache[chrom], = SeqIO.parse(open(path), "fasta") | |
chrom = self.cache.get(chrom).seq | |
seq = chrom[start:end] | |
protein = seq.translate() | |
analysis = ProteinAnalysis(str(protein)) | |
helix, turn, sheet = analysis.secondary_structure_fraction() | |
trivial = [start, end, end - start, | |
hash(str(chrom)), | |
molecular_weight(seq), | |
protein.count("M"), protein.count("*"), | |
analysis.aromaticity(), | |
helix, turn, sheet, | |
np.median(GC_skew(seq))] | |
return np.concatenate([trivial, | |
np.array(analysis.get_amino_acids_percent().values()), | |
hashed_kmers(seq), | |
nucleotide_repeats(seq, k=5)]) | |
def extract_features(genome, path): | |
print("Extracting features from {0} ...".format(path)) | |
chrom = os.path.basename(path) | |
data = (map(int, r) for r in csv.reader(open(path), delimiter=" ")) | |
features, labels = [], [] | |
for row in data: | |
start, end = row[:2] | |
features.append(genome.features(chrom, start, end)) | |
if len(row) == 3: | |
labels.append(row[-1]) | |
return np.array(features), np.array(labels) | |
def cutoff_bootstrap(features, labels, n=50): | |
freqs = fixes.Counter(labels) | |
seen = defaultdict(int) | |
cutoff = np.percentile(freqs.values(), 0.25) | |
print("Estimated cutoff: {0}.".format(cutoff)) | |
new_features, new_labels = [], [] | |
for f, l in itertools.izip(features, labels): | |
if freqs[l] < cutoff: | |
print("Dropping class {0}, freq = {1}.".format(l, freqs[l])) | |
continue | |
elif seen[l] == n: | |
continue | |
seen[l] += 1 | |
new_features.append(f) | |
new_labels.append(l) | |
return np.array(new_features), np.array(new_labels) | |
def get_classifier(genome, chroms, force=False, n_jobs=4): | |
pickled_clf = ".classifier" | |
pickled_features = "all_features.dump" | |
if not force and os.path.exists(pickled_clf): | |
print("Loading classifier from {0} ...".format(pickled_clf)) | |
clf = joblib.load(pickled_clf) | |
else: | |
# a) train | |
# clf = ensemble.RandomForestClassifier( | |
# min_samples_leaf=15, min_samples_split=120, compute_importances=True, | |
# n_estimators=30, n_jobs=4) | |
clf = ensemble.ExtraTreesClassifier( | |
min_samples_leaf=15, min_samples_split=60, | |
compute_importances=True, bootstrap=True, | |
n_estimators=30, n_jobs=n_jobs) | |
if os.path.exists(pickled_features): | |
print("Loading features from {0} ...".format(pickled_features)) | |
features, labels = joblib.load(pickled_features) | |
else: | |
features, labels = extract_all_features(genome, chroms, n_jobs=n_jobs) | |
joblib.dump((features, labels), pickled_features) | |
features_tr, features_te, labels_tr, labels_te = \ | |
train_test_split(features, labels, test_size=0.4) | |
print("Training {0} ...".format(clf)) | |
clf.fit(features, labels) | |
print("Done, saving params ...") | |
joblib.dump(clf, pickled_clf, compress=9) | |
print("Evaluating fit ...") | |
print("Test score: ", clf.score(features_te, labels_te)) | |
return clf | |
def extract_all_features(genome, chroms, n_jobs=4): | |
chunks = joblib.Parallel(n_jobs=n_jobs)( | |
joblib.delayed(extract_features)(genome, chrom) for chrom in chroms) | |
all_features, all_labels = [], [] | |
for features, labels in chunks: | |
all_features.append(features) | |
all_labels.append(labels) | |
features = np.concatenate(all_features) | |
labels = np.concatenate(all_labels) | |
print("Cleaning up training data ...") | |
features_tr, labels_tr = cutoff_bootstrap(features, labels, n=100) | |
print("Done.") | |
return features, labels | |
TIMESTAMP = time.time() | |
def predict_all(clf, chroms, n_jobs=4): | |
chunks = joblib.Parallel(n_jobs=n_jobs)( | |
joblib.delayed(extract_features)(genome, | |
os.path.join(predict_dir, chrom)) for chrom in chroms) | |
for chrom, (features, _labels) in itertools.izip(chroms, chunks): | |
predict(clf, chrom, features) | |
def predict(clf, chrom, features): | |
result_dir = "result-{0:.8n}".format(TIMESTAMP) | |
try: | |
os.mkdir(result_dir, 0o755) | |
except OSError as e: | |
if e.errno != errno.EEXIST: | |
raise | |
positions = [] | |
for f in features: | |
# HACK(Sergei): oopsy, abstraction leak. | |
start, end = map(int, f[:2]) | |
positions.append((start, end)) | |
print("Predicting {0} ...".format(chrom)) | |
labels = clf.predict(features) | |
with open(os.path.join(result_dir, chrom), "w") as result: | |
for (start, end), label in itertools.izip(positions, labels): | |
result.write("{0} {1} {2}\n".format(start, end, label)) | |
result.flush() | |
if __name__ == "__main__": | |
try: | |
root_dir, = sys.argv[1:] | |
except ValueError: | |
print("usage: {0} path/to/data/root".format(__file__)) | |
sys.exit(1) | |
else: | |
genome_dir, train_dir, predict_dir = \ | |
[os.path.join(root_dir, f) for f in ["genome", "train", "test"]] | |
genome = Genome(genome_dir) | |
chroms = os.listdir(train_dir) | |
clf = get_classifier(genome, | |
[os.path.join(train_dir, chrom) for chrom in chroms], | |
n_jobs=4) | |
predict_all(clf, chroms, n_jobs=8) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import csv | |
import heapq | |
import glob | |
import os.path | |
import sys | |
from collections import defaultdict | |
import sn | |
import numpy as np | |
from vcf import VCFReader | |
def buggy(f): | |
def inner(*args, **kwargs): | |
try: | |
return f(*args, **kwargs) | |
except Exception as e: | |
import pdb; pdb.set_trace() | |
return inner | |
# Task #1 | |
# Используя ... определите вероятность того, что вы американец (AMR), | |
# азиат (ASN), африканец (AFR), европеец (EUR). Должно получиться 4 | |
# значения вероятностей. | |
def race(records): | |
race_freqs = np.array([.25, .25, .25, .25], np.float64) | |
races, eps = ["AMR_AF", "ASN_AF", "AFR_AF", "EUR_AF"], 10e-8 | |
for record in records.itervalues(): | |
snp_freq = np.array( | |
[record.INFO.get(race_id, eps) for race_id in races], | |
np.float64) | |
race_freqs *= snp_freq * record.QUAL | |
race_freqs /= sum(race_freqs) | |
return race_freqs | |
# Task #2 | |
# Используя данные ~152 генотипов из http://opensnp.org/dump_download | |
# определите 10 генетически ближайших к вам индивидуумов (например, по | |
# количеству совпадающих гаплотипов в снипах). | |
@buggy | |
def neighbours(snps, records, candidates, n=10): | |
def rank(candidate): | |
try: | |
snps = sn.parse(candidate) | |
except sn.SNPyError: | |
return -1 | |
total = 0. | |
for snp in snps: | |
for g in snp.genotype: | |
# Similar to 'lookup_snps' we treat alleles independantly. | |
if g in (snp.name, g) not in known: | |
continue | |
total += records[snp.name].INFO.get("AF", 0.) | |
return total | |
# *Both* alleles shoudl match! | |
known = set((snp.id, snp.genotype) for snp in snps) | |
return sorted(heapq.nlargest(n, candidates, rank)) | |
# Task #3 | |
# Используя 10 ближайших "родственников" из пункта 2, определите свой | |
# фенотип (например, взяв мажорирующие признаки из фенотипов | |
# "родственников"). | |
@buggy | |
def phenotype(neighbours, traits): | |
traits = dict((p["user_id"], p) | |
for p in csv.DictReader(open(traits), delimiter=";")) | |
def inner(): | |
acc = defaultdict(lambda: defaultdict(int)) | |
for neighbour in map(os.path.basename, neighbours): | |
user_id, _ = neighbour.split("_", 1) | |
user_id = user_id[4:] # len("user") == 4 | |
if user_id not in traits: | |
continue | |
for trait, option in traits[user_id].iteritems(): | |
if option == "-" or trait in ["user_id", "date_of_birth"]: | |
continue | |
acc[trait][option] += 1 | |
for trait, options in acc.iteritems(): | |
yield trait, max(options, key=lambda o: options[o]) | |
return sorted(inner()) | |
def lookup_snps(snps, dbSNP): | |
# Unfortuntately, dbSNP doesn't provide heterozygosity information, | |
# (at least in the VCF format), so we treat two alleles | |
# independantly. | |
known = set((snp.name, g) for snp in snps for g in snp.genotype) | |
resolved = {} | |
for record in VCFReader(open(dbSNP)): | |
if not record.is_snp: | |
continue | |
if all((record.ID, str(alt)) not in known for alt in record.ALT): | |
continue | |
resolved[record.ID] = record | |
return resolved | |
if __name__ == "__main__": | |
try: | |
dbSNP, opensnp, someone = sys.argv[1:] | |
except ValueError: | |
sys.exit("{0} dbSNP OPENSNP SOMEONE".format(__file__)) | |
print("Looking up sample SNPs in dbSNP ...") | |
snps = sn.parse(someone) | |
records = lookup_snps(snps, dbSNP) | |
print("Done.") | |
print("Task #1: race probabilities") | |
print(race(records)) | |
print("Task #2: opensnp neighbours") | |
found = neighbours(snps, | |
records, | |
glob.glob(os.path.join(opensnp, "user*.txt"))) | |
print(found) | |
print("Task #3: phenotype") | |
[phenotypes] = glob.glob(os.path.join(opensnp, "phenotypes_*.csv")) | |
print(phenotype(found, phenotypes)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment