-
-
Save ElDeveloper/31d95bf37ac5aed8070f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import division | |
from sys import path, argv | |
from cogent.format.mage import MagePoint, MageList, MageGroup, SimplexHeader, \ | |
Kinemage, MatchedColors, CartesianSimplexHeader, RainbowColors | |
from cogent.maths.stats.util import Numbers, Freqs | |
from cogent.maths.stats.test import multiple_comparisons | |
from cogent.maths.stats.test import combinations | |
from cogent.util.misc import flatten | |
from cogent.maths.geometry import center_of_mass | |
from Motif import Motif | |
from numpy import average, dot, array, matrix, transpose, cumsum | |
from numpy import cov as calc_cov | |
from numpy.linalg import inv, eig, svd | |
from math import sqrt | |
from functions import combinations, unique_combinations | |
from gmpy import mpf | |
from cogent.parse.fasta import MinimalFastaParser | |
from scipy.linalg import sqrtm | |
from probdata import composition | |
import random | |
import cPickle | |
WC = {('U','A'):1, ('A','U'):1, ('G','C'):1, ('C','G'):1} | |
GU = WC.copy() | |
GU.update({('G','U'):1, ('U','G'):1}) | |
categories = {'Bourdeau_ATP_bindingB': 'Nucleotide', | |
'Bourdeau_FMNBinding_A_1999': 'Nucleotide', | |
'Bourdeau_FMNBinding_B_1999': 'Nucleotide', | |
'Bourdeau_Leadzyme_A': 'misc', | |
'Bourdeau_Leadzyme_B': 'misc', | |
'Bourdeau_Neomycin_Binding_1999': 'Antibiotic', | |
'Bourdeau_ParomomycinA_1999': 'Antibiotic', | |
'Bourdeau_ParomomycinB_1999': 'Antibiotic', | |
'Bourdeau_Theophylline_bindingA': 'Nucleotide', | |
'Bourdeau_Theophylline_bindingB': 'Nucleotide', | |
'Bourdeau_UV_Loop_A': 'misc', | |
'Bourdeau_UV_Loop_B': 'misc', | |
'Davis_Alignment_of_class5': 'Nucleotide', | |
'Famulok_Arginine_Revised': 'AminoAcid', | |
'HammerHead_Minimal': 'SelfCleaving', | |
'Huang_ATPAptamerA': 'Nucleotide', | |
'Huang_ATPAptamerB': 'Nucleotide', | |
'Illangasekare_Self_Aminoacylation': 'AminoAcid', | |
'Laserson_Chloramphenicol': 'Antibiotic', | |
'Laserson_NeomycinB': 'Antibiotic', | |
'Laserson_Streptomycin': 'Antibiotic', | |
'Lazarev_xmotif_ribozyme': 'SelfCleaving', | |
'Legiewicz_Isoleucine6B': 'AminoAcid', | |
'Legiewicz_Isoleucine_6A': 'AminoAcid', | |
'Majerfeld_Histidine_gap0': 'AminoAcid', | |
'Majorfield_L-tryptophan_605': 'AminoAcid', | |
'Majorfield_L-tryptophan_CYA': 'AminoAcid', | |
'Bourdeau_Valine_bindingB': 'AminoAcid', | |
'Sazani_ATP_1-1Min': 'Nucleotide', | |
'Tryptophan': 'AminoAcid', | |
'Yarus_FAD_Binding': 'Nucleotide', | |
'Yarus_Isoleucine_Strict': 'AminoAcid', | |
'Zn_Binding': 'misc'} | |
#transformations of the radius for different calculations | |
RAD_TRANSFORMS = { | |
'Matches': lambda x: (x * 0.1), | |
'Probs': lambda x: ((x**0.33) * 30), | |
'Products': lambda x: ((x ** 0.33) * 20), | |
} | |
def four_sf(t): | |
return tuple("%0.4e"%v for v in t) | |
def spacer_partitions(s, m): | |
"""Returns number of ways of placing m modules in longer spacer s.""" | |
return combinations(s+1, m) | |
def make_pairs(freqs): | |
"""takes freq distribution and makes pair distribution""" | |
result = {} | |
for i in freqs: | |
curr = freqs[i] | |
for j in freqs: | |
result[(i, j)] = curr * freqs[j] | |
return result | |
def item_sum(freqs, items): | |
"""returns sum of items from freqs""" | |
return sum([freqs[i] for i in items]) | |
def TaskRecordParser(infile_fold, infile_seq): | |
"""Treats inputfile as sequence of TaskRecords. Returns a list for each sequence-length.""" | |
motifs = [] | |
fold_lines = infile_fold.readlines() | |
seq_lines = infile_seq.readlines() | |
for i in reversed(range(len(fold_lines))): | |
if len(fold_lines[i].strip()) == 0: | |
del fold_lines[i] | |
for i in reversed(range(len(seq_lines))): | |
if len(seq_lines[i].strip()) == 0: | |
del seq_lines[i] | |
for line1, line2 in zip(fold_lines, seq_lines): | |
line_split1 = line1.split('\t') | |
line_split2 = line2.split('\t') | |
line_split1 = line_split1[:-1] #get rid of the time at the end | |
line_split2 = line_split2[:-1] | |
while line_split1.count('') > 0: #get rid of empty strings cause by repeated tabs | |
line_split1.remove('') | |
while line_split2.count('') > 0: | |
line_split2.remove('') | |
#make sure they refer to the same motif/composition/GU | |
comp1 = Freqs(dict(zip('ACGU', map(float, line_split1[2].split(','))))) | |
comp2 = Freqs(dict(zip('ACGU', map(float, line_split2[2].split(','))))) | |
if line_split1[0] != line_split2[0] or \ | |
line_split1[3] != line_split2[3] or \ | |
abs(comp1['A']-comp2['A']) > 0.0001 or \ | |
abs(comp1['C']-comp2['C']) > 0.0001 or \ | |
abs(comp1['G']-comp2['G']) > 0.0001 or \ | |
abs(comp1['U']-comp2['U']) > 0.0001: | |
print 'ERROR: folding and sequence probability files do not match line-by-line.:\n'+\ | |
'line from folding:\n'+'\t'.join(line_split1)+'\nline from sequence:\n'+'\t'.join(line_split2) | |
return | |
for i in range(int(len(line_split1)/6)): #base it on number of lengths of folding probs | |
if len(motifs) < i+1: | |
motifs.append([]) | |
name = line_split1[6*i] | |
comp = Freqs(dict(zip('ACGU', map(float, line_split1[6*i+2].split(','))))) | |
gu = 1 if line_split1[3].lower()=='yes' else 0 | |
length = int(line_split1[6*i+4]) | |
count, numtodo = map(int, line_split1[6*i+5].strip().split('/')) | |
if length == 50: | |
seq = float(line_split2[6*0+5]) | |
elif length == 100: | |
seq = float(line_split2[6*1+5]) | |
elif length == 150: | |
seq = float(line_split2[6*2+5]) | |
motifs[i].append(TaskRecord(name, comp, gu, length, count, numtodo, seq)) | |
len1 = len(motifs[0]) | |
for m in motifs: | |
if len(m) != len1: | |
print "ERROR: not same number of points for each length\n" | |
return | |
return motifs | |
class TaskRecord(object): | |
"""Holds motif name, composition, GU, length, match freq""" | |
def __init__(self, Name, Composition, GU, Length, Count, NumToDo, Sequence): | |
"""initializer""" | |
self.Name = Name | |
self.Composition = Composition | |
self.GU = GU | |
self.Length = Length | |
self.Count = Count | |
self.NumToDo = NumToDo | |
self.Sequence = Sequence | |
def __str__(self): | |
"""Writes data back out in same format""" | |
pieces = [] | |
pieces.append(self.Name) | |
composition = [] | |
for base in 'ACGU': | |
composition.append(str(self.Composition[base])) | |
pieces.append(','.join(composition)) | |
pieces.append(str(self.GU)) | |
pieces.append(str(self.Length)) | |
pieces.append('/'.join([str(self.Count), str(self.NumToDo)])) | |
return '\t'.join(pieces) | |
def test_parser(infile): | |
"""Verify that parsed records are same as original.""" | |
for line in infile: | |
print 'ORIG:', line.strip() | |
curr = TaskRecord(line) | |
print 'NEW: ', str(curr) | |
class MotifStats(object): | |
"""Holds statistics for a particular motif""" | |
def __init__(self): | |
"""Returns a new motif with specified BaseCounts and Length""" | |
self.Sequence = None | |
self.Matches = Numbers() | |
self.Probs = Numbers() | |
self.Products = Numbers() | |
self.Coordinates = [] | |
def __str__(self): | |
"""Prints summary statistics""" | |
lines = [] | |
for attr in ['Matches', 'Probs', 'Products']: | |
curr = ['%s:' % attr] | |
nums = getattr(self, attr) | |
curr.append('Mean:') | |
curr.append(nums.Mean) | |
curr.append('Min:') | |
curr.append(min(nums)) | |
curr.append('Max:') | |
curr.append(max(nums)) | |
lines.append(' '.join(map(str, curr))) | |
return '\n'.join(lines) | |
def poisson_single(self, allow_gu, composition): | |
"""returns Poisson match probability in single trial.""" | |
basecounts = self.BaseCounts | |
if allow_gu: | |
pairs = GU | |
else: | |
pairs = WC | |
pair_prob = item_sum(make_pairs(composition), pairs) | |
prob = 1 | |
for base in 'UCAG': | |
prob *= composition[base]**basecounts[base] | |
prob *= pair_prob**basecounts['P'] | |
return prob | |
def poisson(self, allow_gu, composition, length): | |
"""returns Poisson match probability for single sequence.""" | |
single = self.poisson_single(allow_gu, composition) | |
num_trials = spacer_partitions(length - self.Length, self.Pieces) | |
assert single != 0 | |
return multiple_comparisons(single, num_trials) | |
def get_statistics(records): | |
"""Get summary statistics. Returns a list with one motif dict per sequence length.""" | |
motifs = [dict() for r in records] | |
for m in range(len(records)): | |
allow_gu_string = "_GU" if records[m][0].GU == 1 else "_noGU" | |
motifs[m][records[m][0].Name+allow_gu_string+"_"+str(records[m][0].Length)] = MotifStats() | |
#sort records by coordinates first so we can compare between runs | |
coords = 'ACG' | |
temp_recs = [([r.Composition[i] for i in coords], r) for r in records[m]] | |
temp_recs.sort() | |
records[m] = [r[1] for r in temp_recs] | |
for r in records[m]: | |
curr = motifs[m][r.Name+allow_gu_string+"_"+str(r.Length)] | |
curr.Matches.append(r.Count/float(r.NumToDo)) | |
allow_gu = True if r.GU == 1 else False | |
curr.Probs.append(r.Sequence) | |
curr.Products.append(curr.Matches[-1] * curr.Probs[-1]) | |
curr.Coordinates.append((r.Composition['A'],r.Composition['C'], | |
r.Composition['G'])) | |
#print r.Name+allow_gu_string+"_"+str(r.Length), curr.Coordinates[-1], curr.Matches[-1], curr.Probs[-1], curr.Products[-1] | |
return motifs | |
def stats2magelist(x, attr): | |
return [MagePoint(coord, Radius=prod) \ | |
#for coord, prod in zip(x.Coordinates, x.Products)] | |
for coord, prod in zip(x.Coordinates, getattr(x, attr))], zip(x.Coordinates, getattr(x, attr)) | |
def subdivide(x,y,z): | |
#look at x-y edge | |
xy = [(x[0]*2/3.0+y[0]/3.0, x[1]*2/3.0+y[1]/3.0, x[2]*2/3.0+y[2]/3.0), (x[0]/3.0+y[0]*2/3.0, x[1]/3.0+y[1]*2/3.0, x[2]/3.0+y[2]*2/3.0)] | |
#pull them to the surface of the sphere | |
xy = [(i[0]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[1]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[2]/sqrt(i[0]**2+i[1]**2+i[2]**2)) for i in xy] | |
#do the same for the other edges | |
xz = [(x[0]*2/3.0+z[0]/3.0, x[1]*2/3.0+z[1]/3.0, x[2]*2/3.0+z[2]/3.0), (x[0]/3.0+z[0]*2/3.0, x[1]/3.0+z[1]*2/3.0, x[2]/3.0+z[2]*2/3.0)] | |
xz = [(i[0]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[1]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[2]/sqrt(i[0]**2+i[1]**2+i[2]**2)) for i in xz] | |
zy = [(z[0]*2/3.0+y[0]/3.0, z[1]*2/3.0+y[1]/3.0, z[2]*2/3.0+y[2]/3.0), (z[0]/3.0+y[0]*2/3.0, z[1]/3.0+y[1]*2/3.0, z[2]/3.0+y[2]*2/3.0)] | |
zy = [(i[0]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[1]/sqrt(i[0]**2+i[1]**2+i[2]**2), i[2]/sqrt(i[0]**2+i[1]**2+i[2]**2)) for i in zy] | |
center = ((x[0]+y[0]+z[0])/3.0, (x[1]+y[1]+z[1])/3.0, (x[2]+y[2]+z[2])/3.0) | |
center_len = sqrt(center[0]**2+center[1]**2+center[2]**2) | |
center = (center[0]/center_len, center[1]/center_len, center[2]/center_len) | |
#generate the new list of faces | |
faces = [(x,xz[0],xy[0]), (xz[0],xy[0],center), (xz[0],xz[1],center), (xz[1],zy[0],center), (xz[1],z,zy[0]), (xy[0],center,xy[1]), | |
(xy[1],y,zy[1]), (xy[1],zy[1],center), (zy[1],zy[0],center)] | |
return faces | |
def stats2magelistgaussbound(x, attr, dist, ellipsoid_count = None, sums = None, maxes = None): | |
std_devs = 1 | |
total = sum(getattr(x,attr)) | |
prods = getattr(x,attr) | |
mu = [sum(x.Coordinates[i][j]*prods[i] for i in range(len(x.Coordinates)))/float(total) for j in range(3)] | |
cov = [[0,0,0],[0,0,0],[0,0,0]] | |
for i in range(3): | |
for j in range(3): | |
cov[i][j] = sum((x.Coordinates[n][i]-mu[i])*(x.Coordinates[n][j]-mu[j])*prods[n] for n in range(len(x.Coordinates)))/float(total) | |
cov2 = cov #used for output | |
#determine which points are inside of this ellipsoid and update appropriate 3D matrix | |
if ellipsoid_count: | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
for comp in composition(): | |
#is this point in the ellipsoid? Compute the mahalanobis distance from mu | |
#print matrix([comp[3]+comp[1], comp[3]+comp[2], comp[3]+comp[0]), matrix(mu) | |
x_minus_mu = matrix(comp[:3]) - matrix(mu) | |
dist = sqrt(x_minus_mu*inv(matrix(cov))*x_minus_mu.transpose()) | |
if dist <= std_devs: | |
#it's in the ellipsoid | |
ellipsoid_count[which_mat][four_sf(tuple(comp[:3]))] += 1 | |
if sums: | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
for i in range(len(x.Coordinates)): | |
key = four_sf(tuple(x.Coordinates[i])) | |
sums[which_mat][key] += prods[i] | |
lolercoaster() | |
if maxes: | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
for i in range(len(x.Coordinates)): | |
key = four_sf(tuple(x.Coordinates[i])) | |
maxes[which_mat][key] = max(maxes[which_mat][key], prods[i]) | |
cov = list(list(float(std_devs*j) for j in i) for i in sqrtm(cov)) #for the linear transformation to be correct visually | |
#begin with a regular icosahedron | |
t = (1+sqrt(5))/2.0 | |
s = sqrt(1+t**2) | |
vertices = [(t/s,1/s,0), (-t/s,1/s,0), (t/s,-1/s,0), (-t/s,-1/s,0), (1/s,0,t/s), (1/s,0,-t/s), (-1/s,0,t/s),(-1/s,0,-t/s), | |
(0,t/s,1/s), (0,-t/s,1/s), (0,t/s,-1/s), (0,-t/s,-1/s)] | |
v = vertices | |
faces = [(v[0],v[8],v[4]),(v[1],v[10],v[7]),(v[2],v[9],v[11]),(v[7],v[3],v[1]),(v[0],v[5],v[10]),(v[3],v[9],v[6]), | |
(v[3],v[11],v[9]),(v[8],v[6],v[4]),(v[2],v[4],v[9]),(v[3],v[7],v[11]),(v[4],v[2],v[0]), | |
(v[9],v[4],v[6]),(v[2],v[11],v[5]),(v[0],v[10],v[8]),(v[5],v[0],v[2]),(v[10],v[5],v[7]),(v[1],v[6],v[8]), | |
(v[1],v[8],v[10]),(v[6],v[1],v[3]),(v[11],v[7],v[5])] | |
#subdivide each of the faces into 9 faces | |
new_faces = [] | |
for face in faces: | |
new_faces.extend(subdivide(face[0], face[1], face[2])) | |
faces = new_faces | |
#apply linear transformation | |
#faces = [(tuple(dot(array(cov),array(v[0]))), tuple(dot(array(cov),array(v[1]))), tuple(dot(array(cov),array(v[2])))) for v in faces] | |
#do it manually to avoid numerical errors | |
faces = [(tuple(multmat(cov,v[0])), tuple(multmat(cov,v[1])), tuple(multmat(cov,v[2]))) for v in faces] | |
#change from variance to std dev by making length of vector the square root of what it currently is | |
#faces = [((float(((mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt()).sqrt()*mpf(i[0][0])/(mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt()), | |
# float(((mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt()).sqrt()*mpf(i[0][1])/(mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt()), | |
# float(((mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt()).sqrt()*mpf(i[0][2])/(mpf(i[0][0])**2+mpf(i[0][1])**2+mpf(i[0][2])**2).sqrt())), | |
# | |
# (float(((mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt()).sqrt()*mpf(i[1][0])/(mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt()), | |
# float(((mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt()).sqrt()*mpf(i[1][1])/(mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt()), | |
# float(((mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt()).sqrt()*mpf(i[1][2])/(mpf(i[1][0])**2+mpf(i[1][1])**2+mpf(i[1][2])**2).sqrt())), | |
# | |
# (float(((mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()).sqrt()*mpf(i[2][0])/(mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()), | |
# float(((mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()).sqrt()*mpf(i[2][1])/(mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()), | |
# float(((mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()).sqrt()*mpf(i[2][2])/(mpf(i[2][0])**2+mpf(i[2][1])**2+mpf(i[2][2])**2).sqrt()))) for i in faces] | |
#center at mu | |
faces = [((v[0][0]+mu[0], v[0][1]+mu[1], v[0][2]+mu[2]),(v[1][0]+mu[0], v[1][1]+mu[1], v[1][2]+mu[2]), | |
(v[2][0]+mu[0], v[2][1]+mu[1], v[2][2]+mu[2])) for v in faces] | |
faces = [(MagePoint(v[0], Radius=0.0, State='X'),MagePoint(v[1], Radius=0.0),MagePoint(v[2], Radius=0.0)) for v in faces] | |
return faces, mu, cov2 | |
def multmat(m,v): | |
m = [[mpf(m[i][j]) for j in range(len(m[i]))] for i in range(len(m))] | |
v = [mpf(v[i]) for i in range(len(v))] | |
result = [m[0][0]*v[0]+m[0][1]*v[1]+m[0][2]*v[2], | |
m[1][0]*v[0]+m[1][1]*v[1]+m[1][2]*v[2], | |
m[2][0]*v[0]+m[2][1]*v[1]+m[2][2]*v[2]] | |
return [float(i) for i in result] | |
def get_summary_stats(motifs): | |
"""Will produce list keyed by composition, with entries for each motif.""" | |
results = {} | |
motif_names = [] #will only store names of motifs that exist | |
for name, motif in motifs.items(): | |
#skip the motif if there wasn't any data; otherwise, assume it's | |
#complete (will raise KeyError otherwise) | |
if not motif.Matches: | |
continue | |
for c, p in zip(motif.Coordinates, motif.Products): | |
curr = tuple([int(round(i*100, 0)) for i in c]) | |
if curr not in results: | |
results[curr] = {} | |
results[curr][name] = p | |
motif_names.append(name) | |
comps = results.keys() | |
comps.sort() | |
lines = [] | |
lines.append("A\tC\tG\tU\t" + '\t'.join(motif_names)) | |
for c in comps: | |
line = ["%d\t%d\t%d\t%d" % (c[0], c[1], c[2], 100-(c[0]+c[1]+c[2]))] | |
for n in motif_names: | |
line.append("%0.3e" % results[c][n]) | |
lines.append('\t'.join(line)) | |
return lines | |
def find_n_best_probs(motifs, n=5, sort_by='Products'): | |
"""Returns the coordinates for the n best probabilities for each motif.""" | |
results = {} | |
for name, motif in motifs.items(): | |
if not motif.Matches: | |
continue | |
by_prob = zip(getattr(motif, sort_by), motif.Products, \ | |
motif.Coordinates, motif.Matches, motif.Probs) | |
by_prob.sort() | |
by_prob.reverse() | |
results[name] = [p[1:] for p in by_prob[0:n]] | |
return results | |
class MageColorList(object): | |
Colors = flatten(MatchedColors) | |
def __init__(self): | |
self._index = 0 | |
def next(self): | |
"""Returns the next color.""" | |
try: | |
color = self.Colors[self._index] | |
except IndexError: | |
self._index = 0 | |
color = self.Colors[self._index] | |
self._index += 1 | |
return color | |
def make_simplex_old(motifs, attr, rad_transform=None): | |
if rad_transform is None: | |
rad_transform = RAD_TRANSFORMS[attr] | |
#make the simplex | |
header = SimplexHeader() | |
lists = [] | |
color = MageColorList() | |
for name, data in motifs.items(): | |
if data.Matches: | |
l, m = stats2magelist(data, attr) | |
#o.write(attr+'\t'+name+'\t'+str(m)) | |
lists.append(MageList(l, name, \ | |
Color=color.next())) | |
#correct the probs by applying arbitrary rad_transform function | |
for curr_list in lists: | |
for point in curr_list: | |
point.Radius = rad_transform(point.Radius) | |
group = MageGroup(lists, Style='ball') | |
simplex = Kinemage(Count=1, Header=header, Groups=[group]) | |
return simplex | |
def make_simplex(motifs, attr, max_radius=0.01, style = 'points', maxes = None, sums = None): | |
#make the simplex | |
header = CartesianSimplexHeader() | |
lists = {'Nucleotide': [], 'misc': [], 'Antibiotic':[], 'AminoAcid': [], 'SelfCleaving': []} #one for each category | |
color = MageColorList() | |
o = open('means.txt', 'a') | |
if attr == 'Products': | |
for name, data in sorted(motifs.items()): | |
p = open('./combineddata/'+name+'.txt', 'w') | |
cPickle.dump([(c, d) for c,d in zip(data.Coordinates, data.Products)], p) | |
p.close() | |
if style == 'points': | |
#o.write('\n\n' + attr + '\n-----------------------\n\n') | |
for name, data in sorted(motifs.items()): | |
#print "Points: ", name | |
if data.Matches: | |
name2 = name[:name[:name.rindex('_')].rindex('_')] | |
if categories[name2] == 'Nucleotide': | |
thecolor = 'red' | |
elif categories[name2] == 'misc': | |
thecolor = 'green' | |
elif categories[name2] == 'Antibiotic': | |
thecolor = 'blue' | |
elif categories[name2] == 'AminoAcid': | |
thecolor = 'yellow' | |
elif categories[name2] == 'SelfCleaving': | |
thecolor = 'magenta' | |
else: | |
raise ValueError, "ERROR: Not a valid category for "+name2 | |
l,m = stats2magelist(data,attr) | |
#o.write(name+'\t'+str(m)+'\n') | |
lists[categories[name2]].append(MageList(l, name+'_points', \ | |
Color=thecolor+', alpha=0.7', Off=False, Style='ball')) | |
#set the radius in each list independently | |
for cg in ['Nucleotide', 'misc', 'Antibiotic', 'AminoAcid', 'SelfCleaving']: | |
for l in lists[cg]: | |
biggest = max([p.Radius for p in l.iterPoints()])**0.33 | |
for p in reversed(range(len(l))):#l.iterPoints(): | |
if biggest <= 0 or l[p].Radius <= 0: | |
del l[p] | |
else: | |
l[p].Radius = (l[p].Radius**0.33) * max_radius/biggest | |
else: #shape | |
#initialize in_elliposoids and sums | |
t = dict() | |
for c in composition(): | |
t[four_sf(tuple(c[:3]))] = 0 | |
ellipsoid_count = {'Nucleotide': [dict(t), dict(t), dict(t)], | |
'misc': [dict(t), dict(t), dict(t)], 'Antibiotic':[dict(t), dict(t), dict(t)], | |
'AminoAcid': [dict(t), dict(t), dict(t)], 'SelfCleaving': [dict(t), dict(t), dict(t)]} | |
o.write('\n\n' + attr + '\n-----------------------\n\n') | |
for name, data in sorted(motifs.items()): | |
#create a new list | |
if data.Matches and max(data.Products) > 0: | |
#print "Shape: ", name | |
name2 = name[:name[:name.rindex('_')].rindex('_')] | |
faces, mu, cov = stats2magelistgaussbound(data, attr,0.3, ellipsoid_count = ellipsoid_count[categories[name2]], sums = sums[categories[name2]], | |
maxes = maxes[categories[name2]]) | |
o.write(name + '\t' + str(mu) +'\t' + str(cov) + '\n') | |
new_faces = [] | |
for face in faces: | |
new_faces.extend(list(face)) | |
name2 = name[:name[:name.rindex('_')].rindex('_')] | |
if categories[name2] == 'Nucleotide': | |
thecolor = 'red' | |
elif categories[name2] == 'misc': | |
thecolor = 'green' | |
elif categories[name2] == 'Antibiotic': | |
thecolor = 'blue' | |
elif categories[name2] == 'AminoAcid': | |
thecolor = 'yellow' | |
elif categories[name2] == 'SelfCleaving': | |
thecolor = 'white' | |
else: | |
raise ValueError, "ERROR: Not a valid category for "+name2 | |
name2 = name[:name[:name.rindex('_')].rindex('_')] | |
lists[categories[name2]].append(MageList(new_faces, name+"_shape", Color = thecolor +', alpha=0.1 ', Off=False, Style='Triangle')) | |
#append the in_ellipsoids list | |
for i in ['Nucleotide', 'misc', 'Antibiotic', 'AminoAcid', 'SelfCleaving']: | |
if i == 'Nucleotide': | |
thecolor = 'red' | |
elif i == 'misc': | |
thecolor = 'green' | |
elif i == 'Antibiotic': | |
thecolor = 'blue' | |
elif i == 'AminoAcid': | |
thecolor = 'yellow' | |
elif i == 'SelfCleaving': | |
thecolor = 'magenta' | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
mage_list = [[],[]] | |
max_value = max([j[1] for j in ellipsoid_count[i][which_mat].iteritems()]) | |
for coord, value in ellipsoid_count[i][which_mat].iteritems(): | |
coord = tuple(float(i) for i in coord) | |
if value > 0: | |
point = MagePoint(coord, Radius=value/float(50*7), Color=thecolor) | |
mage_list[0].append(point) | |
point = MagePoint(coord, Radius=value/float(50*max_value), Color=thecolor) | |
mage_list[1].append(point) | |
ellipsoid_list = MageList(mage_list[0], "in_ellipsoids_absolute", Off=False, Style='ball'+', alpha=0.7 ') | |
lists[i].append(ellipsoid_list) | |
ellipsoid_list = MageList(mage_list[1], "in_ellipsoids_relative", Off=True, Style='ball'+', alpha=0.7 ') | |
lists[i].append(ellipsoid_list) | |
mage_list = [] | |
biggest = max([j[1] for j in sums[i][which_mat].iteritems()])**0.88 | |
for coord, value in sums[i][which_mat].iteritems(): | |
coord = tuple(float(i) for i in coord) | |
if value > 0: | |
point = MagePoint(coord, Radius=(value**0.88) * 2*max_radius/biggest, Color='peach') | |
mage_list.append(point) | |
sums_list = MageList(mage_list, "sum", Off=True, Style='ball'+', alpha=0.7 ') | |
lists[i].append(sums_list) | |
#create a grand-total ellipsoid count | |
ellipsoid_count_total = [dict(t), dict(t), dict(t)] | |
for y in range(3): #Matches, Probs and Prods | |
for c in composition(): | |
#get the total | |
total = 0 | |
for func in ['Nucleotide', 'misc', 'Antibiotic', 'AminoAcid', 'SelfCleaving']: | |
total += ellipsoid_count[func][y][four_sf(tuple(c[:3]))] | |
ellipsoid_count_total[y][tuple(c[:3])] = total | |
ellipsoid_list1 = None | |
ellipsoid_list2 = None | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
num_ellipsoids = len(motifs.items()) | |
print num_ellipsoids | |
mage_list = [[],[]] | |
max_value = max([i[1] for i in ellipsoid_count_total[which_mat].iteritems()]) | |
for coord, value in ellipsoid_count_total[which_mat].iteritems(): | |
coord = tuple(float(i) for i in coord) | |
if value > 0: | |
point = MagePoint(coord, Radius=value/float(50*7), Color="hsv"+str(value)) | |
mage_list[0].append(point) | |
point = MagePoint(coord, Radius=value/float(50*max_value), Color="hsv"+str(int(value*(20/float(max_value))))) | |
mage_list[1].append(point) | |
ellipsoid_list1 = MageList(mage_list[0], "in_ellipsoids_absolute", Off=False, Style='ball'+', alpha=0.7 ') | |
ellipsoid_list2 = MageList(mage_list[1], "in_ellipsoids_relative", Off=False, Style='ball'+', alpha=0.7 ') | |
#create a grand-total sum | |
sums_total = [dict(t), dict(t), dict(t)] | |
for y in range(3): #Matches, Probs and Prods | |
for c in composition(): | |
#get the total | |
total = 0 | |
for func in ['Nucleotide', 'misc', 'Antibiotic', 'AminoAcid', 'SelfCleaving']: | |
total += sums[func][y][four_sf(tuple(c[:3]))] | |
sums_total[y][four_sf(tuple(c[:3]))] = total | |
ofile = open('./sumsdata/'+attr+motifs.items()[0][0]+'.txt','w') | |
cPickle.dump(sums_total, ofile) | |
ofile.close() | |
sums_list = None | |
if attr == "Matches": | |
which_mat = 0 | |
elif attr == "Probs": | |
which_mat = 1 | |
else: | |
which_mat = 2 | |
mage_list = [] | |
biggest = max([j[1] for j in sums_total[which_mat].iteritems()])**0.88 | |
for coord, value in sums_total[which_mat].iteritems(): | |
coord = tuple(float(i) for i in coord) | |
if value > 0: | |
point = MagePoint(coord, Radius=(value**0.88) * 2*max_radius/biggest, Color='peach') | |
mage_list.append(point) | |
sums_list = MageList(mage_list, "sum", Off=False, Style='ball'+', alpha=0.7 ') | |
groups = [] | |
for i in ['Nucleotide', 'misc', 'Antibiotic', 'AminoAcid', 'SelfCleaving']: | |
groups.append(MageGroup(lists[i], i, Off=True)) | |
if style=='shape': | |
groups.append(MageGroup([ellipsoid_list1], 'in_ellipsoids_absolute', Off=True)) | |
groups.append(MageGroup([ellipsoid_list2], 'in_ellipsoids_relative', Off=True)) | |
groups.append(MageGroup([sums_list], 'sum', Off=True)) | |
group = MageGroup(groups, Off=True) | |
o.close() | |
simplex = Kinemage(Count=1, Header=header, Groups=[group]) | |
return simplex.toCartesian() | |
if __name__ == '__main__': | |
"""Run script: read lines from file, find maximum, colorize, -> MAGE""" | |
from sys import exit | |
#print usage if no argument | |
if len(argv) == 1: | |
print """Usage: tasks2simplex filename task | |
task should be one of "simplex", "summary", or "best" (the default). | |
If task is simplex, specify output as "Matches", "Probs", or "Products". | |
Products it the default. | |
If task is summary, there are no additional options. | |
If task is best, specify the number of points to return (default 10). | |
""" | |
exit() | |
default_task = 'best' | |
infilename = argv[1] | |
xmlfile = argv[2] | |
try: | |
task = argv[2] or default_task | |
except IndexError: | |
task = default_task | |
records = TaskRecordParser(open(infilename)) | |
motifs = get_statistics(records, xmlfile) | |
if task == 'simplex': | |
default_output = "Products" | |
try: | |
output = argv[3] or default_output | |
except IndexError: | |
output = default_output | |
print str(make_simplex(motifs, output)) | |
elif task == 'summary': | |
print '\n'.join(get_summary_stats(motifs)) | |
elif task == 'best': | |
default_count = 10 | |
default_sort = 'All' | |
sort_options = ['Probs', 'Matches', 'Products'] | |
try: | |
count = int(argv[3]) or default_count | |
except (IndexError, TypeError): | |
count = default_count | |
try: | |
sort_by = argv[4] or default_sort | |
assert sort_by in sort_options + ['All'] | |
except: | |
sort_by = default_sort | |
if sort_by == 'All': | |
sort_by = sort_options | |
else: | |
sort_by = [sort_by] | |
for s in sort_by: | |
print s + ':' | |
best = find_n_best_probs(motifs, count, s) | |
for name, data in best.items(): | |
print name, ':' | |
print 'A\tC\tG\tU\tProduct\tMatch\tProb' | |
for d in data: | |
print "%d\t%d\t%d\t%d\t%0.3e\t%0.3e\t%0.3e" % \ | |
(100*d[1][0], 100*d[1][1], \ | |
100*d[1][2], 100 - round(100 * sum(d[1]), 0), \ | |
d[0], d[2], d[3]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment