Created
October 3, 2013 09:13
-
-
Save glow-mdsol/6807361 to your computer and use it in GitHub Desktop.
Matching script: fuzzy-matches project records from a tab-separated input file against LinkedCT clinical trials via a SPARQL endpoint, writing resolved, partly-resolved, and unresolved rows to CSV files and an Excel workbook.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import os | |
import yaml | |
import csv | |
import string | |
import re | |
import rdflib | |
import copy | |
import unicodecsv | |
import openpyxl | |
from openpyxl.style import Alignment | |
from openpyxl.cell import get_column_letter | |
import Levenshtein | |
from Levenshtein import jaro_winkler | |
from store import MySPARQLStore | |
# Shortest substring (in characters) still used as a fuzzy-match pattern.
MIN_LENGTH = 4
# Cap on acceptable candidate matches.  NOTE(review): not referenced anywhere
# in this file -- confirm it is used elsewhere before removing.
MAX_MATCHES = 1
class MatchError(Exception):
    """Raised when study matching cannot proceed (e.g. no usable
    SPARQL endpoint configuration)."""
# Column layouts for each output sheet / CSV file.  Keys are the
# entity_type values passed to StudyFuzzer.write().
TITLE = {'unresolved': ["url", "ProjectID", "project_name", "description", "subject_count"],
         'resolved': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                      "sponsor_name", "org_id", "sec_id", "title", "phase", "lead_sponsor", "linked_url", "enrollment"],
         'partresolved': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                          "sponsor_name", "org_id", "sec_id", "title", "phase", "enrollment", "lead_sponsor", "linked_url"],
         # BUG FIX: a missing comma after "linked_url" previously made Python
         # concatenate the adjacent string literals into one bogus column
         # name "linked_urlurl-jw".
         'proposed': ["url", "ProjectID", "project_name", "description", "subject_count", "trial_id",
                      "sponsor_name", "org_id", "sec_id", "title", "phase", "enrollment", "lead_sponsor", "linked_url",
                      "url-jw", "desc-jw"],
         }
class StudyFuzzer(object): | |
def __init__(self): | |
self.endpoint = self._get_endpoint() | |
self._graph = None | |
self._doc = None | |
def write(self, entity_type, line): | |
if self._doc is None: | |
# create the doc | |
self._doc = openpyxl.Workbook() | |
ws = self._doc.worksheets[0] | |
ws.title = entity_type | |
ws.append(TITLE.get(entity_type)) | |
for _row in ws.range("A1:%s1" % get_column_letter(len(TITLE.get(entity_type)))): | |
for _cell in _row: | |
_cell.style.font.bold = True | |
_cell.style.alignment.horizontal = Alignment.HORIZONTAL_CENTER | |
if not entity_type in self._doc.get_sheet_names(): | |
# create the sheet | |
ws = self._doc.create_sheet() | |
ws.title = entity_type | |
ws.append(TITLE.get(entity_type)) | |
for _row in ws.range("A1:%s1" % get_column_letter(len(TITLE.get(entity_type)))): | |
for _cell in _row: | |
_cell.style.font.bold = True | |
_cell.style.alignment.horizontal = Alignment.HORIZONTAL_CENTER | |
else: | |
ws = self._doc.get_sheet_by_name(entity_type) | |
row = len(ws.row_dimensions) | |
for (idx, col) in enumerate(TITLE.get(entity_type)): | |
cell = ws.cell(row=row, column=idx) | |
cell.value = line.get(col) | |
if col in ['description', 'title', 'lead_sponsor', 'sponsor_name', 'linked_url']: | |
cell.style.alignment.wrap_text = True | |
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT | |
cell.style.alignment.vertical = Alignment.VERTICAL_TOP | |
ws.column_dimensions[get_column_letter(idx + 1)].width = 60 | |
elif col in ['project_name', 'url']: | |
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT | |
cell.style.alignment.vertical = Alignment.VERTICAL_TOP | |
ws.column_dimensions[get_column_letter(idx + 1)].width = 30 | |
elif col in ['org_id', 'sec_id']: | |
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT | |
cell.style.alignment.vertical = Alignment.VERTICAL_TOP | |
ws.column_dimensions[get_column_letter(idx + 1)].width = 15 | |
else: | |
cell.style.alignment.horizontal = Alignment.HORIZONTAL_LEFT | |
cell.style.alignment.vertical = Alignment.VERTICAL_TOP | |
ws.column_dimensions[get_column_letter(idx + 1)].width = 12 | |
def write_resolved(self, line): | |
if self._resolved is None: | |
self._resolved = unicodecsv.DictWriter(open('resolved.csv', 'w'), | |
encoding='utf-16', | |
fieldnames=line.keys()) | |
self._resolved.writerow(dict([(x, x) for x in line.keys()])) | |
self._resolved.writerow(line) | |
def write_unresolved(self, line): | |
if self._unresolved is None: | |
self._unresolved = unicodecsv.DictWriter(open('unresolved.csv', 'w'), | |
encoding='utf-16', | |
fieldnames=line.keys()) | |
self._unresolved.writerow(dict([(x, x) for x in line.keys()])) | |
self._unresolved.writerow(line) | |
def write_partresolved(self, line): | |
if self._partresolved is None: | |
self._partresolved = unicodecsv.DictWriter(open('partresolved.csv', 'w'), | |
encoding='utf-16', | |
fieldnames=line.keys()) | |
self._partresolved.writerow(dict([(x, x) for x in line.keys()])) | |
self._partresolved.writerow(line) | |
@property | |
def graph(self): | |
if self._graph is None: | |
self._graph = self._init_graph(self.endpoint) | |
return self._graph | |
def _get_endpoint(self): | |
path_to_file = os.path.join(os.path.dirname(__file__), 'config', 'endpoints.yml') | |
if os.path.exists(path_to_file): | |
try: | |
config = yaml.load(open(path_to_file,'r')) | |
except yaml.YAMLError: | |
raise MatchError("No endpoint found: endpoint file parse failure") | |
return config.get('endpoint') | |
else: | |
raise MatchError("No endpoint found: no endpoint file") | |
def _init_graph(self, endpoint): | |
store = MySPARQLStore() | |
g = rdflib.ConjunctiveGraph(store) | |
g.open(endpoint) | |
g.namespace_manager.reset() | |
g.namespace_manager.bind('rdfs', | |
'http://www.w3.org/2000/01/rdf-schema#') | |
g.namespace_manager.bind('linkedct', | |
'http://data.linkedct.org/vocab/resource/') | |
g.namespace_manager.bind('rdf', | |
'http://www.w3.org/1999/02/22-rdf-syntax-ns#') | |
return g | |
def search_trial_name(self, trial_name): | |
return self.graph.query(""" | |
SELECT DISTINCT ?query ?trial ?org_id ?sec_id ?title ?trial_id ?phase ?lead ?agency ?source ?enrollment | |
WHERE{ | |
?trial a linkedct:trial ; | |
linkedct:id_info_org_study_id ?org_id ; | |
linkedct:sponsor_group ?group ; | |
linkedct:brief_title ?title ; | |
linkedct:phase ?phase ; | |
linkedct:source ?source ; | |
linkedct:trialid ?trial_id . | |
OPTIONAL { | |
?trial linkedct:id_info_secondary_id ?sec_id ; | |
linkedct:enrollment ?enrollment . | |
?group linkedct:sponsor_group_lead_sponsor ?lead . | |
?lead linkedct:sponsor_agency ?agency . | |
} | |
FILTER( REGEX(?org_id, ?query, 'i') || REGEX(?sec_id, ?query, 'i') ) | |
}""", initBindings={'query': rdflib.Literal(trial_name)}) | |
def build_word_list(self, to_match): | |
list_of_words = [] | |
list_of_words.append(to_match) | |
# cleaned match | |
list_of_words.append(copy.copy(to_match).translate(string.maketrans("",""), string.punctuation)) | |
length = len(to_match) | |
for i in range(1, length - MIN_LENGTH): | |
list_of_words.append(re.escape(to_match[i:])) | |
list_of_words.append(re.escape(to_match[:length-i])) | |
return list_of_words | |
def match(self, to_match): | |
matches = [] | |
word_list = self.build_word_list(to_match) | |
for pattern in word_list: | |
results = [x for x in self.search_trial_name(pattern)] | |
if len(results) == 1: | |
# single match | |
return results[0] | |
for match in results: | |
if not match in matches: | |
matches.append(match) | |
else: | |
return matches | |
def impute_best_option(self, source, matches): | |
""" | |
Use the levenshtein module to try and disambiguate | |
""" | |
# first, try the URL vs the sponsor name | |
scored = {} | |
for match in matches: | |
sponsor = match.agency or match.source | |
url = jaro_winkler(unicode(source['url']), unicode(sponsor)) | |
titles = jaro_winkler(unicode(source['description']), unicode(match.title)) | |
c = source.copy() | |
c['trial_id'] = match.trial_id | |
c['enrollment'] = match.enrollment | |
c['linked_url'] = match.trial | |
c['lead_sponsor'] = match.lead | |
c['sponsor_name'] = match.agency or match.source | |
c['phase'] = match.phase | |
c['title'] = match.title | |
c['org_id'] = match.org_id | |
c['sec_id'] = match.sec_id | |
c["url-jw"] = url | |
c["desc-jw"] = titles | |
self.write('partresolved', c) | |
def process_file(self, filename): | |
# TODO: generate a graph | |
resolved = [] | |
partlyresolved = [] | |
unresolved = [] | |
proposed = [] | |
if os.path.exists(filename): | |
with open(filename, 'r') as csvfile: | |
dr = csv.DictReader(csvfile, delimiter='\t') | |
for line in dr: | |
try: | |
matched = self.match(line['project_name']) | |
if isinstance(matched, list): | |
print "Multiple matches for %s" % line['project_name'] | |
for match in matched: | |
self.impute_best_option(line, matched) | |
else: | |
c = line.copy() | |
c['trial_id'] = matched.trial_id | |
c['enrollment'] = matched.enrollment | |
c['linked_url'] = matched.trial | |
c['lead_sponsor'] = matched.lead | |
c['phase'] = matched.phase | |
c['sponsor_name'] = matched.agency or matched.source | |
c['title'] = matched.title | |
c['org_id'] = matched.org_id | |
c['sec_id'] = matched.sec_id | |
resolved.append(c) | |
self.write('resolved', c) | |
except MatchError: | |
print "Couldn't match %s" % line['project_name'] | |
self.write('unresolved', line) | |
unresolved.append(line) | |
self._doc.save("map.xlsx") | |
print "Loaded records" | |
print "Resolved: %s" % len(resolved) | |
print "Unable to resolve: %s" % len(unresolved) | |
print "Number to adjudicate: %s" % len(partlyresolved) | |
if __name__ == "__main__":
    # Usage: script.py <tab-separated-input-file>
    fuzzer = StudyFuzzer()
    fuzzer.process_file(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment