Skip to content

Instantly share code, notes, and snippets.

@BenLangmead
Last active December 27, 2015 00:19
Show Gist options
  • Save BenLangmead/7237207 to your computer and use it in GitHub Desktop.
Save BenLangmead/7237207 to your computer and use it in GitHub Desktop.
{
"metadata": {
"name": ""
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"class DeBruijnGraph:\n",
" ''' A de Bruijn directed multigraph built from a collection of\n",
" strings. User supplies strings and k-mer length k. Nodes of\n",
" the de Bruijn graph are k-1-mers and edges correspond to the\n",
" k-mer that joins a left k-1-mer to a right k-1-mer. '''\n",
" \n",
" @staticmethod\n",
" def chop(st, k):\n",
" ''' Chop a string up into k mers of given length '''\n",
" for i in xrange(0, len(st)-(k-1)):\n",
" yield (st[i:i+k], st[i:i+k-1], st[i+1:i+k])\n",
" \n",
" class Node:\n",
" ''' Node in a de Bruijn graph, representing a k-1 mer. We keep\n",
" track of # of incoming/outgoing edges so it's easy to check\n",
" for balanced, semi-balanced. '''\n",
" \n",
" def __init__(self, km1mer):\n",
" self.km1mer = km1mer\n",
" self.nin = 0\n",
" self.nout = 0\n",
" \n",
" def isSemiBalanced(self):\n",
" return abs(self.nin - self.nout) == 1\n",
" \n",
" def isBalanced(self):\n",
" return self.nin == self.nout\n",
" \n",
" def __hash__(self):\n",
" return hash(self.km1mer)\n",
" \n",
" def __str__(self):\n",
" return self.km1mer\n",
" \n",
" def __init__(self, strIter, k, circularize=False):\n",
" ''' Build de Bruijn multigraph given string iterator and k-mer\n",
" length k '''\n",
" self.G = {} # multimap from nodes to neighbors\n",
" self.nodes = {} # maps k-1-mers to Node objects\n",
" for st in strIter:\n",
" if circularize:\n",
" st += st[:k-1]\n",
" for kmer, km1L, km1R in self.chop(st, k):\n",
" nodeL, nodeR = None, None\n",
" if km1L in self.nodes:\n",
" nodeL = self.nodes[km1L]\n",
" else:\n",
" nodeL = self.nodes[km1L] = self.Node(km1L)\n",
" if km1R in self.nodes:\n",
" nodeR = self.nodes[km1R]\n",
" else:\n",
" nodeR = self.nodes[km1R] = self.Node(km1R)\n",
" nodeL.nout += 1\n",
" nodeR.nin += 1\n",
" self.G.setdefault(nodeL, []).append(nodeR)\n",
" # Iterate through nodes and tally how many are balanced,\n",
" # semi-balanced, or neither\n",
" self.nsemi, self.nbal, self.nneither = 0, 0, 0\n",
" # Keep track of head and tail nodes in the case of a graph with\n",
" # Eularian walk (not cycle)\n",
" self.head, self.tail = None, None\n",
" for node in self.nodes.itervalues():\n",
" if node.isBalanced():\n",
" self.nbal += 1\n",
" elif node.isSemiBalanced():\n",
" if node.nin == node.nout + 1:\n",
" self.tail = node\n",
" if node.nin == node.nout - 1:\n",
" self.head = node\n",
" self.nsemi += 1\n",
" else:\n",
" self.nneither += 1\n",
" \n",
" def nnodes(self):\n",
" ''' Return # nodes '''\n",
" return len(self.nodes)\n",
" \n",
" def nedges(self):\n",
" ''' Return # edges '''\n",
" return len(self.G)\n",
" \n",
" def hasEulerianWalk(self):\n",
" ''' Return true iff graph has Eulerian walk. '''\n",
" return self.nneither == 0 and self.nsemi == 2\n",
" \n",
" def hasEulerianCycle(self):\n",
" ''' Return true iff graph has Eulerian cycle. '''\n",
" return self.nneither == 0 and self.nsemi == 0\n",
" \n",
" def isEulerian(self):\n",
" ''' Return true iff graph has Eulerian walk or cycle '''\n",
" # technically, if it has an Eulerian walk\n",
" return self.hasEulerianWalk() or self.hasEulerianCycle()\n",
" \n",
" def eulerianWalkOrCycle(self):\n",
" ''' Find and return Eulerian walk or cycle (as appropriate) '''\n",
" assert self.isEulerian()\n",
" g = self.G\n",
" if self.hasEulerianWalk():\n",
" g = g.copy()\n",
" assert self.head is not None\n",
" assert self.tail is not None\n",
" g.setdefault(self.tail, []).append(self.head)\n",
" # graph g has an Eulerian cycle\n",
" tour = []\n",
" src = g.iterkeys().next() # pick arbitrary starting node\n",
" \n",
" def __visit(n):\n",
" while len(g[n]) > 0:\n",
" dst = g[n].pop()\n",
" __visit(dst)\n",
" tour.append(n)\n",
" \n",
" __visit(src)\n",
" tour = tour[::-1][:-1] # reverse and then take all but last node\n",
" \n",
" if self.hasEulerianWalk():\n",
" # Adjust node list so that it starts at head and ends at tail\n",
" sti = tour.index(self.head)\n",
" tour = tour[sti:] + tour[:sti]\n",
" \n",
" # Return node list\n",
" return map(str, tour)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g = DeBruijnGraph(['AAABBBA'], k=3)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 3,
"text": [
"(True, True, False)"
]
}
],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# the figure we drew in class had 4 nodes\n",
"g.nnodes()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 4,
"text": [
"4"
]
}
],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.eulerianWalkOrCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 5,
"text": [
"['AA', 'AB', 'BB', 'BB', 'BA', 'AA']"
]
}
],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g = DeBruijnGraph(['AAABBBBA'], k=3) # Add 1 more B to the run of Bs"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 6
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 7,
"text": [
"(True, True, False)"
]
}
],
"prompt_number": 7
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# the figure we drew in class again had 4 nodes\n",
"g.nnodes()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 8,
"text": [
"4"
]
}
],
"prompt_number": 8
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.eulerianWalkOrCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 9,
"text": [
"['AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']"
]
}
],
"prompt_number": 9
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# circularize makes DeBruijnGraph treat string as circular,\n",
"# returning to left-hand side at extreme right end\n",
"g = DeBruijnGraph(['AAABBBBA'], k=3, circularize=True)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 10
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.isEulerian(), g.hasEulerianWalk(), g.hasEulerianCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 11,
"text": [
"(True, False, True)"
]
}
],
"prompt_number": 11
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"g.eulerianWalkOrCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 12,
"text": [
"['AA', 'AA', 'AB', 'BB', 'BB', 'BB', 'BA', 'AA']"
]
}
],
"prompt_number": 12
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"DeBruijnGraph([\"a_long_long_long_time\"], 5).eulerianWalkOrCycle()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 13,
"text": [
"['a_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_l',\n",
" 'g_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_l',\n",
" 'g_lo',\n",
" '_lon',\n",
" 'long',\n",
" 'ong_',\n",
" 'ng_t',\n",
" 'g_ti',\n",
" '_tim',\n",
" 'time']"
]
}
],
"prompt_number": 13
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"DeBruijnGraph(['a_long_long_long_time'], 5).eulerianWalkOrCycle().count('long')"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 14,
"text": [
"3"
]
}
],
"prompt_number": 14
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# see if we can get correct reconstruction at k=4\n",
"walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 4).eulerianWalkOrCycle()\n",
"walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 15,
"text": [
"'to_every_thing_turn_turn_turn_there_is_a_season'"
]
}
],
"prompt_number": 15
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# that worked, but k=3 doesn't! unresolvable repeat at k=3\n",
"walk = DeBruijnGraph(['to_every_thing_turn_turn_turn_there_is_a_season'], 3).eulerianWalkOrCycle()\n",
"walk[0] + ''.join(map(lambda x: x[-1], walk[1:]))"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 16,
"text": [
"'to_every_turn_turn_thing_turn_there_is_a_season'"
]
}
],
"prompt_number": 16
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 16
}
],
"metadata": {}
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment