Skip to content

Instantly share code, notes, and snippets.

@glow-mdsol
Created October 3, 2013 09:13
Show Gist options
  • Save glow-mdsol/6807361 to your computer and use it in GitHub Desktop.
Matching script
#!/usr/bin/env python
import sys
import os
import yaml
import csv
import string
import re
import rdflib
import copy
import unicodecsv
import openpyxl
from openpyxl.style import Alignment
from openpyxl.cell import get_column_letter
import Levenshtein
from Levenshtein import jaro_winkler
from store import MySPARQLStore
# Shortest substring length still tried when fuzzing a project name
# (see StudyFuzzer.build_word_list).
MIN_LENGTH=4
# NOTE(review): declared but apparently never referenced in this file - confirm.
MAX_MATCHES=1
class MatchError(Exception):
    """Raised when a study record cannot be matched against the endpoint."""
# Column orderings for each output sheet / CSV.  The 'proposed' layout adds
# the two Jaro-Winkler similarity scores used for manual adjudication.
TITLE = {'unresolved': ["url", "ProjectID", "project_name", "description", "subject_count"],
         'resolved': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                      "sponsor_name", "org_id", "sec_id", "title", "phase", "lead_sponsor", "linked_url", "enrollment"],
         'partresolved': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                          "sponsor_name", "org_id", "sec_id", "title", "phase", "enrollment", "lead_sponsor", "linked_url"],
         'proposed': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                      "sponsor_name", "org_id", "sec_id", "title", "phase", "enrollment", "lead_sponsor", "linked_url",
                      # Fix: the missing comma after "linked_url" silently fused it
                      # with "url-jw" into one column name ("linked_urlurl-jw").
                      "url-jw", "desc-jw"],
         }
class StudyFuzzer(object):
def __init__(self):
self.endpoint = self._get_endpoint()
self._graph = None
self._doc = None
def write(self, entity_type, line):
if self._doc is None:
# create the doc
self._doc = openpyxl.Workbook()
ws = self._doc.worksheets[0]
ws.title = entity_type
ws.append(TITLE.get(entity_type))
for _row in ws.range("A1:%s1" % get_column_letter(len(TITLE.get(entity_type)))):
for _cell in _row:
_cell.style.font.bold = True
_cell.style.alignment.horizontal = Alignment.HORIZONTAL_CENTER
if not entity_type in self._doc.get_sheet_names():
# create the sheet
ws = self._doc.create_sheet()
ws.title = entity_type
ws.append(TITLE.get(entity_type))
for _row in ws.range("A1:%s1" % get_column_letter(len(TITLE.get(entity_type)))):
for _cell in _row:
_cell.style.font.bold = True
_cell.style.alignment.horizontal = Alignment.HORIZONTAL_CENTER
else:
ws = self._doc.get_sheet_by_name(entity_type)
row = len(ws.row_dimensions)
for (idx, col) in enumerate(TITLE.get(entity_type)):
cell = ws.cell(row=row, column=idx)
cell.value = line.get(col)
if col in ['description', 'title', 'lead_sponsor', 'sponsor_name', 'linked_url']:
cell.style.alignment.wrap_text = True
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT
cell.style.alignment.vertical = Alignment.VERTICAL_TOP
ws.column_dimensions[get_column_letter(idx + 1)].width = 60
elif col in ['project_name', 'url']:
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT
cell.style.alignment.vertical = Alignment.VERTICAL_TOP
ws.column_dimensions[get_column_letter(idx + 1)].width = 30
elif col in ['org_id', 'sec_id']:
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT
cell.style.alignment.vertical = Alignment.VERTICAL_TOP
ws.column_dimensions[get_column_letter(idx + 1)].width = 15
else:
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT
cell.style.alignment.vertical = Alignment.VERTICAL_TOP
ws.column_dimensions[get_column_letter(idx + 1)].width = 12
def write_resolved(self, line):
if self._resolved is None:
self._resolved = unicodecsv.DictWriter(open('resolved.csv', 'w'),
encoding='utf-16',
fieldnames=line.keys())
self._resolved.writerow(dict([(x, x) for x in line.keys()]))
self._resolved.writerow(line)
def write_unresolved(self, line):
if self._unresolved is None:
self._unresolved = unicodecsv.DictWriter(open('unresolved.csv', 'w'),
encoding='utf-16',
fieldnames=line.keys())
self._unresolved.writerow(dict([(x, x) for x in line.keys()]))
self._unresolved.writerow(line)
def write_partresolved(self, line):
if self._partresolved is None:
self._partresolved = unicodecsv.DictWriter(open('partresolved.csv', 'w'),
encoding='utf-16',
fieldnames=line.keys())
self._partresolved.writerow(dict([(x, x) for x in line.keys()]))
self._partresolved.writerow(line)
@property
def graph(self):
if self._graph is None:
self._graph = self._init_graph(self.endpoint)
return self._graph
def _get_endpoint(self):
path_to_file = os.path.join(os.path.dirname(__file__), 'config', 'endpoints.yml')
if os.path.exists(path_to_file):
try:
config = yaml.load(open(path_to_file,'r'))
except yaml.YAMLError:
raise MatchError("No endpoint found: endpoint file parse failure")
return config.get('endpoint')
else:
raise MatchError("No endpoint found: no endpoint file")
def _init_graph(self, endpoint):
store = MySPARQLStore()
g = rdflib.ConjunctiveGraph(store)
g.open(endpoint)
g.namespace_manager.reset()
g.namespace_manager.bind('rdfs',
'http://www.w3.org/2000/01/rdf-schema#')
g.namespace_manager.bind('linkedct',
'http://data.linkedct.org/vocab/resource/')
g.namespace_manager.bind('rdf',
'http://www.w3.org/1999/02/22-rdf-syntax-ns#')
return g
def search_trial_name(self, trial_name):
return self.graph.query("""
SELECT DISTINCT ?query ?trial ?org_id ?sec_id ?title ?trial_id ?phase ?lead ?agency ?source ?enrollment
WHERE{
?trial a linkedct:trial ;
linkedct:id_info_org_study_id ?org_id ;
linkedct:sponsor_group ?group ;
linkedct:brief_title ?title ;
linkedct:phase ?phase ;
linkedct:source ?source ;
linkedct:trialid ?trial_id .
OPTIONAL {
?trial linkedct:id_info_secondary_id ?sec_id ;
linkedct:enrollment ?enrollment .
?group linkedct:sponsor_group_lead_sponsor ?lead .
?lead linkedct:sponsor_agency ?agency .
}
FILTER( REGEX(?org_id, ?query, 'i') || REGEX(?sec_id, ?query, 'i') )
}""", initBindings={'query': rdflib.Literal(trial_name)})
def build_word_list(self, to_match):
list_of_words = []
list_of_words.append(to_match)
# cleaned match
list_of_words.append(copy.copy(to_match).translate(string.maketrans("",""), string.punctuation))
length = len(to_match)
for i in range(1, length - MIN_LENGTH):
list_of_words.append(re.escape(to_match[i:]))
list_of_words.append(re.escape(to_match[:length-i]))
return list_of_words
def match(self, to_match):
matches = []
word_list = self.build_word_list(to_match)
for pattern in word_list:
results = [x for x in self.search_trial_name(pattern)]
if len(results) == 1:
# single match
return results[0]
for match in results:
if not match in matches:
matches.append(match)
else:
return matches
def impute_best_option(self, source, matches):
"""
Use the levenshtein module to try and disambiguate
"""
# first, try the URL vs the sponsor name
scored = {}
for match in matches:
sponsor = match.agency or match.source
url = jaro_winkler(unicode(source['url']), unicode(sponsor))
titles = jaro_winkler(unicode(source['description']), unicode(match.title))
c = source.copy()
c['trial_id'] = match.trial_id
c['enrollment'] = match.enrollment
c['linked_url'] = match.trial
c['lead_sponsor'] = match.lead
c['sponsor_name'] = match.agency or match.source
c['phase'] = match.phase
c['title'] = match.title
c['org_id'] = match.org_id
c['sec_id'] = match.sec_id
c["url-jw"] = url
c["desc-jw"] = titles
self.write('partresolved', c)
def process_file(self, filename):
# TODO: generate a graph
resolved = []
partlyresolved = []
unresolved = []
proposed = []
if os.path.exists(filename):
with open(filename, 'r') as csvfile:
dr = csv.DictReader(csvfile, delimiter='\t')
for line in dr:
try:
matched = self.match(line['project_name'])
if isinstance(matched, list):
print "Multiple matches for %s" % line['project_name']
for match in matched:
self.impute_best_option(line, matched)
else:
c = line.copy()
c['trial_id'] = matched.trial_id
c['enrollment'] = matched.enrollment
c['linked_url'] = matched.trial
c['lead_sponsor'] = matched.lead
c['phase'] = matched.phase
c['sponsor_name'] = matched.agency or matched.source
c['title'] = matched.title
c['org_id'] = matched.org_id
c['sec_id'] = matched.sec_id
resolved.append(c)
self.write('resolved', c)
except MatchError:
print "Couldn't match %s" % line['project_name']
self.write('unresolved', line)
unresolved.append(line)
self._doc.save("map.xlsx")
print "Loaded records"
print "Resolved: %s" % len(resolved)
print "Unable to resolve: %s" % len(unresolved)
print "Number to adjudicate: %s" % len(partlyresolved)
if __name__ == "__main__":
sf = StudyFuzzer()
sf.process_file(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment