Created
November 25, 2022 03:28
-
-
Save josephhughes/93919ddbb63b28493935b4b8583e3fe8 to your computer and use it in GitHub Desktop.
Converting the pangolin lineage information into json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import argparse | |
import csv | |
import sys | |
# Inputs:
# 1) the curation notes (.tsv); more extensive than lineage_notes.txt, which only has lineage and description.
#    Columns: Lineage, Rough number of SNPs, Example sequence, Active/ Unobserved/ Inactive, Designator, Size (roughly), Description
# 2) full_alias_key.txt, a file with the renames for the aliases (.txt), with columns: alias,lineage
# To do:
# - get rid of the first tree node
# - change name to id
# - change nodeName to name
# - the link does not need a direction
# Command-line interface. argparse's built-in help is disabled (add_help=False)
# so that -h/--help can be captured as a plain flag and answered by the
# script's own usage text instead.
parser = argparse.ArgumentParser(
    description="Create a .json of the pango lineage designations",
    add_help=False,
)
# The three value-taking options, declared as (flag, help text) pairs.
for option, help_text in (
    ("--notes", "The text-tab delimited file (.tsv) with the curation notes.\n"),
    ("--alias", "The aliases in .csv\n"),
    ("--json", "The name for the output file in json.\n"),
):
    parser.add_argument(option, help=help_text)
parser.add_argument("-h", "--help", action="store_true")
args = parser.parse_args()
def custom_help():
    """Print the list of required command-line arguments to stdout.

    Returns None; the text is written with ``print`` only.
    """
    usage_lines = (
        "Required Arguments!!",
        "\t--notes [curation_notes.tsv]",
        "\t--alias [full_alias_key.txt]",
        "\t--json [report]",
    )
    # One print call; joining with newlines yields the same four output lines.
    print("\n".join(usage_lines))
# Show the usage text and stop when help was requested or when any of the
# three required options is missing. Checking each option (rather than only
# the length of sys.argv) prevents a partial command line from reaching
# open(None) further down.
if len(sys.argv) < 3 or args.help or not (args.notes and args.alias and args.json):
    # custom_help() prints by itself and returns None, so it must not be
    # wrapped in print() (that would emit a stray "None" line).
    custom_help()
    # sys.exit is always available; the bare exit() helper is only installed
    # by the site module. Exit status 1 is kept from the original script.
    sys.exit(1)
# Alias table: maps each value of the 'alias' column to the value of the
# 'lineage' column of the comma-separated file given with --alias.
aliasLookup = {}
with open(args.alias, newline='') as alias:
    alias_reader = csv.DictReader(alias, delimiter=',')
    headers = alias_reader.fieldnames
    # Later rows overwrite earlier ones when an alias is repeated.
    aliasLookup.update(
        (entry['alias'], entry['lineage']) for entry in alias_reader
    )
# Read the curation notes (tab-separated) and build two structures:
#   info     -- per-lineage metadata, keyed by the un-aliased lineage name
#   edgelist -- (parent, child) pairs for every step of each dotted name,
#               e.g. "A.1.2" yields ("A", "A.1") and ("A.1", "A.1.2").
# Duplicated pairs are expected here; they are collapsed later.
edgelist = []
info = {}
with open(args.notes, newline='') as notes:
    notes_reader = csv.DictReader(notes, delimiter='\t')
    headers = notes_reader.fieldnames
    for row in notes_reader:
        print(row['Lineage'])
        # Asterisks in the lineage column are dropped before any lookup.
        lineage = row['Lineage'].replace("*", "")
        print("Lineage", lineage)

        aliased = lineage in aliasLookup
        # Aliased lineages additionally remember the alias they were listed under.
        record = {"alias": lineage} if aliased else {}
        record.update({
            "approxSNP": row['Rough number of SNPs'],
            "exampleSeq": row['Example sequence'],
            "status": row['Active/ Unobserved/ Inactive'],
            "designator": row['Designator'],
            "approxSeqNb": row['Size (roughly)'],
            "desc": row['Description'],
        })

        if aliased:
            full_name = aliasLookup[lineage]
            info[full_name] = record
            print("Alias", full_name)
        else:
            full_name = lineage
            info[full_name] = record

        # Walk the dotted name left to right, emitting one edge per extra
        # component. A name without "." has a single component and therefore
        # contributes no edge at all.
        parts = full_name.split(".")
        prefix = parts[0]
        for part in parts[1:]:
            extended = prefix + "." + part
            edgelist.append((prefix, extended))
            prefix = extended
print(edgelist)
# De-duplicate the edges while keeping first-seen order. Going through set()
# would make the order (and hence the order of children in the emitted JSON)
# vary from run to run, because string hashing is randomised per process.
links = list(dict.fromkeys(edgelist))
print(links)
# Shape example: links = [("Dick","Harry"),("Tom","Dick"),("Tom","Larry"),("Bob","Leroy"),("Bob","Earl")]
if links:
    parents, children = zip(*links)
else:
    # zip(*[]) yields nothing to unpack; with no edges there are no parents
    # or children, and the tree consists of the synthetic root only.
    parents, children = (), ()
# Set membership keeps root detection linear instead of scanning a tuple.
child_names = set(children)
# Root nodes are parents that never occur as a child, in first-seen order.
root_nodes = [x for x in dict.fromkeys(parents) if x not in child_names]
# Hang every root under one synthetic 'Root' node so the forest becomes a tree.
for node in root_nodes:
    links.append(('Root', node))
def get_nodes(node):
    """Build the nested dictionary describing *node* and all of its descendants.

    The result always carries 'nodeName' and 'link'. When *node* has an entry
    in the module-level ``info`` mapping, its metadata fields are copied in
    (including 'alias' when present). A 'children' list is added only when
    ``get_children(node)`` returns at least one child; each child is expanded
    recursively.

    NOTE(review): the source indentation was lost; 'link' is assumed to be
    set for every node, not only for nodes found in ``info`` -- confirm.
    """
    entry = {'nodeName': node}

    record = info.get(node)
    if record is not None:
        if "alias" in record:
            entry['alias'] = record['alias']
        # (output key, key in info); note the output uses 'approxSNPs'.
        field_map = (
            ('approxSNPs', 'approxSNP'),
            ('exampleSeq', 'exampleSeq'),
            ('status', 'status'),
            ('designator', 'designator'),
            ('approxSeqNb', 'approxSeqNb'),
            ('desc', 'desc'),
        )
        for out_key, src_key in field_map:
            entry[out_key] = record[src_key]

    entry['link'] = {
        "name": "Link " + str(node),
        "nodeName": node,
        "direction": "ASYN",
    }

    kids = get_children(node)
    if kids:
        entry['children'] = [get_nodes(kid) for kid in kids]
    return entry
def get_children(node):
    """Return the child of every (parent, child) pair in ``links`` whose parent equals *node*.

    Children are returned in the order their pairs appear in ``links``; the
    list is empty when *node* is never a parent.
    """
    return [child for parent, child in links if parent == node]
# Expand the whole hierarchy from the synthetic 'Root' node and wrap it under
# a single top-level "tree" key.
tree = get_nodes('Root')
tree_dict = {"tree": tree}
# Indented copy on stdout for inspection.
print(json.dumps(tree_dict, indent=1))
# Compact copy written to the path given with --json.
with open(args.json, 'w') as outfile:
    outfile.write(json.dumps(tree_dict))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment