Skip to content

Instantly share code, notes, and snippets.

@lrvdijk
Last active January 20, 2021 23:24
Show Gist options
  • Save lrvdijk/f78523dff4e9bbf85a2150df9d597c0d to your computer and use it in GitHub Desktop.
Save lrvdijk/f78523dff4e9bbf85a2150df9d597c0d to your computer and use it in GitHub Desktop.
Visualize unitgs in a De Bruijn graph. Emulates Bifrost's construction algorithm, but with more explicit use of nodes and edges.
"""
This script creates a small-scale De Bruijn graph visualization using graphviz.
The construction of the De Bruijn graph is done in a similar way how Bifrost
constructs the graph, although with different data structures (more explicit
use of nodes and edges).
Usage:
dbg_dot.py k FASTA | dot -Tpng -o output.ong
"""
import sys
from typing import Iterable, List, Tuple, Dict, Optional
import seaborn
import networkx as nx
import skbio
from skbio import DNA
from matplotlib.colors import to_hex
def reverse_complement_str(sequence: str) -> str:
return str(DNA(sequence, validate=False).reverse_complement())
def kmerize_seq(k, seq: str, canonical: bool=False) -> Iterable[str]:
for pos in range(len(seq) - k + 1):
kmer = seq[pos:pos+k]
if canonical:
yield canonical_seq(kmer)
else:
yield kmer
def canonical_seq(kmer: str) -> str:
rc = reverse_complement_str(kmer)
return rc if rc < kmer else kmer
node_tpl = """{forward} [label=<
<table border="0" cellborder="1" cellspacing="0">
<tr>
<td bgcolor="{bgcolor_fw}">{forward}</td>
</tr>
<tr>
<td bgcolor="{bgcolor_bw}">{backward}</td>
</tr>
</table>>];"""
def nx_de_bruijn_to_graphviz(g: 'nx.DiGraph',
colors: Dict[int, str]) -> Iterable[str]:
yield 'digraph dbg {'
yield 'rankdir=LR;'
yield 'node [shape=plaintext];'
yield 'sep="+20";'
yield 'overlap="prism";'
# Draw each node with both its forward and reverse representation
for n in g.nodes():
bw = reverse_complement_str(n)
unitig_fw = g.nodes[n].get('unitig+')
unitig_bw = g.nodes[n].get('unitig-')
if unitig_fw is not None:
color_attr = {
'bgcolor_fw': colors[unitig_fw],
'bgcolor_bw': 'white',
}
elif unitig_bw is not None:
color_attr = {
'bgcolor_fw': 'white',
'bgcolor_bw': colors[unitig_bw],
}
else:
color_attr = {
'bgcolor_fw': 'white',
'bgcolor_bw': 'white',
}
yield node_tpl.format(node=n, forward=n, backward=bw, **color_attr)
for n1, n2, data in g.edges(data=True):
unitig = data.get('unitig')
color = colors[unitig] if unitig is not None else 'black'
tail, head = data['orientation']
yield (f'{n1} -> {n2} [taillabel="{tail}", headlabel="{head}", '
f'color="{color}"]')
yield '}'
def oriented_in_edges(g: 'nx.DiGraph', n: str, orientation: str):
return (
e for e in g.in_edges(n, data=True)
if e[2]['orientation'][1] == orientation
)
def oriented_out_edges(g: 'nx.DiGraph', n: str, orientation: str):
return (
e for e in g.out_edges(n, data=True)
if e[2]['orientation'][0] == orientation
)
def check_next_kmer(g: 'nx.DiGraph', kmer: str,
orientation: str) -> Optional[Tuple[str, str]]:
out_edges = list(oriented_out_edges(g, kmer, orientation))
if len(out_edges) == 1:
# Check that the next kmer doesn't have multiple incoming edges
u, v, d = out_edges[0]
in_edges = list(oriented_in_edges(g, v, d['orientation'][1]))
if len(in_edges) == 1:
return v, d['orientation'][1]
return None
def extend_forward(g: 'nx.DiGraph', start_kmer: str):
canonical_start = canonical_seq(start_kmer)
start_orientation = '+' if canonical_start == start_kmer else '-'
prev_kmer = canonical_start
current = check_next_kmer(g, canonical_start, start_orientation)
extensions = []
while current:
kmer, orientation = current
if kmer == canonical_start:
# Cycle, quit extending unitig
current = None
elif kmer == prev_kmer:
# Self loop, quit extending
current = None
else:
extensions.append(current)
prev_kmer = kmer
current = check_next_kmer(g, *current)
return extensions
def rev_compl_unitig_path(path: List[Tuple[str, str]]) -> List[
Tuple[str, str]]:
return [
(n, '+' if o == '-' else '-') for n, o in path[::-1]
]
def extract_unitig(g: 'nx.DiGraph', kmer: str) -> List[Tuple[str, str]]:
# Given k-mer must be the canonical kmer
rev_compl = reverse_complement_str(kmer)
fw_ext = extend_forward(g, kmer)
bw_ext = extend_forward(g, rev_compl)
return rev_compl_unitig_path(bw_ext) + [(kmer, '+')] + fw_ext
def extract_unitigs(k: int, orig_seqs: Iterable[str],
g: 'nx.DiGraph') -> List[List[Tuple[str, str]]]:
unitigs = []
nodes_visited = set()
for seq in orig_seqs:
for kmer in kmerize_seq(k, seq, True):
if kmer in nodes_visited:
continue
unitig = extract_unitig(g, kmer)
nodes_visited.update(n for n, o in unitig)
unitigs.append(unitig)
return unitigs
def unitig_sequence(unitig: List[Tuple[str, str]]) -> str:
if len(unitig) == 0:
return ""
else:
node, orientation = unitig[0]
if orientation == '+':
seq = [node]
else:
seq = [reverse_complement_str(node)]
for node, orientation in unitig[1:]:
if orientation == '+':
seq.append(node[-1])
else:
seq.append(reverse_complement_str(node)[-1])
return "".join(seq)
def main():
k = int(sys.argv[1])
fname = sys.argv[2]
g = nx.DiGraph()
# Kmerize the input, and add each canonical k-mer as node
with open(fname) as f:
for seq in skbio.io.read(f, "fasta"):
for kmer in kmerize_seq(k, str(seq), True):
g.add_node(kmer)
# Now infer the edges between nodes by checking all possible extensions,
# and keeping track of the right orientations.
for n in g.nodes():
rev = reverse_complement_str(n)
for base in "ACTG":
successor = n[1:] + base
successor_cano = canonical_seq(successor)
if successor_cano in g:
target_strand = '+' if successor_cano == successor else '-'
g.add_edge(n, successor_cano, orientation=('+', target_strand))
successor = rev[1:] + base
successor_cano = canonical_seq(successor)
if successor_cano in g:
target_strand = '+' if successor_cano == successor else '-'
g.add_edge(n, successor_cano, orientation=('-', target_strand))
# Extract unitigs
# K-merize the input again. Check if we encounter a k-mer that we haven't
# seen before, if so, extract the unitig by extending the k-mer forward and
# backwards.
with open(fname) as f:
seqs = (str(seq) for seq in skbio.io.read(f, "fasta"))
unitigs = extract_unitigs(k, seqs, g)
# Annotate nodes and edges with unitig information
for i, unitig in enumerate(unitigs):
prev = None
for node, orientation in unitig:
if prev is not None:
g[prev][node]['unitig'] = i
key = f'unitig{orientation}'
g.nodes[node][key] = i
prev = node
palette = seaborn.color_palette(n_colors=len(unitigs))
colors = {i: to_hex(palette[i]) for i, unitig in enumerate(unitigs)}
# Generate DOT description
for line in nx_de_bruijn_to_graphviz(g, colors):
print(line)
if __name__ == '__main__':
main()
@lrvdijk
Copy link
Author

lrvdijk commented Mar 5, 2020

>seq1
TAATGTTGCG
>seq2
TAATGTTACG

test2

The following example is from the following paper:
Turner, Isaac, Kiran V. Garimella, Zamin Iqbal, and Gil McVean. 2018. “Integrating Long-Range Connectivity Information into de Bruijn Graphs.” Bioinformatics 34 (15): 2556–65.

>genome1
ACTGATTTCGATGCGATGCGATGCCACGGTGG

test

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment