Skip to content

Instantly share code, notes, and snippets.

@mtholder
Created March 16, 2021 19:34
Show Gist options
  • Save mtholder/bb4dc9c2ed82c570993e26e4e0e40392 to your computer and use it in GitHub Desktop.
Save mtholder/bb4dc9c2ed82c570993e26e4e0e40392 to your computer and use it in GitHub Desktop.
Compresses the output of otc-explain-phylo-diffs to a terser form
#!/usr/bin/env python3
"""Tool to compress tree_slice_comparison JSON
otc-explain-phylo-diffs writes a form of this JSON in which multiple
adjacent slices of the tree can have the same topology for each
of the two trees that were compared.
This tool merges adjacent slices that do not differ in topology
to produce a terser representation that is easier to display.
Roughly speaking, the schema of the JSON is:
"root_id" -> string. A key in tree_comp_slices_by_root; it is a valid label in a SimpleNewick tree.
"tot_num_prunings_all_slices" -> integer. The number of node prunings needed to make each retained backbone in a slice identical.
"tree_comp_slices_by_root" -> object
keys are valid labels in a SimpleNewick tree. The label indicated by root_id must be present.
each value is an object of one of 2 types: same tree or differing:
Same tree has:
"both_trees" subtree slice object
Differing trees have:
"tree_1" subtree slice object
"tree_2" subtree slice object
"comparison" object with:
"num_prunings": integer # of prunings needed to make this slice topology identical
"comp_pruning_rounds": array of triplet comparison rounds object. Each object has:
"num_triplets_comp": # of triplets compared in this round
"num_triplets_diff": # of triplets differing in this round
"prop_triplets_differing": float num_triplets_diff/num_triplets_comp if num_triplets_comp > 0
"pruned": (if not first round) the label pruned to start this round
subtree slice objects have the keys:
"newick" -> string SimpleNewick form of the tree for this slice
"node_data" -> object with keys for node labels in newick. Each key maps to an object with metadata for the node. For example, the user-displayable label for the node will be in "label"
"""
import json
import sys
import re
# Key under which a slice stores its content when both compared trees
# have the same topology for that slice.
_both_trees_key = 'both_trees'
# Key under which a differing slice stores the first tree's content.
_tree_1_key = "tree_1"
# Matches an all-digit label that sits directly after "(" or "," and directly
# before "," or ")". The label is the pattern's only capture group, so
# split() returns a list alternating punctuation and labels.
_leaf_label_pat = re.compile(r"(?<=[(,])(\d+)(?=[,)])")
def _leaves_and_punc_label_alt_list_from_newick(newick):
    """Tokenize ``newick`` into its numeric labels and surrounding punctuation.

    Returns a 2-tuple ``(labels, pieces)``:
      * ``labels`` is the list of labels matched by ``_leaf_label_pat``;
      * ``pieces`` is the result of splitting ``newick`` on that pattern, so
        even indices hold punctuation and odd indices hold the labels.

    Exits the process through ``sys.exit`` if the odd-indexed pieces do not
    reproduce ``labels``.
    """
    pieces = _leaf_label_pat.split(newick)
    labels = _leaf_label_pat.findall(newick)
    # Sanity check: the labels interleaved in the split must equal findall's.
    labels_in_split = pieces[1:-1:2]
    if labels_in_split == labels:
        return labels, pieces
    sys.exit(f'SimpleNewickError: splitting labels failed: {labels_in_split} != {labels}\n')
def punc_label_alt_list_from(newick):
    """Return the alternating punctuation/label list obtained from ``newick``.

    Even positions hold punctuation, odd positions hold labels.
    """
    _labels, alternating = _leaves_and_punc_label_alt_list_from_newick(newick)
    return alternating
def leaves_from_newick(newick):
    """Return the list of labels matched by the leaf-label pattern in ``newick``."""
    labels, _alternating = _leaves_and_punc_label_alt_list_from_newick(newick)
    return labels
def newick_if_shared(label, slice_dict, node_data, to_do_stack):
    """Return the text that should stand in for ``label`` in a merged newick.

    * If ``slice_dict`` has no slice keyed by ``label``, the label is returned
      unchanged.
    * If the slice has no "both_trees" content, the label is appended to
      ``to_do_stack`` and returned unchanged.
    * Otherwise the slice's ``node_data`` is merged into ``node_data`` (in
      place), its newick is expanded recursively, and the expanded newick is
      returned without its trailing ``;``.
    """
    target_slice = slice_dict.get(label)
    if target_slice is None:
        return label

    shared = target_slice.get(_both_trees_key)
    if shared is not None:
        node_data.update(shared.get('node_data', {}))
        expanded = _newick_expand_shared_slices(shared['newick'], slice_dict, node_data, to_do_stack)
        # The nested newick is spliced into its parent, so drop the terminator.
        assert expanded[-1] == ';'
        return expanded[:-1]

    # Differing slice: leave the label in place and queue it for later.
    to_do_stack.append(label)
    return label
def _non_shared_slice(curr_slice, to_do_stack):
    """Queue the labels of a differing slice and return the slice unchanged.

    The labels are read from the "tree_1" newick of ``curr_slice`` and added
    to the end of ``to_do_stack``.
    """
    tree_1_newick = curr_slice[_tree_1_key]['newick']
    pending_labels = leaves_from_newick(tree_1_newick)
    to_do_stack.extend(pending_labels)
    return curr_slice
def _newick_expand_shared_slices(newick, slice_dict, node_data, to_do_stack):
    """Return ``newick`` with each label replaced by ``newick_if_shared``'s result.

    Punctuation (even positions of the alternating list) is copied through
    verbatim; every label (odd positions) is passed to ``newick_if_shared``,
    which may update ``node_data`` and ``to_do_stack``.
    """
    tokens = punc_label_alt_list_from(newick)
    return ''.join(
        newick_if_shared(token, slice_dict, node_data, to_do_stack) if pos % 2 else token
        for pos, token in enumerate(tokens)
    )
def compress_slice(curr_slice, slice_dict, to_do_stack):
    """Merge shared slices below ``curr_slice`` into it and return it.

    A slice without "both_trees" content is handed to ``_non_shared_slice``.
    Otherwise the slice's newick is replaced by its expanded form, and any
    node data gathered during the expansion is stored under "node_data".
    The slice is modified in place.
    """
    shared = curr_slice.get(_both_trees_key)
    if shared is None:
        return _non_shared_slice(curr_slice, to_do_stack)

    gathered_node_data = shared.get('node_data', {})
    expanded = _newick_expand_shared_slices(shared['newick'], slice_dict, gathered_node_data, to_do_stack)
    shared['newick'] = expanded
    # Only attach node data when there is something to report.
    if gathered_node_data:
        shared['node_data'] = gathered_node_data
    return curr_slice
def main(in_fp=None):
    """Read a tree-slice comparison JSON, compress it, and print the result.

    ``in_fp`` is the path of a UTF-8 JSON file; when it is ``None`` the JSON
    is read from standard input.

    Starting from the slice named by "root_id", each reachable slice in
    "tree_comp_slices_by_root" is passed to ``compress_slice``. Labels with no
    slice are skipped. The container is then replaced by the compressed
    slices and the whole blob is printed to standard output as indented JSON
    with sorted keys.
    """
    # Local import keeps this block self-contained.
    from collections import deque

    if in_fp is None:
        full_blob = json.load(sys.stdin)
    else:
        # Context manager so the input file is closed once it has been parsed.
        with open(in_fp, 'r', encoding='utf-8') as instream:
            full_blob = json.load(instream)

    # A deque gives O(1) removal from the front; compress_slice only uses
    # append/extend on the queue, which deque also supports.
    to_do_stack = deque([full_blob["root_id"]])
    slice_container_key = "tree_comp_slices_by_root"
    ts = full_blob[slice_container_key]
    next_slices = {}
    while to_do_stack:
        nl = to_do_stack.popleft()
        ns = ts.get(nl)
        if ns is None:
            continue
        next_slices[nl] = compress_slice(ns, ts, to_do_stack)
    full_blob[slice_container_key] = next_slices
    print(json.dumps(full_blob, sort_keys=True, indent=2))
# Script entry point: the optional first command-line argument is the input
# JSON path; without it, main() is called with None.
if __name__ == '__main__':
    main(sys.argv[1] if sys.argv[1:] else None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment