Created
May 23, 2014 15:31
-
-
Save timothyclemans/de819e14f2c0e0ec9b81 to your computer and use it in GitHub Desktop.
snomedct
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# PyMedTermino | |
# Copyright (C) 2012-2013 Jean-Baptiste LAMY | |
# LIMICS (Laboratoire d'informatique médicale et d'ingénierie des connaissances en santé), UMR_S 1142 | |
# University Paris 13, Sorbonne paris-Cité, Bobigny, France | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU Lesser General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU Lesser General Public License for more details. | |
# You should have received a copy of the GNU Lesser General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
""" | |
pymedtermino.snomedct | |
********************* | |
PyMedtermino module for SNOMEDCT. | |
.. class:: SNOMEDCT | |
The SNOMED CT terminology. See :class:`pymedtermino.Terminology` for common terminology members; only SNOMED CT-specific members are described here. | |
.. automethod:: CORE_problem_list | |
""" | |
__all__ = ["SNOMEDCT", "Group"] | |
import sys, os, os.path, sqlite3 as sql_module | |
import pymedtermino | |
class SNOMEDCT(pymedtermino.Terminology): | |
def __init__(self): | |
pymedtermino.Terminology.__init__(self, "SNOMEDCT") | |
def _create_Concept(self): return SNOMEDCTConcept | |
def first_levels(self): | |
return [self[code] for code in [u"123037004", u"404684003", u"308916002", u"272379006", u"106237007", u"363787002", u"410607006", u"373873005", u"78621006", u"260787004", u"71388002", u"362981000", u"419891008", u"243796009", u"48176007", u"370115009", u"123038009", u"254291000", u"105590001"]] | |
def search(self, text): | |
#db_cursor.execute("SELECT DISTINCT conceptId FROM Description WHERE term LIKE ?", ("%%%s%%" % text,)) | |
db = sql_module.connect(os.path.join(pymedtermino.DATA_DIR, "snomedct.sqlite3")) | |
db_cursor = db.cursor() | |
db_cursor.execute("PRAGMA query_only = TRUE;") | |
db_cursor.execute("PRAGMA synchronous = OFF;") | |
db_cursor.execute("PRAGMA journal_mode = OFF;") | |
db_cursor.execute("SELECT DISTINCT Description.conceptId FROM Description, Description_fts WHERE (Description_fts.term MATCH ?) AND (Description.id = Description_fts.docid)", (text,)) | |
r = db_cursor.fetchall() | |
db.close() | |
l = [] | |
for (code,) in r: | |
try: l.append(self[code]) | |
except ValueError: pass | |
return l | |
def CORE_problem_list(self): | |
"""Returns a generator iterating over all SNOMED CT concepts that are included in the CORE problem list.""" | |
pymedtermino.snomedct.db_cursor.execute("SELECT Id FROM Concept WHERE is_in_core = 1") | |
for (code,) in pymedtermino.snomedct.db_cursor.fetchall(): | |
yield self[code] | |
class Group(object): | |
"""A group, grouping several SNOMED CT relation together in the definition of a Concept.""" | |
def __init__(self): self.relations = set() | |
def add_relation(self, rel, concept): | |
if isinstance(concept, pymedtermino.Concept): | |
if rel in self.relations: | |
getattr(self, rel).add(concept) | |
else: | |
self.relations.add(rel) | |
setattr(self, rel, pymedtermino.Concepts([concept])) | |
else: | |
if rel in self.relations: | |
getattr(self, rel).update(concept) | |
else: | |
self.relations.add(rel) | |
setattr(self, rel, pymedtermino.Concepts(concept)) | |
def imply(a, b): | |
if not b.relations.issubset(a.relations): return 0 | |
for relation in b.relations: | |
if not getattr(a, relation).imply(getattr(b, relation)): return 0 | |
return 1 | |
def __repr__(self): | |
return "<Group %s>" % "; ".join(["%s %s" % (rel, ", ".join([concept.term for concept in getattr(self, rel)])) for rel in self.relations]) | |
if pymedtermino.REMOVE_SUPPRESSED_CONCEPTS: _parent_class = pymedtermino.MultiaxialConcept | |
else: _parent_class = pymedtermino.CycleSafeMultiaxialConcept | |
class SNOMEDCTConcept(_parent_class, pymedtermino._IntCodeConcept): | |
"""A SNOMED CT concept. See :class:`pymedtermino.Concept` for common terminology members; only SNOMED CT-specific members are described here. | |
.. attribute:: is_in_core | |
True if this concept belongs to the SNOMED CT CORE problem list. | |
.. attribute:: groups | |
A list of the groups in the concept's definition. Each group include one or more relations, grouped together. | |
.. attribute:: out_of_group | |
A special group that includes all relations that do *not* belongs to a group. | |
.. attribute:: active | |
True if this concept is still active in SNOMED CT; False if it has been removed. | |
.. attribute:: definition_status | |
.. attribute:: module | |
Additional attributes are available for relations, and are listed in the :attr:`relations <pymedtermino.Concept.relations>` attribute. | |
""" | |
def __init__(self, code): | |
db = sql_module.connect(os.path.join(pymedtermino.DATA_DIR, "snomedct.sqlite3")) | |
db_cursor = db.cursor() | |
db_cursor.execute("PRAGMA query_only = TRUE;") | |
db_cursor.execute("PRAGMA synchronous = OFF;") | |
db_cursor.execute("PRAGMA journal_mode = OFF;") | |
if pymedtermino.REMOVE_SUPPRESSED_CONCEPTS: | |
db_cursor.execute("SELECT active FROM Concept WHERE id=?", (code,)) | |
if db_cursor.fetchone()[0] != 1: raise ValueError() | |
db_cursor.execute("SELECT term FROM Description WHERE conceptId=? AND typeId=900000000000003001 AND active=1", (code,)) | |
self.active = 1 | |
else: | |
db_cursor.execute("SELECT term FROM Description WHERE conceptId=? AND typeId=900000000000003001", (code,)) | |
r = db_cursor.fetchone() | |
db.close() | |
if not r: raise ValueError() | |
pymedtermino.MultiaxialConcept.__init__(self, code, r[0]) | |
def __getattr__(self, attr): | |
db = sql_module.connect(os.path.join(pymedtermino.DATA_DIR, "snomedct.sqlite3")) | |
db_cursor = db.cursor() | |
db_cursor.execute("PRAGMA query_only = TRUE;") | |
db_cursor.execute("PRAGMA synchronous = OFF;") | |
db_cursor.execute("PRAGMA journal_mode = OFF;") | |
if attr == "parents": | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT destinationId FROM Relationship WHERE sourceId=? AND typeId=116680003 AND active=1", (self.code,)) # 116680003 = is_a | |
else: | |
db_cursor.execute("SELECT DISTINCT destinationId FROM Relationship WHERE sourceId=? AND typeId=116680003", (self.code,)) # 116680003 = is_a | |
self.parents = [self.terminology[code] for (code,) in db_cursor.fetchall()] | |
db.close() | |
return self.parents | |
elif attr == "children": | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT sourceId FROM Relationship WHERE destinationId=? AND typeId=116680003 AND active=1", (self.code,)) # 116680003 = is_a | |
else: | |
db_cursor.execute("SELECT DISTINCT sourceId FROM Relationship WHERE destinationId=? AND typeId=116680003", (self.code,)) # 116680003 = is_a | |
self.children = [self.terminology[code] for (code,) in db_cursor.fetchall()] | |
db.close() | |
return self.children | |
elif attr == "relations": | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT typeId FROM Relationship WHERE sourceId=? AND active=1", (self.code,)) | |
else: | |
db_cursor.execute("SELECT DISTINCT typeId FROM Relationship WHERE sourceId=?", (self.code,)) | |
self.relations = set(code_2_relation[rel] for (rel,) in db_cursor.fetchall() if rel != 116680003) # 116680003 = is_a | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT typeId FROM Relationship WHERE destinationId=? AND active=1", (self.code,)) | |
else: | |
db_cursor.execute("SELECT DISTINCT typeId FROM Relationship WHERE destinationId=?", (self.code,)) | |
for (rel,) in db_cursor.fetchall(): | |
if rel != 116680003: | |
self.relations.add("INVERSE_%s" % code_2_relation[rel]) | |
db.close() | |
return self.relations | |
elif (attr == "groups") or (attr == "out_of_group"): | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT relationshipGroup, typeId, destinationId FROM Relationship WHERE sourceId=? AND active=1", (self.code,)) | |
else: | |
db_cursor.execute("SELECT DISTINCT relationshipGroup, typeId, destinationId FROM Relationship WHERE sourceId=?", (self.code,)) | |
data = db_cursor.fetchall() | |
groups = [Group() for i in range(1 + max([0] + [group_id for (group_id, rel, code) in data]))] | |
for (group_id, rel, code) in data: | |
if rel == 116680003: continue # 116680003 = is_a | |
group = groups[group_id] | |
group.add_relation(code_2_relation[rel], self.terminology[code]) | |
self.groups = [group for group in groups[1:] if group.relations] # Pourquoi y a-t-il des groupes vides dans la SNOMED CT ??? | |
self.out_of_group = groups[0] | |
if attr == "groups": return self.groups | |
else: return self.out_of_group | |
elif attr in relation_2_code: | |
relation_code = relation_2_code[attr] | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT destinationId FROM Relationship WHERE sourceId=? AND typeId=? AND active=1", (self.code, relation_code)) | |
else: | |
db_cursor.execute("SELECT DISTINCT destinationId FROM Relationship WHERE sourceId=? AND typeId=?", (self.code, relation_code)) | |
l = [self.terminology[code] for (code,) in db_cursor.fetchall()] | |
setattr(self, attr, l) | |
db.close() | |
return l | |
elif attr.startswith(u"INVERSE_"): | |
relation_code = relation_2_code[attr[8:]] | |
if pymedtermino.REMOVE_SUPPRESSED_RELATIONS and self.active: | |
db_cursor.execute("SELECT DISTINCT sourceId FROM Relationship WHERE destinationId=? AND typeId=? AND active=1", (self.code, relation_code)) | |
else: | |
db_cursor.execute("SELECT DISTINCT sourceId FROM Relationship WHERE destinationId=? AND typeId=?", (self.code, relation_code)) | |
l = [self.terminology[code] for (code,) in db_cursor.fetchall()] | |
setattr(self, attr, l) | |
db.close() | |
return l | |
elif attr == "active": | |
db_cursor.execute("SELECT active FROM Concept WHERE id=?", (self.code,)) | |
self.active = db_cursor.fetchone()[0] | |
db.close() | |
return self.active | |
elif attr == "terms": | |
if pymedtermino.REMOVE_SUPPRESSED_TERMS and self.active: | |
db_cursor.execute("SELECT term FROM Description WHERE conceptId=? AND active=1", (self.code,)) | |
else: | |
db_cursor.execute("SELECT term FROM Description WHERE conceptId=?", (self.code,)) | |
self.terms = [l[0] for l in db_cursor.fetchall()] | |
db.close() | |
return self.terms | |
elif attr == "definition_status": | |
db_cursor.execute("SELECT definitionStatusId FROM Concept WHERE id=?", (self.code,)) | |
self.definition_status = SNOMEDCT[db_cursor.fetchone()[0]] | |
db.close() | |
return self.definition_status | |
elif attr == "module": | |
db_cursor.execute("SELECT moduleId FROM Concept WHERE id=?", (self.code,)) | |
self.module = SNOMEDCT[db_cursor.fetchone()[0]] | |
db.close() | |
return self.module | |
elif attr == "is_in_core": | |
db_cursor.execute("SELECT is_in_core FROM Concept WHERE id=?", (self.code,)) | |
self.is_in_core = int(db_cursor.fetchone()[0]) | |
db.close() | |
return self.is_in_core | |
raise AttributeError(attr) | |
def is_part_of(self, concept, already = None): | |
if self is concept: return True | |
if already is None: already = set([self]) | |
if u"part_of" in self.relations: parents = set(self.parents + self.part_of) | |
else: parents = self.parents | |
for parent in parents: | |
if not parent in already: | |
already.add(parent) | |
if parent.is_part_of(concept, already): return True | |
return False | |
def descendant_parts(self): | |
"""Returns the sub-parts of this concept, recursively.""" | |
if u"INVERSE_part_of" in self.relations: | |
for child in self.INVERSE_part_of: | |
yield child | |
for concept in child.descendant_parts(): | |
yield concept | |
def ancestor_parts(self): | |
"""Returns the super-part of this concept, recursively.""" | |
if u"part_of" in self.relations: | |
for parent in self.part_of: | |
yield parent | |
for concept in parent.ancestor_parts(): | |
yield concept | |
def associated_clinical_findings(self): | |
"""Return the clinical finding concepts associated to this concept (which is expected to be a anatomical structure, a morphology, etc).""" | |
r = pymedtermino.Concepts() | |
for i in set(self.self_and_descendants()) | set(self.descendant_parts()): | |
for relation in MAIN_CLINICAL_FINDING_RELATIONS: | |
relation = "INVERSE_%s" % relation | |
if relation in i.relations: | |
for j in getattr(i, relation): | |
if j.is_a(SNOMEDCT[404684003]): r.add(j) | |
return r | |
SNOMEDCT = SNOMEDCT() | |
MAIN_CLINICAL_FINDING_RELATIONS = set([ | |
"part_of", | |
"temporal_context", | |
"finding_site", | |
"associated_morphology", | |
"has_interpretation", | |
"interprets", | |
"has_definitional_manifestation", | |
"pathological_process", | |
"has_focus", | |
"causative_agent", | |
"associated_with", # Discutable... | |
"due_to", | |
]) | |
code_2_relation = { | |
131195008 : u"subject_of_information", | |
42752001 : u"due_to", | |
309824003 : u"instrumentation", | |
246112005 : u"severity", | |
367346004 : u"measures", | |
363589002 : u"associated_procedure", | |
272741003 : u"laterality", | |
363701004 : u"direct_substance", | |
405813007 : u"procedure_site_direct", | |
370133003 : u"specimen_substance", | |
370134009 : u"time_aspect", | |
408730004 : u"procedure_context", | |
260858005 : u"extent", | |
419066007 : u"finding_informer", | |
263535000 : u"communication_with_wound", | |
425391005 : u"using_access_device", | |
370127007 : u"access_instrument", | |
118170007 : u"specimen_source_identity", | |
363700003 : u"direct_morphology", | |
370132008 : u"scale_type", | |
246100006 : u"onset", | |
363713009 : u"has_interpretation", | |
116676008 : u"associated_morphology", | |
116683001 : u"associated_function", | |
118168003 : u"specimen_source_morphology", | |
308489006 : u"pathological_process", | |
424361007 : u"using_substance", | |
363705008 : u"has_definitional_manifestation", | |
408729009 : u"finding_context", | |
260686004 : u"method", | |
263502005 : u"clinical_course", | |
246267002 : u"location", | |
363710007 : u"indirect_device", | |
116686009 : u"has_specimen", | |
363715002 : u"associated_etiologic_finding", | |
261583007 : u"using", | |
363699004 : u"direct_device", | |
363709002 : u"indirect_morphology", | |
246456000 : u"episodicity", | |
260870009 : u"priority", | |
116680003 : u"is_a", | |
405816004 : u"procedure_morphology", | |
363704007 : u"procedure_site", | |
123005000 : u"part_of", | |
246093002 : u"component", | |
260669005 : u"approach", | |
370130000 : u"property", | |
408731000 : u"temporal_context", | |
255234002 : u"after", | |
363714003 : u"interprets", | |
424226004 : u"using_device", | |
363698007 : u"finding_site", | |
405815000 : u"procedure_device", | |
363703001 : u"has_intent", | |
47429007 : u"associated_with", | |
246090004 : u"associated_finding", | |
370135005 : u"pathological_process", | |
424876005 : u"surgical_approach", | |
418775008 : u"finding_method", | |
411116001 : u"has_dose_form", | |
260908002 : u"course", | |
363708005 : u"temporally_follows", | |
408732007 : u"subject_relationship_context", | |
127489000 : u"has_active_ingredient", | |
258214002 : u"stage", | |
424244007 : u"using_energy", | |
410675002 : u"route_of_administration", | |
370129005 : u"measurement_method", | |
246513007 : u"revision_status", | |
405814001 : u"procedure_site_indirect", | |
246454002 : u"occurrence", | |
118171006 : u"specimen_procedure", | |
363702006 : u"has_focus", | |
260507000 : u"access", | |
116678009 : u"has_measured_component", | |
370131001 : u"recipient_category", | |
246075003 : u"causative_agent", | |
118169006 : u"specimen_source_topography", | |
} | |
relation_2_code = dict((relation, code) for (code, relation) in code_2_relation.items()) | |
#if __name__ == "__main__": | |
# relation_codes = {} | |
# db_cursor.execute("select distinct typeId from Relationship") | |
# codes = db_cursor.fetchall() | |
# for code, in codes: | |
# db_cursor.execute("select term from Description where conceptId=? and typeId=900000000000013009", (code,)) | |
# relation_codes[code] = db_cursor.fetchone()[0].lower().replace(" ", "_").replace("_-_", "_") | |
# print "code_2_relation = {" | |
# for code in relation_codes: print ''' %s : u"%s",''' % (code, relation_codes[code]) | |
# print "}" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment