Created
April 4, 2016 13:42
-
-
Save leebird/ea85909dd3159047792c7561e4ebe6f6 to your computer and use it in GitHub Desktop.
Scoring miRTex results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Note that this is not a standalone script. It has dependencies. | |
# Here it is just used as an example illustrating the scoring | |
# process for miRTex results. | |
from __future__ import unicode_literals, print_function | |
import pickle | |
import os | |
import codecs | |
import sys | |
import re | |
import logging | |
from protolib.python import document_pb2 | |
from nlprpc import grpcapi | |
from nlp.helper import DocHelper, RangeHelper | |
# --- Lexical scoring resources --------------------------------------------
# Pattern strings are compiled case-insensitively into regex lists; the
# trigger sets below are matched by plain substring tests instead.

def _compile_patterns(pattern_strings):
    """Compile each pattern string into a case-insensitive regex."""
    return [re.compile(pattern, re.IGNORECASE) for pattern in pattern_strings]

# NP heads that strongly indicate a genuine miRNA-gene regulation mention.
high_confidence_heads = _compile_patterns([
    'expressions?', 'levels?', 'factors?', 'genes?',
    'proteins?', 'productions?',
    'receptors?', 'translations?', 'mRNAs?',
    'over-?expressions?', 'up-?regulations?',
    'down-?regulations?', 'transcriptions?',
    'repressions?',
    'accumulations?',
    'RNAs?', 'status', 'suppressions?', 'abundances?',
])

# NP heads suggesting the argument is not a direct regulation target.
negative_heads = _compile_patterns([
    'promoters?', 'cells?', 'proliferations?', 'secretions?',
    'pathways?', 'phosphorylations?', 'resistances?',
    'differentiations?', 'functions?', 'lines?', 'apoptosis',
    'signallings?', 'signalings?',
])

# Gene heads that are only weak evidence (scored like negative heads).
low_confidence_gene_heads = _compile_patterns([
    'activity', 'activities', 'activations?',
    'inhibitors?', 'targets?',
])

# Trigger stems signalling direct targeting / an indirect activation.
high_confidence_triggers = {'target', 'bind', 'bound'}
negative_triggers = {'activat'}

# Sentence-level cues: 3'UTR co-occurrence and "direct" modifiers.
high_confidence_cooccurence = _compile_patterns(
    ['3(.*?)UTR', '3(.*?)untranslated region', 'translation'])
high_confidence_modifier = _compile_patterns(
    ['(^| )direct', '(^| )directly', 'immediate', 'immediately'])

# Per-tag score contributions used to rank extracted relations.
tag_scores = {
    'mirna_is_head': 100,
    'mirna_highconf_head': 125,
    'gene_is_head': 100,
    'gene_highconf_head': 125,
    'highconf_trigger': 300,
    'highconf_cooccur': 50,
    'highconf_modifier': 25,
    'weak_direct': -50,
    'null_arg_direct': -75,
}

# Relations carrying any of these tags are dropped entirely.
tag_to_remove = {
    'anti_mirna', 'invalid_gene', 'mirna_negative_head',
    'gene_negative_head', 'negative_trigger',
}
def is_high_confidence_head(head, patterns=None):
    """Return True if any '/'-separated alternative of *head* matches a
    high-confidence head pattern.

    Matching uses ``re.match``, i.e. it is anchored only at the start of
    each alternative, mirroring the original behavior.

    Args:
        head: head word(s); multiple alternatives may be joined with '/'.
        patterns: optional iterable of compiled regexes; defaults to the
            module-level ``high_confidence_heads``. Added (backward
            compatible) so the predicate can be tested and reused.
    """
    if patterns is None:
        patterns = high_confidence_heads
    alternatives = head.split('/')
    return any(p.match(alt) for p in patterns for alt in alternatives)
def is_negative_head(head, patterns=None):
    """Return True if any '/'-separated alternative of *head* matches a
    negative head pattern (start-anchored ``re.match``).

    Args:
        head: head word(s); multiple alternatives may be joined with '/'.
        patterns: optional iterable of compiled regexes; defaults to the
            module-level ``negative_heads``. Added (backward compatible)
            so the predicate can be tested and reused.
    """
    if patterns is None:
        patterns = negative_heads
    alternatives = head.split('/')
    return any(p.match(alt) for p in patterns for alt in alternatives)
def is_low_confidence_gene_head(head, patterns=None):
    """Return True if any '/'-separated alternative of *head* matches a
    low-confidence gene head pattern (start-anchored ``re.match``).

    Args:
        head: head word(s); multiple alternatives may be joined with '/'.
        patterns: optional iterable of compiled regexes; defaults to the
            module-level ``low_confidence_gene_heads``. Added (backward
            compatible) so the predicate can be tested and reused.
    """
    if patterns is None:
        patterns = low_confidence_gene_heads
    alternatives = head.split('/')
    return any(p.match(alt) for p in patterns for alt in alternatives)
def is_high_confidence_trigger(trigger, triggers=None):
    """Return True if *trigger* contains any high-confidence trigger stem
    (case-insensitive substring test).

    Args:
        trigger: trigger word or phrase text.
        triggers: optional iterable of stems; defaults to the module-level
            ``high_confidence_triggers``. Added (backward compatible) so
            the predicate can be tested and reused.
    """
    if triggers is None:
        triggers = high_confidence_triggers
    # Hoisted: the original recomputed trigger.lower() once per stem.
    lowered = trigger.lower()
    return any(stem.lower() in lowered for stem in triggers)
def is_negative_trigger(trigger, triggers=None):
    """Return True if *trigger* contains any negative trigger stem
    (case-insensitive substring test).

    Args:
        trigger: trigger word or phrase text.
        triggers: optional iterable of stems; defaults to the module-level
            ``negative_triggers``. Added (backward compatible) so the
            predicate can be tested and reused.
    """
    if triggers is None:
        triggers = negative_triggers
    # Hoisted: the original recomputed trigger.lower() once per stem.
    lowered = trigger.lower()
    return any(stem.lower() in lowered for stem in triggers)
def is_high_confidence_cooccurence(sentence_text, patterns=None):
    """Return True if *sentence_text* contains a high-confidence
    co-occurrence cue (e.g. a 3'UTR mention), via unanchored ``re.search``.

    Args:
        sentence_text: concatenated sentence text to scan.
        patterns: optional iterable of compiled regexes; defaults to the
            module-level ``high_confidence_cooccurence``. Added (backward
            compatible) so the predicate can be tested and reused.
    """
    if patterns is None:
        patterns = high_confidence_cooccurence
    return any(p.search(sentence_text) is not None for p in patterns)
def is_high_confidence_modifier(sentence_text, patterns=None):
    """Return True if *sentence_text* contains a directness modifier
    (e.g. "direct", "immediately"), via unanchored ``re.search``.

    Args:
        sentence_text: concatenated sentence text to scan.
        patterns: optional iterable of compiled regexes; defaults to the
            module-level ``high_confidence_modifier``. Added (backward
            compatible) so the predicate can be tested and reused.
    """
    if patterns is None:
        patterns = high_confidence_modifier
    return any(p.search(sentence_text) is not None for p in patterns)
def test_run():
    """Smoke-test the gRPC pipeline: PARSE one hard-coded sentence and print
    both the request and the first returned document."""
    interface = grpcapi.GrpcInterface()
    request = document_pb2.Request()
    request.request_type = document_pb2.Request.PARSE
    new_doc = request.document.add()
    new_doc.doc_id = 'test'
    new_doc.text = 'miR-21 (targets) fas ligand-mediated apoptosis in breast cancer cell line mcf-7.'
    print(request)
    response = interface.process_document(request)
    print(str(response.document[0]))
def test_brat_writer():
    """Round-trip one known abstract (23340180) through the brat
    reader/writer into test/test.txt + test/test.ann."""
    target = '23340180.txt'
    for root, _, filenames in os.walk('data/extracted'):
        for name in filenames:
            if name != target:
                continue
            txt_path = os.path.join(root, name)
            ann_path = os.path.join(root, name[:-4] + '.ann')
            doc_id = os.path.basename(name).split('.')[0]
            doc = DocHelper.load_from_brat_file(doc_id, txt_path, ann_path)
            DocHelper(doc).dump_to_brat_file('test/test.txt', 'test/test.ann')
            break
def find_head(helper, entity, phrases, sentence):
    """Find the smallest NP constituent covering *entity* and inspect its head.

    Args:
        helper: DocHelper for the parsed document.
        entity: entity protobuf with char_start/char_end offsets.
        phrases: the sentence's constituents (indexable; nodes carry
            parent/children/index links and head char offsets).
        sentence: the sentence protobuf containing the entity.

    Returns:
        (is_head, head_word, head_phrase, pp_parent): is_head is True when
        the entity overlaps the NP's head token (or is conjoined with a
        same-typed head entity); pp_parent is the phrase dominating an
        "of"-PP above the covering NP (e.g. "expression of GENE"), or None.
        Returns (False, '', '', None) when no covering NP exists.

    NOTE(review): source indentation was lost in the paste; the nesting
    below is reconstructed from the inline comments — confirm against the
    original file.
    """
    entity_text = helper.text(entity)  # only used by the debug print below
    # Upper bound so the first covering NP is always accepted.
    min_phrase_length = len(helper.doc.text)
    char_start, char_end = entity.char_start, entity.char_end
    the_head = None
    is_head = False
    for phrase in phrases:
        if phrase.label != 'NP':
            continue
        phrase_char_range = phrase.char_end - phrase.char_start + 1
        if phrase_char_range > min_phrase_length:
            # A smaller covering NP was already found.
            continue
        if RangeHelper.char_range_include(phrase, entity):
            # The head phrase includes the entity; keep the smallest such NP.
            min_phrase_length = phrase_char_range
            the_head = phrase
            if RangeHelper.overlap(
                    (phrase.head_char_start, phrase.head_char_end),
                    (char_start, char_end)):
                # The head token overlaps with the entity.
                is_head = True
            # Handle conjuncted entities. If the current entity doesn't
            # overlap with the head, but it is in conjunction with the head,
            # and the head is of the same entity type, then we regard the
            # current entity as head too (NN CC NN=head). Could potentially
            # be wrong, e.g. "gene regulation and gene2": the first gene's
            # phrase head is "regulation", but this process would assign
            # gene2 as its head, thus is_head = True.
            if helper.has_conjunction(sentence, phrase):
                conjuncted_entities = helper.char_offset_to_entity(
                    phrase.head_char_start)
                for ce in conjuncted_entities:
                    if ce.entity_type == entity.entity_type:
                        is_head = True
                        break
    if the_head is None:
        logging.warning('\t'.join(['Entity none head', helper.doc.doc_id, entity.duid]))
        return False, '', '', None
    # Handle preposition cases. E.g. "expression of gene" should be tagged
    # as gene_not_head and gene_highconf_head:
    # (NP (NP (NN=head)) (PP (IN of) (NP)))
    pp_parent = None
    parent = phrases[the_head.parent]
    while True:
        # Go up toward the root to find the PP dominating a list of NPs.
        if parent.label == 'PP':
            break
        elif parent.parent == parent.index:
            # Reached the root (a node that is its own parent).
            break
        elif parent.label != 'NP':
            break
        else:
            parent = phrases[parent.parent]
    if parent.label == 'PP':
        for child in parent.children:
            # Only "of"-PPs qualify (e.g. "expression of GENE").
            if phrases[child].label == 'IN' and helper.text(phrases[child]).lower() == 'of':
                pp_parent = phrases[parent.parent]
                # is_head = False
                break
    head_word = helper.token_of_char_offset(the_head.head_char_start).word
    head_phrase = helper.doc.text[the_head.char_start:the_head.char_end + 1]
    # print(pp_parent)
    # print(is_head, head_word, head_phrase, the_head, entity_text, sep='\t')
    return is_head, head_word, head_phrase, pp_parent
def validate_gene(gene_text):
    """Return True if *gene_text* looks like a genuine gene mention.

    Rejects mentions that are actually miRNA references or luciferase
    reporter constructs mislabelled as genes.

    Args:
        gene_text: surface text of the candidate gene entity.
    """
    # Lowercase once (the original recomputed .lower() for every check)
    # and use substring membership instead of .find(...) != -1.
    lowered = gene_text.lower()
    blacklist = ('mirna', 'microrna', 'luciferase', '-mir')
    return not any(term in lowered for term in blacklist)
def validate_mirna(mirna_start, doc_text):
    """Return False when the miRNA mention starting at *mirna_start* in
    *doc_text* is immediately preceded by 'anti-' (an anti-miR construct),
    True otherwise."""
    # A prefix shorter than five characters can never equal 'anti-', so
    # clamping the slice start to zero preserves the original behavior.
    preceding = doc_text[max(mirna_start - 5, 0):mirna_start]
    return preceding.lower() != 'anti-'
def load_mirtex_result(mirtex_dir):
    """Walk *mirtex_dir* for brat .txt/.ann pairs and yield request batches.

    For every batch of up to ``request_size`` documents, yields a pair
    ``(split_request, parse_request)``: the originals are only sentence-SPLIT,
    while entity-masked copies are fully PARSEd.

    Bug fix vs. original: the trailing partial batch is yielded only when
    documents remain; the original unconditionally yielded a final (possibly
    empty) request pair when the file count was a multiple of the batch size.

    Args:
        mirtex_dir: root directory containing brat .txt/.ann result files.

    Yields:
        (document_pb2.Request SPLIT, document_pb2.Request PARSE) pairs.
    """
    request_size = 5

    def _build_requests(originals, masked_docs):
        """Build the (SPLIT originals, PARSE masked) request pair for a batch."""
        # Only split original documents.
        split_request = document_pb2.Request()
        split_request.request_type = document_pb2.Request.SPLIT
        split_request.document.extend(originals)
        # Parse the masked documents.
        parse_request = document_pb2.Request()
        parse_request.request_type = document_pb2.Request.PARSE
        parse_request.document.extend(masked_docs)
        return split_request, parse_request

    docs = []
    masked = []
    for root, _, files in os.walk(mirtex_dir):
        for f in files:
            if not f.endswith('.txt'):
                continue
            txtfile = os.path.join(root, f)
            annfile = os.path.join(root, f[:-4] + '.ann')
            doc_id = os.path.basename(f).split('.')[0]
            doc = DocHelper.load_from_brat_file(doc_id, txtfile, annfile)
            docs.append(doc)
            try:
                # Mask entities (except triggers) so parsing is not confused
                # by multi-word entity names.
                masked.append(DocHelper(doc).mask_entity(
                    exclude_type={document_pb2.Entity.TRIGGER}))
            except ValueError:
                # Masking failed; fall back to the unmasked document.
                masked.append(doc)
            if len(docs) == request_size:
                yield _build_requests(docs, masked)
                docs = []
                masked = []
    if docs:
        # Flush the final partial batch (skipped entirely when empty).
        yield _build_requests(docs, masked)
def post_process(mirtex_dir):
    """Score and filter miRTex relations found under *mirtex_dir*.

    Streams batched documents through the NLP gRPC service (originals are
    sentence-split, masked copies are parsed), then for every relation:
    collects evidence tags (head quality, trigger quality, co-occurrence,
    modifiers), attaches 'tag' and 'score' attributes to the ORIGINAL
    document's relation, writes one TSV line per surviving relation to
    ``mirtex_result.txt``, removes relations carrying any tag in
    ``tag_to_remove``, and dumps the originals as brat files under test2/.

    NOTE(review): source indentation was lost in the paste; the nesting
    below is reconstructed from the comments and if/else pairing — confirm
    against the original file (notably the placement of ``mark_mnh`` and
    of the per-doc progress print).
    """
    count = 0
    invalid_gene_count = 0
    anti_mir_count = 0
    result_file = codecs.open('mirtex_result.txt', 'w', encoding='utf8')
    request_iterator = load_mirtex_result(mirtex_dir)
    # NOTE(review): hard-coded service address and worker count — confirm
    # these match the current deployment before reuse.
    queue = grpcapi.get_queue_masked('128.4.20.169', 15, request_iterator)
    for original_response, response in queue:
        if not response.success:
            logging.warning('Request failed' + '\t' + ','.join([d.doc_id for d in response.document]))
            continue
        for original_doc, doc in zip(original_response.document, response.document):
            helper = DocHelper(doc)
            valid_relation_lines = {}
            invalid_relation_ids = set()
            # Fill in sentence/token indices for entities from char offsets.
            for entity_id, entity in doc.entity.items():
                helper.fill_entity_using_char_offset(entity)
            for relation_id, relation in doc.relation.items():
                tags = set()
                score = 0
                direct = 'unknown'
                relation_type = 'MiRNA2Gene'
                # Read the attributes miRTex attached to the relation.
                for attr in relation.attribute:
                    if attr.key == 'nullarg' and attr.value == '1':
                        # Extracted by a null-argument rule: penalized below.
                        tags.add('null_arg_direct')
                    if attr.key == 'rel_type' and attr.value == 'G2M':
                        relation_type = 'Gene2MiRNA'
                    if attr.key == 'direct':
                        if attr.value == 'direct':
                            direct = 'direct'
                        elif attr.value == 'weak_direct':
                            # Treated as direct but tagged for a score penalty.
                            direct = 'direct'
                            tags.add('weak_direct')
                sentence_index = set()
                mark_gnh = ''      # 'GNH' when the gene is not a phrase head
                mark_mnh = ''      # 'MNH' when the miRNA head is negative
                mark_relax = ''    # 'RelaxedRule' when there is no trigger
                gene_head = ''
                mir_head = ''
                gene_text = ''
                mir_text = ''
                gene_phrase = ''
                mir_phrase = ''
                gene_pp_phrase = None
                mir_pp_phrase = None
                valid_gene = False
                trigger = None
                trigger_text = ''
                for arg in relation.argument:
                    entity = doc.entity[arg.entity_duid]
                    sentence_index.add(entity.sentence_index)
                    sentence = doc.sentence[entity.sentence_index]
                    if arg.role == 'Trigger':
                        trigger_text = helper.text(entity)
                        trigger = entity
                        if is_high_confidence_trigger(trigger_text):
                            tags.add('highconf_trigger')
                        if is_negative_trigger(trigger_text):
                            tags.add('negative_trigger')
                    # Gene argument: Theme of M2G, Agent of G2M, or Arg2.
                    if (arg.role == 'Theme' and relation_type == 'MiRNA2Gene') or \
                            (arg.role == 'Agent' and relation_type == 'Gene2MiRNA') or \
                            arg.role == 'Arg2':
                        gene_text = helper.text(entity)
                        valid_gene = validate_gene(gene_text)
                        is_head, gene_head, gene_phrase, gene_pp_phrase = find_head(helper,
                                                                                    entity,
                                                                                    sentence.constituent,
                                                                                    sentence)
                        if not is_head:
                            mark_gnh = 'GNH'
                            tags.add('gene_not_head')
                            # e.g. "expression of GENE": not head, but the
                            # phrase head itself is strong/weak evidence.
                            if is_high_confidence_head(gene_head):
                                tags.add('gene_highconf_head')
                            if is_negative_head(gene_head):
                                tags.add('gene_negative_head')
                            if is_low_confidence_gene_head(gene_head):
                                tags.add('gene_negative_head')
                        else:
                            tags.add('gene_is_head')
                    # miRNA argument: Theme of G2M, Agent of M2G, or Arg1.
                    if (arg.role == 'Theme' and relation_type == 'Gene2MiRNA') or \
                            (arg.role == 'Agent' and relation_type == 'MiRNA2Gene') or \
                            arg.role == 'Arg1':
                        mir_text = helper.text(entity)
                        if not validate_mirna(entity.char_start, doc.text):
                            tags.add('anti_mirna')
                            anti_mir_count += 1
                        is_head, mir_head, mir_phrase, mir_pp_phrase = find_head(helper,
                                                                                 entity,
                                                                                 sentence.constituent,
                                                                                 sentence)
                        if not is_head:
                            tags.add('mirna_not_head')
                            if is_high_confidence_head(mir_head):
                                tags.add('mirna_highconf_head')
                            if is_negative_head(mir_head):
                                tags.add('mirna_negative_head')
                                mark_mnh = 'MNH'
                        else:
                            tags.add('mirna_is_head')
                if gene_pp_phrase is not None:
                    if trigger is not None and \
                            RangeHelper.char_range_overlap(gene_pp_phrase, trigger):
                        # We only consider the pp head when the trigger is not
                        # in the pp phrase. If the trigger is in the pp phrase,
                        # e.g. "targeting of gene", the head would be
                        # "targeting", a low confidence head for the gene.
                        pass
                    else:
                        pp_head = helper.doc.text[gene_pp_phrase.head_char_start:
                                                  gene_pp_phrase.head_char_end + 1]
                        if is_high_confidence_head(pp_head):
                            # We regard the pp head as the real head of the argument.
                            if 'gene_is_head' in tags:
                                tags.remove('gene_is_head')
                            tags.add('gene_highconf_head')
                        if is_negative_head(pp_head):
                            tags.add('gene_negative_head')
                        if is_low_confidence_gene_head(pp_head):
                            tags.add('gene_negative_head')
                if trigger_text == '':
                    # No trigger argument: the relation came from a relaxed rule.
                    tags.add('relaxed_rule')
                    mark_relax = 'RelaxedRule'
                if not valid_gene:
                    tags.add('invalid_gene')
                    invalid_gene_count += 1
                    # continue
                # Concatenate the sentences covering all arguments, in order.
                sentence_text = []
                for i in sorted(sentence_index):
                    sentence = doc.sentence[i]
                    sentence_text.append(helper.text(sentence))
                sentence_text = ' '.join(sentence_text)
                if is_high_confidence_cooccurence(sentence_text):
                    tags.add('highconf_cooccur')
                if is_high_confidence_modifier(sentence_text):
                    tags.add('highconf_modifier')
                line = '\t'.join([doc.doc_id, relation_type,
                                  direct, mark_relax, trigger_text,
                                  mir_text, mark_mnh, mir_head, mir_phrase,
                                  gene_text, mark_gnh, gene_head, gene_phrase,
                                  sentence_text])
                # print(line)
                if 'highconf_trigger' in tags and 'null_arg_direct' in tags:
                    # If we have a high confidence trigger, then we don't
                    # punish even if it was extracted by a null-argument rule.
                    tags.remove('null_arg_direct')
                # Attach tags to the ORIGINAL document and accumulate the score.
                for tag in tags:
                    attr = original_doc.relation[relation.duid].attribute.add()
                    attr.key = 'tag'
                    attr.value = tag
                    if tag in tag_scores:
                        score += tag_scores[tag]
                # If scores are the same, prefer the shorter sentence.
                score += 100.0 / len(sentence_text)
                attr = original_doc.relation[relation.duid].attribute.add()
                attr.key = 'score'
                attr.value = str(score)
                if len(tag_to_remove & tags) > 0:
                    logging.warning('Skip invalid relation' + '\t' +
                                    doc.doc_id + '\t' + relation.duid + '\t' +
                                    '|'.join(tag_to_remove & tags))
                    # del doc.relation[relation.duid]
                    # Note that we can't delete a map entry while looping in
                    # the map. Instead record all ids to be removed, and
                    # remove them after the loop.
                    invalid_relation_ids.add(relation.duid)
                    continue
                valid_relation_lines[relation.duid] = line
            for vid, line in valid_relation_lines.items():
                result_file.write(line + '\n')
            # Delete invalid relations here (safe: the loop has finished).
            for ivid in invalid_relation_ids:
                del original_doc.relation[ivid]
            original_helper = DocHelper(original_doc)
            original_helper.dump_to_brat_file('test2/' + original_doc.doc_id + '.txt',
                                              'test2/' + original_doc.doc_id + '.ann',
                                              True)
            count += 1
            # if count == 1:
            #     break
            print(count, end='\r')
    result_file.close()
    print(count, invalid_gene_count, anti_mir_count)
# Dead code: an earlier fix_mirna() pass, disabled by wrapping it in a
# module-level triple-quoted string (a no-op expression statement).
# NOTE(review): the original indentation was lost in the paste; the content
# below is a reconstruction. Consider deleting this in favor of version control.
'''
def fix_mirna(mirtex_dir):
    count = 0
    invalid_gene_count = 0
    anti_mir_count = 0
    request_iterator = load_mirtex_result(mirtex_dir)
    for response in request_iterator:
        for doc in response.document:
            helper = DocHelper(doc)
            for entity_id, entity in doc.entity.items():
                helper.fill_entity_using_char_offset(entity)
            for relation_id, relation in doc.relation.items():
                relation_type = 'MiRNA2Gene'
                for attr in relation.attribute:
                    if attr.key == 'direction' and attr.value == 'G2M':
                        relation_type = 'Gene2MiRNA'
                tags = set()
                for attr in relation.attribute:
                    if attr.key == 'tag':
                        tags.add(attr.value)
                for arg in relation.argument:
                    entity = doc.entity[arg.entity_duid]
                    if (
                            arg.role == 'Theme' and relation_type == 'Gene2MiRNA') or \
                            (
                            arg.role == 'Agent' and relation_type == 'MiRNA2Gene') or \
                            arg.role == 'Arg1':
                        if not validate_mirna(entity.char_start, doc.text):
                            tags.add('anti_mirna')
                            anti_mir_count += 1
                if len(tag_to_remove & tags) > 0:
                    print('Skip invalid relation:', doc.doc_id, relation.duid,
                          '|'.join(tag_to_remove & tags), file=sys.stderr)
                    del doc.relation[relation.duid]
                    continue
            helper.dump_to_brat_file('test2/' + doc.doc_id + '.txt',
                                     'test2/' + doc.doc_id + '.ann',
                                     True)
            count += 1
            print(count, end='\r')
    print(count, invalid_gene_count, anti_mir_count)
'''
if __name__ == '__main__':
    # Entry point: score the miRTex abstract results.
    post_process('data/abstracts_results')
    # One-off utilities, left disabled:
    # test_run()
    # test_brat_writer()
    # fix_mirna('post_processed_mirtex')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment