Last active
January 20, 2021 23:24
-
-
Save lrvdijk/f78523dff4e9bbf85a2150df9d597c0d to your computer and use it in GitHub Desktop.
Visualize unitgs in a De Bruijn graph. Emulates Bifrost's construction algorithm, but with more explicit use of nodes and edges.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script creates a small-scale De Bruijn graph visualization using graphviz. | |
The construction of the De Bruijn graph is done in a similar way how Bifrost | |
constructs the graph, although with different data structures (more explicit | |
use of nodes and edges). | |
Usage: | |
dbg_dot.py k FASTA | dot -Tpng -o output.ong | |
""" | |
import sys | |
from typing import Iterable, List, Tuple, Dict, Optional | |
import seaborn | |
import networkx as nx | |
import skbio | |
from skbio import DNA | |
from matplotlib.colors import to_hex | |
def reverse_complement_str(sequence: str) -> str: | |
return str(DNA(sequence, validate=False).reverse_complement()) | |
def kmerize_seq(k, seq: str, canonical: bool=False) -> Iterable[str]: | |
for pos in range(len(seq) - k + 1): | |
kmer = seq[pos:pos+k] | |
if canonical: | |
yield canonical_seq(kmer) | |
else: | |
yield kmer | |
def canonical_seq(kmer: str) -> str: | |
rc = reverse_complement_str(kmer) | |
return rc if rc < kmer else kmer | |
node_tpl = """{forward} [label=< | |
<table border="0" cellborder="1" cellspacing="0"> | |
<tr> | |
<td bgcolor="{bgcolor_fw}">{forward}</td> | |
</tr> | |
<tr> | |
<td bgcolor="{bgcolor_bw}">{backward}</td> | |
</tr> | |
</table>>];""" | |
def nx_de_bruijn_to_graphviz(g: 'nx.DiGraph', | |
colors: Dict[int, str]) -> Iterable[str]: | |
yield 'digraph dbg {' | |
yield 'rankdir=LR;' | |
yield 'node [shape=plaintext];' | |
yield 'sep="+20";' | |
yield 'overlap="prism";' | |
# Draw each node with both its forward and reverse representation | |
for n in g.nodes(): | |
bw = reverse_complement_str(n) | |
unitig_fw = g.nodes[n].get('unitig+') | |
unitig_bw = g.nodes[n].get('unitig-') | |
if unitig_fw is not None: | |
color_attr = { | |
'bgcolor_fw': colors[unitig_fw], | |
'bgcolor_bw': 'white', | |
} | |
elif unitig_bw is not None: | |
color_attr = { | |
'bgcolor_fw': 'white', | |
'bgcolor_bw': colors[unitig_bw], | |
} | |
else: | |
color_attr = { | |
'bgcolor_fw': 'white', | |
'bgcolor_bw': 'white', | |
} | |
yield node_tpl.format(node=n, forward=n, backward=bw, **color_attr) | |
for n1, n2, data in g.edges(data=True): | |
unitig = data.get('unitig') | |
color = colors[unitig] if unitig is not None else 'black' | |
tail, head = data['orientation'] | |
yield (f'{n1} -> {n2} [taillabel="{tail}", headlabel="{head}", ' | |
f'color="{color}"]') | |
yield '}' | |
def oriented_in_edges(g: 'nx.DiGraph', n: str, orientation: str): | |
return ( | |
e for e in g.in_edges(n, data=True) | |
if e[2]['orientation'][1] == orientation | |
) | |
def oriented_out_edges(g: 'nx.DiGraph', n: str, orientation: str): | |
return ( | |
e for e in g.out_edges(n, data=True) | |
if e[2]['orientation'][0] == orientation | |
) | |
def check_next_kmer(g: 'nx.DiGraph', kmer: str, | |
orientation: str) -> Optional[Tuple[str, str]]: | |
out_edges = list(oriented_out_edges(g, kmer, orientation)) | |
if len(out_edges) == 1: | |
# Check that the next kmer doesn't have multiple incoming edges | |
u, v, d = out_edges[0] | |
in_edges = list(oriented_in_edges(g, v, d['orientation'][1])) | |
if len(in_edges) == 1: | |
return v, d['orientation'][1] | |
return None | |
def extend_forward(g: 'nx.DiGraph', start_kmer: str): | |
canonical_start = canonical_seq(start_kmer) | |
start_orientation = '+' if canonical_start == start_kmer else '-' | |
prev_kmer = canonical_start | |
current = check_next_kmer(g, canonical_start, start_orientation) | |
extensions = [] | |
while current: | |
kmer, orientation = current | |
if kmer == canonical_start: | |
# Cycle, quit extending unitig | |
current = None | |
elif kmer == prev_kmer: | |
# Self loop, quit extending | |
current = None | |
else: | |
extensions.append(current) | |
prev_kmer = kmer | |
current = check_next_kmer(g, *current) | |
return extensions | |
def rev_compl_unitig_path(path: List[Tuple[str, str]]) -> List[ | |
Tuple[str, str]]: | |
return [ | |
(n, '+' if o == '-' else '-') for n, o in path[::-1] | |
] | |
def extract_unitig(g: 'nx.DiGraph', kmer: str) -> List[Tuple[str, str]]: | |
# Given k-mer must be the canonical kmer | |
rev_compl = reverse_complement_str(kmer) | |
fw_ext = extend_forward(g, kmer) | |
bw_ext = extend_forward(g, rev_compl) | |
return rev_compl_unitig_path(bw_ext) + [(kmer, '+')] + fw_ext | |
def extract_unitigs(k: int, orig_seqs: Iterable[str], | |
g: 'nx.DiGraph') -> List[List[Tuple[str, str]]]: | |
unitigs = [] | |
nodes_visited = set() | |
for seq in orig_seqs: | |
for kmer in kmerize_seq(k, seq, True): | |
if kmer in nodes_visited: | |
continue | |
unitig = extract_unitig(g, kmer) | |
nodes_visited.update(n for n, o in unitig) | |
unitigs.append(unitig) | |
return unitigs | |
def unitig_sequence(unitig: List[Tuple[str, str]]) -> str: | |
if len(unitig) == 0: | |
return "" | |
else: | |
node, orientation = unitig[0] | |
if orientation == '+': | |
seq = [node] | |
else: | |
seq = [reverse_complement_str(node)] | |
for node, orientation in unitig[1:]: | |
if orientation == '+': | |
seq.append(node[-1]) | |
else: | |
seq.append(reverse_complement_str(node)[-1]) | |
return "".join(seq) | |
def main(): | |
k = int(sys.argv[1]) | |
fname = sys.argv[2] | |
g = nx.DiGraph() | |
# Kmerize the input, and add each canonical k-mer as node | |
with open(fname) as f: | |
for seq in skbio.io.read(f, "fasta"): | |
for kmer in kmerize_seq(k, str(seq), True): | |
g.add_node(kmer) | |
# Now infer the edges between nodes by checking all possible extensions, | |
# and keeping track of the right orientations. | |
for n in g.nodes(): | |
rev = reverse_complement_str(n) | |
for base in "ACTG": | |
successor = n[1:] + base | |
successor_cano = canonical_seq(successor) | |
if successor_cano in g: | |
target_strand = '+' if successor_cano == successor else '-' | |
g.add_edge(n, successor_cano, orientation=('+', target_strand)) | |
successor = rev[1:] + base | |
successor_cano = canonical_seq(successor) | |
if successor_cano in g: | |
target_strand = '+' if successor_cano == successor else '-' | |
g.add_edge(n, successor_cano, orientation=('-', target_strand)) | |
# Extract unitigs | |
# K-merize the input again. Check if we encounter a k-mer that we haven't | |
# seen before, if so, extract the unitig by extending the k-mer forward and | |
# backwards. | |
with open(fname) as f: | |
seqs = (str(seq) for seq in skbio.io.read(f, "fasta")) | |
unitigs = extract_unitigs(k, seqs, g) | |
# Annotate nodes and edges with unitig information | |
for i, unitig in enumerate(unitigs): | |
prev = None | |
for node, orientation in unitig: | |
if prev is not None: | |
g[prev][node]['unitig'] = i | |
key = f'unitig{orientation}' | |
g.nodes[node][key] = i | |
prev = node | |
palette = seaborn.color_palette(n_colors=len(unitigs)) | |
colors = {i: to_hex(palette[i]) for i, unitig in enumerate(unitigs)} | |
# Generate DOT description | |
for line in nx_de_bruijn_to_graphviz(g, colors): | |
print(line) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The following example is from the following paper:
Turner, Isaac, Kiran V. Garimella, Zamin Iqbal, and Gil McVean. 2018. “Integrating Long-Range Connectivity Information into de Bruijn Graphs.” Bioinformatics 34 (15): 2556–65.