Last active
August 11, 2016 13:22
-
-
Save benozol/7560c5135ec1309a3ae081e93f6e99f8 to your computer and use it in GitHub Desktop.
Convert UMLS Metathesaurus files to a Peregrine ontology
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# (C) 2016 Benedikt Becker, Erasmus MC, Rotterdam | |
from collections import namedtuple, defaultdict | |
import argparse | |
import pickle | |
import gzip | |
import pandas as pd | |
import sys | |
import shlex | |
# Row layouts for the pipe-separated UMLS release files and for the
# Peregrine concept records built from them.

# One line of MRCONSO.RRF (concept names and sources).
mrconso_row = namedtuple('mrconso_row', [
    'cui', 'lat', 'ts', 'lui', 'stt', 'sui', 'ispref', 'aui', 'saui',
    'scui', 'sdui', 'sab', 'tty', 'code', 'str', 'srl', 'suppress', 'cvf'])

# One line of MRSTY.RRF (semantic types per concept).
mrsty_row = namedtuple('mrsty_row', ['cui', 'tui', 'stn', 'sty', 'atui', 'cvf'])

# One Peregrine ontology concept, as written to the output file.
concept = namedtuple('concept', [
    'id', 'terms', 'db_ids', 'name', 'definition', 'semantic_types',
    'vocabularies', 'disambiguation', 'parent_concepts'])
def term_predicate(row):
    """Return True if the MRCONSO row's term should be included in the ontology.

    Keeps English terms whose term status (TS) is 'P' (preferred) or 'S',
    and whose string is longer than two characters.
    """
    # Fix: the original tested `row.ts in 'PS'`, which is substring
    # membership and therefore also accepted ts == '' and ts == 'PS'.
    # Tuple membership restricts it to the two real single-letter codes.
    return (row.lat == 'ENG'
            and row.ts in ('P', 'S')
            and len(row.str) > 2)
def term_information(row):
    """Return the Peregrine option string for the term in *row* (a mrconso_row).

    Option syntax (see the ErasmusMC ontology file format wiki):
    [lang=XX];[match=[ci],[no]]
    Currently every term is emitted with matching normalization disabled.
    https://trac.nbic.nl/data-mining/wiki/ErasmusMC%20ontology%20file%20format
    """
    return 'match=no'
def cui_to_id(cui):
    """Convert a UMLS CUI string such as 'C0004057' into its integer part (4057)."""
    digits = cui[1:]  # drop the leading 'C'
    return int(digits)
def open_maybe_gz(name, *args, **kwargs):
    """Open *name* with gzip.open when it ends in '.gz', otherwise plain open.

    Extra positional and keyword arguments are forwarded to the chosen opener.
    """
    opener = gzip.open if name.endswith('.gz') else open
    return opener(name, *args, **kwargs)
def main():
    """Command-line entry point.

    Parses the options, reads MRSTY/MRCONSO (optionally gzipped) and the
    semantic-group table, filters concepts and terms as requested, and
    writes a Peregrine ontology file.
    """
    description = "Convert the UMLS Metathesaurus to a Peregrine ontology."
    epilog = """For example:
$ {program} \\
--umls-version 2015AB \\
--mrconso 2015AB/2015AB/META/MRCONSO.RRF.gz \\
--mrsty 2015AB/2015AB/META/MRSTY.RRF.gz \\
--sty-groups SemGroups.txt \\
--vocabularies MSH MDR SNM LNC ICD9CM ICD10 ICD10CM ICPC2EENG ICPC2P MDR MSH RCD RCD2 \\
--semantic-groups ANAT CHEM DISO GENE LIVB PHEN PHYS PROC \\
--output UMLS2015AB.ontology
The semantic groups can be found at the following URL:
https://semanticnetwork.nlm.nih.gov/download/SemGroups.txt
The file MRCONSO.RRF.gz can be created by
$ zcat MRCONSO.RRF.aa.gz MRCONSO.RRF.ab.gz | gzip - > MRCONSO.RRF.gz
""".format(program=sys.argv[0])
    # RawDescriptionHelpFormatter preserves the example's line breaks in --help.
    parser = argparse.ArgumentParser(description=description, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--umls-version', metavar='VERSION', required=True,
                        help='UMLS version, used to generate the ontology name')
    parser.add_argument('--mrconso', metavar='FILE', required=True,
                        help='Zipped MRCONSO file')
    parser.add_argument('--mrsty', metavar='FILE', required=True,
                        help='Zipped MRSTY file')
    parser.add_argument('--sty-groups', metavar='FILE', required=True,
                        help='Semantic groups file')
    parser.add_argument('--output', metavar='FILE', required=True,
                        help='Peregrine output file')
    parser.add_argument('--semantic-types', metavar='SEMANTIC-TYPE', nargs='*',
                        help='If given, include only UMLS concepts of these semantic types')
    parser.add_argument('--semantic-groups', metavar='SEMANTIC-GROUP', nargs='*',
                        help='If given, include only UMLS concepts of these semantic groups')
    parser.add_argument('--vocabularies', metavar='VOCABULARY', nargs='*',
                        help='If given, include only terms of these vocabularies')
    parser.add_argument('--code-terms', choices=['code', 'voc-code', 'code-voc-code'],
                        help='Use codes as terms (for example: TM D015074 if code or code-voc-code, TM MSH:D015074 if voc-code or code-voc-code)')
    parser.add_argument('--fields-db-identifier', action='store_true',
                        help='Write fields for database identifier')
    # Fix: this help string was a copy-paste of the --fields-db-identifier help.
    parser.add_argument('--fields-semantic-type', action='store_true',
                        help='Write fields for semantic type')
    parser.add_argument('--fields-vocabulary', action='store_true',
                        help='Write fields for vocabulary')
    args = parser.parse_args()
    # The full, shell-quoted command line becomes the ontology name so the
    # output records how it was produced.
    ontology_name = ' '.join([shlex.quote(arg) for arg in sys.argv])
    # Convert the optional filter lists to sets for fast intersection tests;
    # None (or an empty list) means "no restriction".
    if args.semantic_types:
        args.semantic_types = set(args.semantic_types)
    if args.semantic_groups:
        args.semantic_groups = set(args.semantic_groups)
    if args.vocabularies:
        args.vocabularies = set(args.vocabularies)
    semantic_groups = read_sty_groups(args.sty_groups)
    semantic_types = read_semantic_types(args.mrsty)
    with open_maybe_gz(args.mrconso, 'rt', encoding='utf-8') as mrconso_handle:
        concepts = read_concepts(mrconso_handle, semantic_types,
                                 semantic_groups, args.semantic_types,
                                 args.semantic_groups, args.vocabularies,
                                 args.code_terms, args.fields_db_identifier,
                                 args.fields_semantic_type,
                                 args.fields_vocabulary)
    with open(args.output, 'w', encoding='utf-8') as output_handle:
        peregrine_file = PeregrineOntologyFile(output_handle)
        peregrine_file.write(ontology_name, concepts)
def read_sty_groups(filename):
    """Read SemGroups.txt and return a dict mapping each TUI to its group code."""
    print("Reading semantic groups.")
    # SemGroups.txt columns: group code | group name | TUI | type name.
    columns = ['Group', 'Name', 'Type', 'Type_name']
    table = pd.read_csv(filename, sep='|', header=None, names=columns)
    group_by_type = table.set_index('Type').Group
    return dict(group_by_type)
def read_semantic_types(mrsty_filename):
    """Load the concept-id -> semantic-types mapping, caching it as a pickle.

    On the first run the (possibly gzipped) MRSTY file is parsed and the
    result is pickled to the working directory; later runs reload the pickle
    instead of re-parsing.

    NOTE(review): the cache filename is fixed and does not depend on
    mrsty_filename, so a stale pickle from a different UMLS release in the
    same directory would be reused silently — delete '~MRSTY.RRF.pickle~'
    when switching releases.
    """
    sty_pickle_filename = '~MRSTY.RRF.pickle~'
    try:
        with open(sty_pickle_filename, 'rb') as f:
            print("Reloading MRSTY.")
            sty = pickle.load(f)
    except FileNotFoundError:
        with open_maybe_gz(mrsty_filename, 'rt') as f:
            sty = load_semantic_types(f)
        # 'x' mode: fail rather than overwrite if the cache appeared meanwhile.
        with open(sty_pickle_filename, 'xb') as f:
            pickle.dump(sty, f)
    return sty
def load_semantic_types(mrsty_handle):
    """Parse an open MRSTY handle into a dict: concept id -> set of TUIs."""
    by_concept = defaultdict(set)
    print("Reading MRSTY (.=100000 rows)", end=' ', flush=True)
    for line_no, line in enumerate(mrsty_handle):
        # Progress dot every 100000 rows.
        if line_no % 100000 == 0:
            print('.', end='', flush=True)
        # Drop the empty field produced by the trailing '|'.
        fields = line.split('|')[:-1]
        record = mrsty_row(*fields)
        by_concept[cui_to_id(record.cui)].add(record.tui)
    print()
    return by_concept
def read_concepts(mrconso_handle, semantic_types, semantic_groups,
                  include_semantic_types, include_semantic_groups,
                  include_vocabularies, code_terms, fields_db_identifier,
                  fields_semantic_type, fields_vocabulary):
    """Build Peregrine concepts from an open MRCONSO handle.

    semantic_types: dict mapping concept id -> set of TUIs (from MRSTY).
    semantic_groups: dict mapping TUI -> semantic group code.
    include_semantic_types / include_semantic_groups / include_vocabularies:
        optional sets restricting what is kept; None means no restriction.
    code_terms: None, 'code', 'voc-code' or 'code-voc-code' — whether source
        codes (and/or vocabulary-prefixed codes) are added as extra terms.
    fields_db_identifier / fields_semantic_type / fields_vocabulary: whether
        to fill the corresponding Peregrine output fields.

    Returns a list of `concept` namedtuples.

    Fixes vs. the original: removed commented-out debug code, removed a
    meaningless flush() on the *input* handle, and renamed the local `id`
    which shadowed the builtin.
    """
    concepts = {}
    print("Reading MRCONSO (.=100000 rows) ", end='', flush=True)
    for ix, line in enumerate(mrconso_handle):
        if ix % 100000 == 0:
            print('.', end='', flush=True)
        # Drop the empty field produced by the trailing '|'.
        values = line.split('|')[:-1]
        row = mrconso_row(*values)
        concept_id = cui_to_id(row.cui)
        # Check vocabularies
        if not (include_vocabularies is None or row.sab in include_vocabularies):
            continue
        if concept_id not in concepts:
            if concept_id not in semantic_types:
                # Diagnostic: CUI appears in MRCONSO but not in MRSTY.
                print('###', concept_id)
            sts = semantic_types.get(concept_id, set())
            sgs = {semantic_groups[st] for st in sts}
            # Check semantic types
            if not (include_semantic_types is None or sts & include_semantic_types):
                continue
            # Check semantic groups
            if not (include_semantic_groups is None or sgs & include_semantic_groups):
                continue
            umls_id = 'UMLS_' + row.cui
            # Use this row's string as a provisional name; it is replaced
            # below once the preferred English term is seen.
            concepts[concept_id] = (concept(concept_id, *[None]*8)
                                    ._replace(name=row.str, terms=set(),
                                              db_ids={umls_id}, vocabularies=set()))
            if fields_semantic_type:
                concepts[concept_id] = concepts[concept_id]._replace(semantic_types=sts)
        # Set concept name to the first English, preferred term
        if row.lat == 'ENG' and row.ispref == 'Y' and row.ts == 'P':
            concepts[concept_id] = concepts[concept_id]._replace(name=row.str)
        # Add terms to concept
        if term_predicate(row):
            term = row.str
            info = term_information(row)
            if info:
                term += '\t@' + info
            concepts[concept_id].terms.add(term)
        # Add code as terms to concept
        if code_terms == 'code' or code_terms == 'code-voc-code':
            concepts[concept_id].terms.add(row.code)
        if code_terms == 'voc-code' or code_terms == 'code-voc-code':
            term = row.sab + ':' + row.code
            concepts[concept_id].terms.add(term)
        # Add code as a database ID to the concept
        if fields_db_identifier:
            concepts[concept_id].db_ids.add(row.sab + '_' + row.code)
        # Add vocabulary
        if fields_vocabulary:
            concepts[concept_id].vocabularies.add(row.sab)
    print()
    return list(concepts.values())
class PeregrineOntologyFile:
    """Writer for the ErasmusMC/Peregrine ontology text format.

    Format description:
    https://trac.nbic.nl/data-mining/wiki/ErasmusMC%20ontology%20file%20format
    """

    def __init__(self, out):
        # out: a writable text file handle.
        self.out = out

    def write(self, ontology_name, concepts):
        """Write the header, all namespaces, and all concepts to the output."""
        self.output_header(ontology_name)
        vocabularies = set(voc for c in concepts if c.vocabularies for voc in c.vocabularies)
        print("Write {} vocabularies.".format(len(vocabularies)))
        for vocabulary in vocabularies:
            self.output_namespace('Voc', id=vocabulary)
        semantic_types = set(t for c in concepts if c.semantic_types for t in c.semantic_types)
        print("Write {} semantic types.".format(len(semantic_types)))
        for semantic_type in semantic_types:
            self.output_namespace('SemType', semantic_type)
        print("Write {} concepts (.=100000 concepts) ".format(len(concepts)), end='', flush=True)
        for ix, concept in enumerate(concepts):
            if ix % 100000 == 0:
                print('.', end='', flush=True)
            self.output_concept(concept)
        print()

    def output_header(self, name):
        """Write the file header: format comment, version, and ontology name."""
        self.output_line("# ErasmusMC ontology file")
        self.output_line("VR", "1.0")
        self.output_line("ON", name)
        self.output_line("--")

    def output_namespace(self, namespace, id):
        """Write one namespace record (e.g. NS Voc / ID MSH)."""
        lines = [
            ("NS", namespace),
            ("ID", id)
        ]
        self.output_lines(lines)

    def output_concept(self, concept):
        """Write one concept record; None-valued fields are skipped."""
        lines = ([
            ("ID", concept.id),
            ("NA", concept.name),
            ("TM", concept.terms),
            ("DF", concept.definition),
            ("DB", sorted(concept.db_ids)),
            ("ST", concept.semantic_types),
            ("VO", concept.vocabularies),
            ("DI", concept.disambiguation),
            ("PA", concept.parent_concepts),
        ])
        self.output_lines(lines)

    def output_lines(self, lines):
        """Write (key, value) pairs, expanding collection values into one
        line per element, and terminate the record with '--'."""
        for key, value in lines:
            if value is None:
                # Omit unset fields entirely.
                pass
            # Fix: isinstance instead of the non-idiomatic `type(value) in [list, set]`.
            elif isinstance(value, (list, set)):
                for value0 in value:
                    self.output_line(key, value0)
            else:
                self.output_line(key, value)
        self.output_line('--')

    def output_line(self, key, value=None):
        """Write a single 'KEY' or 'KEY value' line."""
        if value is None:
            line = str(key)
        else:
            line = str(key) + ' ' + str(value)
        self.out.write(line+'\n')
# Run the conversion only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment