Skip to content

Instantly share code, notes, and snippets.

@runiq
Created September 21, 2012 13:56
Show Gist options
  • Save runiq/3761606 to your computer and use it in GitHub Desktop.
Save runiq/3761606 to your computer and use it in GitHub Desktop.
Where my gerunds at
import sys
import Queue
class Node(object):
def __init__(self, id, parent=None, dist=0, maxchildren=2):
self.id = id
self.num = 0
self.dist = dist
self.parent = parent
self.compute_dist_to_parent()
self.maxchildren = maxchildren
self.children = []
self.compute_num()
def __repr__(self):
return "Node id {0}, dist {1}, decoys {2}".format(self.id, self.dist, self.num)
def __str__(self):
return "Node id {0}, dist {1}, decoys {2}".format(self.id, self.dist, self.num)
def __lt__(self, other):
return self.dist > other
def __gt__(self, other):
return self.dist < other
def __le__(self, other):
return self.dist >= other
def __ge__(self, other):
return self.dist <= other
def __eq__(self, other):
return self.dist == other
def __ne__(self, other):
return self.dist != other
def __iter__(self):
yield self
for child in self.children:
for yielded in child:
yield yielded
def add_child(self, childnode):
"""
Adds a reference to a child node.
Throws an AttributeError if there are already *MAXLEN* child
nodes assigned.
"""
if len(self.children) > self.maxchildren:
raise AttributeError("No more than two children per node")
self.children.append(childnode)
childnode.set_parent(self)
self.compute_num()
def set_parent(self, parentnode):
"""
Adds a reference to the parent node.
This is invoked automatically by add_child().
"""
if self.parent is not None:
raise AttributeError("Node already has a parent")
if self not in parentnode.children and len(parentnode.children) >= self.maxchildren:
raise AttributeError("Cannot assign parent: Parent node already has {0} other children".format(self.maxchildren))
else:
self.parent = parentnode
self.compute_dist_to_parent()
def compute_num(self):
"""
Computes the number of decoys in this cluster.
"""
self.num = sum(child.num for child in self.children)
# Decoys are not represented as nodes, so we have to add them
# indirectly
# Since *every* node has two children (either explicit clusters
# or implicit decoys), we can add 1 to self.num for every
# implicit member of self.children (i.e. decoy)
self.num += self.maxchildren - len(self.children)
def compute_dist_to_parent(self):
"""
Returns the relative distance from this node to the parent node.
"""
if self.parent is None:
self.dist_to_parent = 0
else:
self.dist_to_parent = self.parent.dist - self.dist
def find_root(self):
"""
Returns the first node in the tree that has no parent.
"""
if self.parent is None:
return self
else:
self.parent.find_root()
def find_node(self, id):
"""
Returns a node by id.
"""
if self.id == id:
return self
else:
if self.children:
for child in self.children:
child.find_node(id)
raise KeyError("{0} not found in current subtree".format(id))
def create_newick(self, node_info='num', use_length=True, steps=None, dist=None):
"""
Creates a Newick representation from the current node downwards,
either *down* to a distance threshold or *up* to a certain
number of steps.
"""
def create_nodeset():
nq = Queue.PriorityQueue()
for node in self:
nq.put(node)
nodeset = set()
if dist is None and steps is None:
while not nq.empty():
nodeset.add(nq.get())
elif dist is None and steps is not None:
for _ in xrange(steps):
nodeset.add(nq.get())
elif dist is not None and steps is None:
while not nq.empty():
node = nq.get()
if node.dist >= dist:
nodeset.add(node)
elif dist is not None and steps is not None:
for _ in xrange(steps):
node = nq.get()
if node.dist >= dist:
nodeset.add(node)
return nodeset
def create_newick_helper(node, nodeset):
x = dict(num=node.num, dist=node.dist, id=node.id)
namestr = str(x[node_info])
if use_length:
namestr = ''.join([namestr, ':', str(node.dist_to_parent)])
if node.children:
childstr = ','.join(create_newick_helper(child, nodeset)
for child in node.children if child in nodeset)
if childstr:
childstr = ''.join(['(', childstr, ')'])
else:
childstr = ''
return ''.join([childstr, namestr])
nodeset = create_nodeset()
return ''.join([create_newick_helper(self, nodeset), ';'])
def parse_file(cmfile):
nodes = dict()
fh = open(cmfile)
for line in open(cmfile):
idx, childid1, childid2, dist = line.split()[:4]
idx = int(idx.rstrip(':'))
children = [int(childid1), int(childid2)]
node = Node(idx, dist=float(dist))
childnodes = []
for childid in children:
if childid < 0:
# Use existing node if it's a cluster
childnode = nodes[childid]
node.add_child(childnode)
nodes[idx] = node
fh.close()
return node
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment