Convert UMLS Metathesaurus files to a Peregrine ontology
#!/usr/bin/env python3
# (C) 2016 Benedikt Becker, Erasmus MC, Rotterdam
from collections import namedtuple, defaultdict
import argparse
import pickle
import gzip
import pandas as pd
import sys
import shlex
# A MRCONSO row
mrconso_row = namedtuple('mrconso_row', 'cui lat ts lui stt sui ispref aui saui scui sdui sab tty code str srl suppress cvf')
# A MRSTY row
mrsty_row = namedtuple('mrsty_row', 'cui tui stn sty atui cvf')
# A Peregrine concept
concept = namedtuple('concept', 'id terms db_ids name definition semantic_types vocabularies disambiguation parent_concepts')
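# For orientation: MRCONSO.RRF and MRSTY.RRF are pipe-delimited with a trailing
# separator, which is why lines are parsed below with line.split('|')[:-1].
# Sketches of one row each, with made-up field values (not from a real release):
#   MRCONSO: C0000001|ENG|P|L0000001|PF|S0000001|Y|A0000001|||D000001|MSH|MH|D000001|Some term|0|N||
#   MRSTY:   C0000001|T047|B2.2.1.2.1|Disease or Syndrome|AT00000001||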
def term_predicate(row):
    # Include only terms from the Metathesaurus for which this predicate holds:
    # English terms whose term status (TS) is P (preferred) or S (non-preferred)
    # and whose string is longer than two characters.
    return (row.lat == 'ENG'
            and row.ts in 'PS'
            and len(row.str) > 2)
def term_information(row):
    # Generate term information for the term in row (a mrconso_row)
    # https://trac.nbic.nl/data-mining/wiki/ErasmusMC%20ontology%20file%20format
    # [lang=XX];[match=[ci],[no]]
    return 'match=no'
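# In read_concepts this information is appended to the term after a tab and an
# '@', so with the fixed 'match=no' above a term line in the output looks
# roughly like (term text illustrative, <TAB> = a literal tab character):
#   TM Atrial fibrillation<TAB>@match=no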
def cui_to_id(cui):
    return int(cui[1:])
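# For example, cui_to_id('C0004238') == 4238: the leading 'C' of the CUI is
# dropped and the remaining digits are parsed as an integer.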
def open_maybe_gz(name, *args, **kwargs):
    if name.endswith('.gz'):
        return gzip.open(name, *args, **kwargs)
    else:
        return open(name, *args, **kwargs)
def main():
    description = "Convert the UMLS Metathesaurus to a Peregrine ontology."
    epilog = """For example:
$ {program} \\
    --umls-version 2015AB \\
    --mrconso 2015AB/2015AB/META/MRCONSO.RRF.gz \\
    --mrsty 2015AB/2015AB/META/MRSTY.RRF.gz \\
    --sty-groups SemGroups.txt \\
    --vocabularies MSH MDR SNM LNC ICD9CM ICD10 ICD10CM ICPC2EENG ICPC2P RCD RCD2 \\
    --semantic-groups ANAT CHEM DISO GENE LIVB PHEN PHYS PROC \\
    --output UMLS2015AB.ontology

The semantic groups can be found at the following URL:
https://semanticnetwork.nlm.nih.gov/download/SemGroups.txt

The file MRCONSO.RRF.gz can be created by
$ zcat MRCONSO.RRF.aa.gz MRCONSO.RRF.ab.gz | gzip - > MRCONSO.RRF.gz
""".format(program=sys.argv[0])
    parser = argparse.ArgumentParser(description=description, epilog=epilog,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--umls-version', metavar='VERSION', required=True,
                        help='UMLS version, used to generate the ontology name')
    parser.add_argument('--mrconso', metavar='FILE', required=True,
                        help='Zipped MRCONSO file')
    parser.add_argument('--mrsty', metavar='FILE', required=True,
                        help='Zipped MRSTY file')
    parser.add_argument('--sty-groups', metavar='FILE', required=True,
                        help='Semantic groups file')
    parser.add_argument('--output', metavar='FILE', required=True,
                        help='Peregrine output file')
    parser.add_argument('--semantic-types', metavar='SEMANTIC-TYPE', nargs='*',
                        help='If given, include only UMLS concepts of these semantic types')
    parser.add_argument('--semantic-groups', metavar='SEMANTIC-GROUP', nargs='*',
                        help='If given, include only UMLS concepts of these semantic groups')
    parser.add_argument('--vocabularies', metavar='VOCABULARY', nargs='*',
                        help='If given, include only terms of these vocabularies')
    parser.add_argument('--code-terms', choices=['code', 'voc-code', 'code-voc-code'],
                        help='Use codes as terms (for example: TM D015074 if code or '
                             'code-voc-code, TM MSH:D015074 if voc-code or code-voc-code)')
    parser.add_argument('--fields-db-identifier', action='store_true',
                        help='Write fields for database identifiers')
    parser.add_argument('--fields-semantic-type', action='store_true',
                        help='Write fields for semantic types')
    parser.add_argument('--fields-vocabulary', action='store_true',
                        help='Write fields for vocabularies')
    args = parser.parse_args()
    ontology_name = ' '.join([shlex.quote(arg) for arg in sys.argv])
    if args.semantic_types:
        args.semantic_types = set(args.semantic_types)
    if args.semantic_groups:
        args.semantic_groups = set(args.semantic_groups)
    if args.vocabularies:
        args.vocabularies = set(args.vocabularies)
    semantic_groups = read_sty_groups(args.sty_groups)
    semantic_types = read_semantic_types(args.mrsty)
    with open_maybe_gz(args.mrconso, 'rt', encoding='utf-8') as mrconso_handle:
        concepts = read_concepts(mrconso_handle, semantic_types,
                                 semantic_groups, args.semantic_types,
                                 args.semantic_groups, args.vocabularies,
                                 args.code_terms, args.fields_db_identifier,
                                 args.fields_semantic_type,
                                 args.fields_vocabulary)
    with open(args.output, 'w', encoding='utf-8') as output_handle:
        peregrine_file = PeregrineOntologyFile(output_handle)
        peregrine_file.write(ontology_name, concepts)
def read_sty_groups(filename):
    print("Reading semantic groups.")
    names = ['Group', 'Name', 'Type', 'Type_name']
    groups = (pd.read_csv(filename, sep='|', header=None, names=names)
              .set_index('Type')
              .Group)
    return dict(groups)
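# SemGroups.txt assigns every semantic type (TUI) to a coarser semantic group;
# its lines are pipe-delimited as Group|Name|Type|Type_name, for example
# (illustrative): DISO|Disorders|T047|Disease or Syndrome
# read_sty_groups turns this into a dict mapping TUI to group, e.g.
# {'T047': 'DISO', ...}.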
def read_semantic_types(mrsty_filename):
    sty_pickle_filename = '~MRSTY.RRF.pickle~'
    try:
        with open(sty_pickle_filename, 'rb') as f:
            print("Reloading MRSTY.")
            sty = pickle.load(f)
    except FileNotFoundError:
        with open_maybe_gz(mrsty_filename, 'rt') as f:
            sty = load_semantic_types(f)
        with open(sty_pickle_filename, 'xb') as f:
            pickle.dump(sty, f)
    return sty
def load_semantic_types(mrsty_handle):
    res = defaultdict(set)
    print("Reading MRSTY (.=100000 rows)", end=' ', flush=True)
    for ix, line in enumerate(mrsty_handle):
        if ix % 100000 == 0:
            print('.', end='', flush=True)
        values = line.split('|')[:-1]
        row = mrsty_row(*values)
        id = cui_to_id(row.cui)
        res[id].add(row.tui)
    print()
    return res
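# The resulting mapping goes from integer concept IDs (see cui_to_id) to the set
# of TUIs listed for that CUI in MRSTY, for example (hypothetical values):
#   {4238: {'T047'}, 12345: {'T047', 'T191'}}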
def read_concepts(mrconso_handle, semantic_types, semantic_groups,
                  include_semantic_types, include_semantic_groups,
                  include_vocabularies, code_terms, fields_db_identifier,
                  fields_semantic_type, fields_vocabulary):
    concepts = {}
    print("Reading MRCONSO (.=100000 rows) ", end='', flush=True)
    for ix, line in enumerate(mrconso_handle):
        # if ix > 100000:
        #     break
        if ix % 100000 == 0:
            print('.', end='', flush=True)
        values = line.split('|')[:-1]
        row = mrconso_row(*values)
        id = cui_to_id(row.cui)
        # Check vocabularies
        if not (include_vocabularies is None or row.sab in include_vocabularies):
            continue
        if id not in concepts:
            if id not in semantic_types:
                print('### no semantic types for CUI', row.cui)
            sts = semantic_types.get(id, set())
            sgs = {semantic_groups[st] for st in sts}
            # Check semantic types
            if not (include_semantic_types is None or sts & include_semantic_types):
                continue
            # Check semantic groups
            if not (include_semantic_groups is None or sgs & include_semantic_groups):
                continue
            umls_id = 'UMLS_' + row.cui
            concepts[id] = (concept(id, *[None]*8)
                            ._replace(name=row.str, terms=set(),
                                      db_ids={umls_id}, vocabularies=set()))
            if fields_semantic_type:
                concepts[id] = concepts[id]._replace(semantic_types=sts)
        # Set concept name to the first English, preferred term
        if row.lat == 'ENG' and row.ispref == 'Y' and row.ts == 'P':
            concepts[id] = concepts[id]._replace(name=row.str)
        # Add terms to concept
        if term_predicate(row):
            term = row.str
            info = term_information(row)
            if info:
                term += '\t@' + info
            concepts[id].terms.add(term)
        # Add code as terms to concept
        if code_terms == 'code' or code_terms == 'code-voc-code':
            concepts[id].terms.add(row.code)
        if code_terms == 'voc-code' or code_terms == 'code-voc-code':
            term = row.sab + ':' + row.code
            concepts[id].terms.add(term)
        # Add code as a database ID to the concept
        if fields_db_identifier:
            concepts[id].db_ids.add(row.sab + '_' + row.code)
        # Add vocabulary
        if fields_vocabulary:
            concepts[id].vocabularies.add(row.sab)
    print()
    return list(concepts.values())
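# Each element of the returned list is a `concept` namedtuple. With default
# options only id, name, terms, db_ids and vocabularies are initialised, for
# example (hypothetical values):
#   concept(id=4238, terms={'Atrial fibrillation\t@match=no'},
#           db_ids={'UMLS_C0004238'}, name='Atrial Fibrillation',
#           definition=None, semantic_types=None, vocabularies=set(),
#           disambiguation=None, parent_concepts=None)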
class PeregrineOntologyFile:
    # Generate a Peregrine ontology text file in the format described here:
    # https://trac.nbic.nl/data-mining/wiki/ErasmusMC%20ontology%20file%20format
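    #
    # A sketch of the file this class writes, with illustrative field values
    # (which records appear depends on the command-line options; see
    # output_header, output_namespace and output_concept below;
    # <TAB> stands for a literal tab character):
    #
    #   # ErasmusMC ontology file
    #   VR 1.0
    #   ON <ontology name>
    #   --
    #   NS Voc
    #   ID MSH
    #   --
    #   ID 4238
    #   NA Atrial Fibrillation
    #   TM Atrial fibrillation<TAB>@match=no
    #   DB UMLS_C0004238
    #   --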
    def __init__(self, out):
        self.out = out
    def write(self, ontology_name, concepts):
        self.output_header(ontology_name)
        vocabularies = set(voc for c in concepts if c.vocabularies for voc in c.vocabularies)
        print("Write {} vocabularies.".format(len(vocabularies)))
        for vocabulary in vocabularies:
            self.output_namespace('Voc', id=vocabulary)
        semantic_types = set(t for c in concepts if c.semantic_types for t in c.semantic_types)
        print("Write {} semantic types.".format(len(semantic_types)))
        for semantic_type in semantic_types:
            self.output_namespace('SemType', semantic_type)
        print("Write {} concepts (.=100000 concepts) ".format(len(concepts)), end='', flush=True)
        for ix, concept in enumerate(concepts):
            if ix % 100000 == 0:
                print('.', end='', flush=True)
            self.output_concept(concept)
        print()
    def output_header(self, name):
        self.output_line("# ErasmusMC ontology file")
        self.output_line("VR", "1.0")
        self.output_line("ON", name)
        self.output_line("--")
    def output_namespace(self, namespace, id):
        lines = [
            ("NS", namespace),
            ("ID", id),
        ]
        self.output_lines(lines)
    def output_concept(self, concept):
        lines = [
            ("ID", concept.id),
            ("NA", concept.name),
            ("TM", concept.terms),
            ("DF", concept.definition),
            ("DB", sorted(concept.db_ids)),
            ("ST", concept.semantic_types),
            ("VO", concept.vocabularies),
            ("DI", concept.disambiguation),
            ("PA", concept.parent_concepts),
        ]
        self.output_lines(lines)
    def output_lines(self, lines):
        for key, value in lines:
            if value is None:
                pass
            elif type(value) in [list, set]:
                for value0 in value:
                    self.output_line(key, value0)
            else:
                self.output_line(key, value)
        self.output_line('--')
    def output_line(self, key, value=None):
        if value is None:
            line = str(key)
        else:
            line = str(key) + ' ' + str(value)
        self.out.write(line + '\n')
if __name__ == '__main__':
    main()