Skip to content

Instantly share code, notes, and snippets.

@BenLangmead
Last active September 26, 2023 15:39
Show Gist options
  • Star 17 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save BenLangmead/5298132 to your computer and use it in GitHub Desktop.
Save BenLangmead/5298132 to your computer and use it in GitHub Desktop.
Demonstration of de Bruijn graph construction and Eulerian path/cycle finding.
class DeBruijnGraph:
""" A de Bruijn multigraph built from a collection of strings.
User supplies strings and k-mer length k. Nodes of the de
Bruijn graph are k-1-mers and edges correspond to the k-mer
that joins a left k-1-mer to a right k-1-mer. """
@staticmethod
def chop(st, k):
""" Chop a string up into k mers of given length """
for i in xrange(0, len(st)-(k-1)):
yield (st[i:i+k], st[i:i+k-1], st[i+1:i+k])
class Node:
""" Node in a de Bruijn graph, representing a k-1 mer. We keep
track of # of incoming/outgoing edges so it's easy to check
for balanced, semi-balanced. """
def __init__(self, km1mer):
self.km1mer = km1mer
self.nin = 0
self.nout = 0
def isSemiBalanced(self):
return abs(self.nin - self.nout) == 1
def isBalanced(self):
return self.nin == self.nout
def __hash__(self):
return hash(self.km1mer)
def __str__(self):
return self.km1mer
def __init__(self, strIter, k):
""" Build de Bruijn multigraph given string iterator and k-mer
length k """
self.G = {} # multimap from nodes to neighbors
self.nodes = {} # maps k-1-mers to Node objects
for st in strIter:
for kmer, km1L, km1R in self.chop(st, k):
nodeL, nodeR = None, None
if km1L in self.nodes:
nodeL = self.nodes[km1L]
else:
nodeL = self.nodes[km1L] = self.Node(km1L)
if km1R in self.nodes:
nodeR = self.nodes[km1R]
else:
nodeR = self.nodes[km1R] = self.Node(km1R)
nodeL.nout += 1
nodeR.nin += 1
self.G.setdefault(nodeL, []).append(nodeR)
# Iterate through nodes and tally how many are balanced,
# semi-balanced, or neither
self.nsemi, self.nbal, self.nneither = 0, 0, 0
# Keep track of head and tail nodes in the case of a graph with
# Eularian path (not cycle)
self.head, self.tail = None, None
for node in self.nodes.itervalues():
if node.isBalanced():
self.nbal += 1
elif node.isSemiBalanced():
if node.nin == node.nout + 1:
self.tail = node
if node.nin == node.nout - 1:
self.head = node
self.nsemi += 1
else:
self.nneither += 1
def nnodes(self):
""" Return # nodes """
return len(self.nodes)
def nedges(self):
""" Return # edges """
return len(self.G)
def hasEulerianPath(self):
""" Return true iff graph has Eulerian path. """
return self.nneither == 0 and self.nsemi == 2
def hasEulerianCycle(self):
""" Return true iff graph has Eulerian cycle. """
return self.nneither == 0 and self.nsemi == 0
def isEulerian(self):
""" Return true iff graph has Eulerian path or cycle """
return self.hasEulerianPath() or self.hasEulerianCycle()
def eulerianPath(self):
""" Find and return Eulerian path or cycle (as appropriate) """
assert self.isEulerian()
g = self.G
if self.hasEulerianPath():
g = g.copy()
assert self.head is not None
assert self.tail is not None
g.setdefault(self.tail, []).append(self.head)
# graph g has an Eulerian cycle
tour = []
src = g.iterkeys().next() # pick arbitrary starting node
def __visit(n):
while len(g[n]) > 0:
dst = g[n].pop()
__visit(dst)
tour.append(n)
__visit(src)
tour = tour[::-1][:-1]
if self.hasEulerianPath():
# Adjust node list so that it starts at head and ends at tail
sti = tour.index(self.head)
tour = tour[sti:] + tour[:sti]
# Return node list
return map(str, tour)
def toDot(self, dotFh, weights=False):
""" Write dot representation to given filehandle. If 'weights'
is true, label edges corresponding to distinct k-1-mers
with weights, instead of writing a separate edge for each
copy of a k-1-mer. """
dotFh.write("digraph \"Graph\" {\n")
dotFh.write(" bgcolor=\"transparent\";\n")
for node in self.G.iterkeys():
lab = node.km1mer
dotFh.write(" %s [label=\"%s\"] ;\n" % (lab, lab))
for src, dsts in self.G.iteritems():
srclab = src.km1mer
if weights:
weightmap = {}
if weights:
for dst in dsts:
weightmap[dst] = weightmap.get(dst, 0) + 1
for dst, v in weightmap.iteritems():
dstlab = dst.km1mer
dotFh.write(" %s -> %s [label=\"%d\"] ;\n" % (srclab, dstlab, v))
else:
for dst in dsts:
srclab = src.km1mer
dstlab = dst.km1mer
dotFh.write(" %s -> %s [label=\"\"] ;\n" % (srclab, dstlab))
dotFh.write("}\n")
@bmazoure
Copy link

Hey!
Great implementation, I'm trying to adapt / enhance a similar code to allow variants. The main issue with this would be the creation of new k-mers and the trouble to pair them back. From D. Zerbino's thesis, I got that they used coloring to distinguish between SV / base variants and different samples. Any ideas on what would be a memory-efficient way to implement it?
Thanks a lot!

@anish-stha
Copy link

If I feed a fastq file to this, the graph is never Eulerian. Am I doing something wrong?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment