Skip to content

Instantly share code, notes, and snippets.

@leebird
Created April 4, 2016 13:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leebird/ea85909dd3159047792c7561e4ebe6f6 to your computer and use it in GitHub Desktop.
Scoring miRTex results
# Note that this is not a standalone script. It has dependencies.
# Here it is just used as an example illustrating the scoring
# process for miRTex results.
from __future__ import unicode_literals, print_function
import pickle
import os
import codecs
import sys
import re
import logging
from protolib.python import document_pb2
from nlprpc import grpcapi
from nlp.helper import DocHelper, RangeHelper
# Head-word pattern strings; compiled into case-insensitive regexes below.
# A gene/miRNA mention whose NP head matches one of these is treated as
# reliably reported (e.g. "expression of X", "X levels").
high_confidence_heads = ['expressions?', 'levels?', 'factors?', 'genes?',
                         'proteins?', 'productions?',
                         'receptors?', 'translations?', 'mRNAs?',
                         'over-?expressions?', 'up-?regulations?',
                         'down-?regulations?', 'transcriptions?',
                         'repressions?',
                         'accumulations?',
                         'RNAs?', 'status', 'suppressions?', 'abundances?']
# Head words indicating the mention is likely not a direct target statement
# (e.g. "X promoter", "X pathway"); these feed the tag_to_remove filter below.
negative_heads = ['promoters?', 'cells?', 'proliferations?', 'secretions?',
                  'pathways?', 'phosphorylations?', 'resistances?',
                  'differentiations?', 'functions?', 'lines?', 'apoptosis',
                  'signallings?', 'signalings?']
# Gene head words considered too ambiguous; tagged the same as negative heads.
low_confidence_gene_heads = ['activity', 'activities', 'activations?',
                             'inhibitors?', 'targets?']
# Trigger substrings (matched case-insensitively via `in`) that strongly
# indicate a direct miRNA-gene interaction.
high_confidence_triggers = {'target', 'bind', 'bound'}
# Trigger substrings that invalidate a relation (matches e.g. "activate(s)").
negative_triggers = {'activat'}
# Sentence-level cues raising confidence when they co-occur with the relation.
# NOTE(review): '3(.*?)UTR' is unanchored, so any '3' followed anywhere later
# by 'UTR' matches — presumably meant to tolerate "3'UTR" / "3' UTR" spelling
# variants; verify intent.
high_confidence_cooccurence = {'3(.*?)UTR', '3(.*?)untranslated region', 'translation'}
# Modifier cues ("direct(ly)", "immediate(ly)") raising confidence.
high_confidence_modifier = {'(^| )direct', '(^| )directly', 'immediate', 'immediately'}
# Compile regular expressions.  Each name is rebound from a list/set of
# pattern strings to a list of compiled case-insensitive Pattern objects.
high_confidence_heads = [re.compile(h, re.IGNORECASE) for h in
                         high_confidence_heads]
negative_heads = [re.compile(h, re.IGNORECASE) for h in negative_heads]
low_confidence_gene_heads = [re.compile(h, re.IGNORECASE) for h in
                             low_confidence_gene_heads]
high_confidence_cooccurence = [re.compile(h, re.IGNORECASE) for h in
                               high_confidence_cooccurence]
high_confidence_modifier = [re.compile(h, re.IGNORECASE) for h in
                            high_confidence_modifier]
# Additive score contribution per tag; higher total = more confident relation.
tag_scores = {
    'mirna_is_head': 100,
    'mirna_highconf_head': 125,
    'gene_is_head': 100,
    'gene_highconf_head': 125,
    'highconf_trigger': 300,
    'highconf_cooccur': 50,
    'highconf_modifier': 25,
    'weak_direct': -50,
    'null_arg_direct': -75,
}
# Relations carrying any of these tags are skipped and deleted in post_process.
tag_to_remove = {
    'anti_mirna', 'invalid_gene', 'mirna_negative_head',
    'gene_negative_head', 'negative_trigger'
}
def is_high_confidence_head(head):
    """Return True if any '/'-separated piece of *head* matches one of the
    compiled high-confidence head patterns (e.g. 'expression', 'level')."""
    pieces = head.split('/')
    return any(pattern.match(piece)
               for pattern in high_confidence_heads
               for piece in pieces)
def is_negative_head(head):
    """Return True if any '/'-separated piece of *head* matches one of the
    compiled negative head patterns (e.g. 'promoter', 'pathway')."""
    pieces = head.split('/')
    return any(pattern.match(piece)
               for pattern in negative_heads
               for piece in pieces)
def is_low_confidence_gene_head(head):
    """Return True if any '/'-separated piece of *head* matches one of the
    compiled low-confidence gene head patterns (e.g. 'activity', 'target')."""
    pieces = head.split('/')
    return any(pattern.match(piece)
               for pattern in low_confidence_gene_heads
               for piece in pieces)
def is_high_confidence_trigger(trigger):
    """Return True if *trigger* contains any high-confidence trigger
    substring (case-insensitive), e.g. 'target', 'bind'."""
    lowered = trigger.lower()
    return any(cue.lower() in lowered for cue in high_confidence_triggers)
def is_negative_trigger(trigger):
    """Return True if *trigger* contains any negative trigger substring
    (case-insensitive), e.g. 'activat'."""
    lowered = trigger.lower()
    return any(cue.lower() in lowered for cue in negative_triggers)
def is_high_confidence_cooccurence(sentence_text):
    """Return True if *sentence_text* contains a high-confidence
    co-occurrence cue (3'UTR mention, 'translation', ...)."""
    return any(pattern.search(sentence_text) is not None
               for pattern in high_confidence_cooccurence)
def is_high_confidence_modifier(sentence_text):
    """Return True if *sentence_text* contains a high-confidence modifier
    cue ('direct(ly)', 'immediate(ly)')."""
    return any(pattern.search(sentence_text) is not None
               for pattern in high_confidence_modifier)
def test_run():
    """Smoke test: send one hard-coded sentence through the gRPC parser and
    print both the request and the parsed response protos."""
    request = document_pb2.Request()
    doc = request.document.add()
    doc.doc_id = 'test'
    doc.text = 'miR-21 (targets) fas ligand-mediated apoptosis in breast cancer cell line mcf-7.'
    request.request_type = document_pb2.Request.PARSE
    print(request)
    client = grpcapi.GrpcInterface()
    response = client.process_document(request)
    print(str(response.document[0]))
def test_brat_writer():
    """Round-trip test: load one known brat document (23340180) from
    data/extracted and dump it back out under test/."""
    for root, _, files in os.walk('data/extracted'):
        for fname in files:
            if fname != '23340180.txt':
                continue
            txt_path = os.path.join(root, fname)
            ann_path = os.path.join(root, fname[:-4] + '.ann')
            doc_id = os.path.basename(fname).split('.')[0]
            doc = DocHelper.load_from_brat_file(doc_id, txt_path, ann_path)
            DocHelper(doc).dump_to_brat_file('test/test.txt', 'test/test.ann')
            break
def find_head(helper, entity, phrases, sentence):
    """Locate the smallest NP constituent covering *entity* and report its head.

    Args:
        helper: DocHelper wrapping the parsed document.
        entity: entity proto with char_start/char_end offsets.
        phrases: the sentence's constituent (parse-tree node) list.
        sentence: the sentence proto the entity belongs to.

    Returns:
        A 4-tuple (is_head, head_word, head_phrase, pp_parent):
        is_head     - True when the entity overlaps (or is conjoined with) the
                      head token of the covering NP.
        head_word   - the head token's word of the smallest covering NP.
        head_phrase - the text of that NP.
        pp_parent   - the phrase dominating an "<NP> of <NP>" construction
                      containing the NP, or None.
        Returns (False, '', '', None) when no NP covers the entity.
    """
    entity_text = helper.text(entity)
    # Start with the whole document length so any covering NP is smaller.
    min_phrase_length = len(helper.doc.text)
    char_start, char_end = entity.char_start, entity.char_end
    the_head = None
    is_head = False
    for phrase in phrases:
        if phrase.label != 'NP':
            continue
        phrase_char_range = phrase.char_end - phrase.char_start + 1
        if phrase_char_range > min_phrase_length:
            continue
        if RangeHelper.char_range_include(phrase, entity):
            # The head phrase includes the entity.
            min_phrase_length = phrase_char_range
            the_head = phrase
            if RangeHelper.overlap(
                    (phrase.head_char_start, phrase.head_char_end),
                    (char_start, char_end)):
                # The head token overlaps with the entity.
                is_head = True
            # Handle conjuncted entities. If the current entity doesn't overlap with
            # the head, but it is in conjunction with the head, and the head is of the
            # same entity type, then we regard the current entity as head too.
            # (NN CC NN=head). Could potentially be wrong, e.g., gene regulation and gene2,
            # the first gene's phrase's head is regulation, but this process would assign
            # gene2 as its head, thus is_head = True.
            if helper.has_conjunction(sentence, phrase):
                conjuncted_entities = helper.char_offset_to_entity(
                    phrase.head_char_start)
                for ce in conjuncted_entities:
                    if ce.entity_type == entity.entity_type:
                        is_head = True
                        break
    if the_head is None:
        logging.warning('\t'.join(['Entity none head', helper.doc.doc_id, entity.duid]))
        return False, '', '', None
    # Handle preposition cases. E.g, expression of gene should tagged as gene_not_head
    # and gene_highconf_head
    # (NP (NP (NN=head)) (PP (IN of) (NP))
    pp_parent = None
    parent = phrases[the_head.parent]
    while True:
        # Go up to the root to find the PP dominating a list of NPs.
        if parent.label == 'PP':
            break
        elif parent.parent == parent.index:
            # Reached the root node (its own parent).
            break
        elif parent.label != 'NP':
            break
        else:
            parent = phrases[parent.parent]
    if parent.label == 'PP':
        # Only treat "of"-PPs as head-bearing (e.g. "expression of X").
        for child in parent.children:
            if phrases[child].label == 'IN' and helper.text(phrases[child]).lower() == 'of':
                pp_parent = phrases[parent.parent]
                # is_head = False
                break
    head_word = helper.token_of_char_offset(the_head.head_char_start).word
    head_phrase = helper.doc.text[the_head.char_start:the_head.char_end + 1]
    # print(pp_parent)
    # print(is_head, head_word, head_phrase, the_head, entity_text, sep='\t')
    return is_head, head_word, head_phrase, pp_parent
def validate_gene(gene_text):
    """Return False when the 'gene' mention is actually a miRNA or reporter
    artifact (contains 'mirna', 'microrna', 'luciferase' or '-mir',
    case-insensitively); True otherwise."""
    lowered = gene_text.lower()
    blacklist = ('mirna', 'microrna', 'luciferase', '-mir')
    return not any(token in lowered for token in blacklist)
def validate_mirna(mirna_start, doc_text):
    """Return False when the miRNA mention starting at *mirna_start* in
    *doc_text* is immediately preceded by 'anti-' (case-insensitive)."""
    if mirna_start < 5:
        # Not enough preceding text to hold 'anti-'.
        return True
    preceding = doc_text[mirna_start - 5:mirna_start]
    return preceding.lower() != 'anti-'
def load_mirtex_result(mirtex_dir):
    """Yield (split_request, parse_request) pairs over brat files in *mirtex_dir*.

    Walks the directory tree, loads each .txt/.ann document pair, and batches
    documents in groups of 5.  For every batch two protobuf requests are
    yielded: a SPLIT request carrying the original documents and a PARSE
    request carrying entity-masked copies (falling back to the unmasked
    document when masking raises ValueError).  A final, possibly smaller or
    empty, batch is yielded after the walk completes.
    """
    batch_size = 5
    originals = []
    masked_docs = []
    processed = 0

    def build_pair():
        # Only split original documents; parse the masked copies.
        split_req = document_pb2.Request()
        split_req.request_type = document_pb2.Request.SPLIT
        split_req.document.extend(originals)
        parse_req = document_pb2.Request()
        parse_req.request_type = document_pb2.Request.PARSE
        parse_req.document.extend(masked_docs)
        return split_req, parse_req

    for root, _, files in os.walk(mirtex_dir):
        for fname in files:
            if not fname.endswith('.txt'):
                continue
            processed += 1
            txt_path = os.path.join(root, fname)
            ann_path = os.path.join(root, fname[:-4] + '.ann')
            doc_id = os.path.basename(fname).split('.')[0]
            doc = DocHelper.load_from_brat_file(doc_id, txt_path, ann_path)
            originals.append(doc)
            try:
                masked_docs.append(DocHelper(doc).mask_entity(
                    exclude_type=set([document_pb2.Entity.TRIGGER])))
            except ValueError:
                # Masking failed; fall back to the unmasked document.
                masked_docs.append(doc)
            if len(originals) == batch_size:
                yield build_pair()
                originals = []
                masked_docs = []
    # Flush the final partial (possibly empty) batch.
    yield build_pair()
def post_process(mirtex_dir):
    """Score and filter miRTex relations.

    Streams (original, masked-parse) document pairs from the gRPC service,
    collects a set of heuristic tags per relation (head-word confidence,
    trigger confidence, sentence co-occurrence/modifier cues, anti-miRNA and
    invalid-gene checks), writes the tags and a numeric 'score' attribute back
    onto the ORIGINAL document, deletes relations carrying any tag in
    tag_to_remove, dumps the tagged documents as brat files under test2/, and
    writes one TSV line per surviving relation to mirtex_result.txt.
    """
    count = 0
    invalid_gene_count = 0
    anti_mir_count = 0
    result_file = codecs.open('mirtex_result.txt', 'w', encoding='utf8')
    request_iterator = load_mirtex_result(mirtex_dir)
    # NOTE(review): hard-coded service address and worker count.
    queue = grpcapi.get_queue_masked('128.4.20.169', 15, request_iterator)
    for original_response, response in queue:
        if not response.success:
            logging.warning('Request failed' + '\t' + ','.join([d.doc_id for d in response.document]))
            continue
        for original_doc, doc in zip(original_response.document, response.document):
            helper = DocHelper(doc)
            valid_relation_lines = {}
            invalid_relation_ids = set()
            for entity_id, entity in doc.entity.items():
                helper.fill_entity_using_char_offset(entity)
            for relation_id, relation in doc.relation.items():
                tags = set()
                score = 0
                direct = 'unknown'
                relation_type = 'MiRNA2Gene'
                # Read the attributes miRTex attached to the relation.
                for attr in relation.attribute:
                    if attr.key == 'nullarg' and attr.value == '1':
                        tags.add('null_arg_direct')
                    if attr.key == 'rel_type' and attr.value == 'G2M':
                        relation_type = 'Gene2MiRNA'
                    if attr.key == 'direct':
                        if attr.value == 'direct':
                            direct = 'direct'
                        elif attr.value == 'weak_direct':
                            # Weak direct evidence still counts as direct, but
                            # the 'weak_direct' tag carries a score penalty.
                            direct = 'direct'
                            tags.add('weak_direct')
                sentence_index = set()
                mark_gnh = ''    # 'GNH' when the gene is not its phrase head
                mark_mnh = ''    # 'MNH' marker for the miRNA argument
                mark_relax = ''  # 'RelaxedRule' when no trigger word was found
                gene_head = ''
                mir_head = ''
                gene_text = ''
                mir_text = ''
                gene_phrase = ''
                mir_phrase = ''
                gene_pp_phrase = None
                mir_pp_phrase = None
                valid_gene = False
                trigger = None
                trigger_text = ''
                for arg in relation.argument:
                    entity = doc.entity[arg.entity_duid]
                    sentence_index.add(entity.sentence_index)
                    sentence = doc.sentence[entity.sentence_index]
                    if arg.role == 'Trigger':
                        trigger_text = helper.text(entity)
                        trigger = entity
                        if is_high_confidence_trigger(trigger_text):
                            tags.add('highconf_trigger')
                        if is_negative_trigger(trigger_text):
                            tags.add('negative_trigger')
                    # Gene argument (which role holds the gene depends on the
                    # relation direction).
                    if (arg.role == 'Theme' and relation_type == 'MiRNA2Gene') or \
                            (arg.role == 'Agent' and relation_type == 'Gene2MiRNA') or \
                            arg.role == 'Arg2':
                        gene_text = helper.text(entity)
                        valid_gene = validate_gene(gene_text)
                        is_head, gene_head, gene_phrase, gene_pp_phrase = find_head(
                            helper, entity, sentence.constituent, sentence)
                        if not is_head:
                            mark_gnh = 'GNH'
                            tags.add('gene_not_head')
                            # The gene is embedded in a larger NP: judge that
                            # NP's head word instead.
                            if is_high_confidence_head(gene_head):
                                tags.add('gene_highconf_head')
                            if is_negative_head(gene_head):
                                tags.add('gene_negative_head')
                            if is_low_confidence_gene_head(gene_head):
                                tags.add('gene_negative_head')
                        else:
                            tags.add('gene_is_head')
                    # miRNA argument.
                    if (arg.role == 'Theme' and relation_type == 'Gene2MiRNA') or \
                            (arg.role == 'Agent' and relation_type == 'MiRNA2Gene') or \
                            arg.role == 'Arg1':
                        mir_text = helper.text(entity)
                        if not validate_mirna(entity.char_start, doc.text):
                            tags.add('anti_mirna')
                            anti_mir_count += 1
                        is_head, mir_head, mir_phrase, mir_pp_phrase = find_head(
                            helper, entity, sentence.constituent, sentence)
                        if not is_head:
                            tags.add('mirna_not_head')
                            if is_high_confidence_head(mir_head):
                                tags.add('mirna_highconf_head')
                            if is_negative_head(mir_head):
                                tags.add('mirna_negative_head')
                                # NOTE(review): marker set only on the
                                # negative-head branch per original line order
                                # — confirm against the upstream revision.
                                mark_mnh = 'MNH'
                        else:
                            tags.add('mirna_is_head')
                # Preposition case: re-judge the gene head using the head of
                # the dominating "<NP> of <NP>" phrase (e.g. "expression of X").
                if gene_pp_phrase is not None:
                    if trigger is not None and \
                            RangeHelper.char_range_overlap(gene_pp_phrase, trigger):
                        # We only consider pp head when trigger is not in the pp
                        # phrase. If the trigger is in the pp phrase, e.g.,
                        # targeting of gene, the head would be targeting, and it is
                        # a low confidence head for the gene.
                        pass
                    else:
                        pp_head = helper.doc.text[gene_pp_phrase.head_char_start:
                                                  gene_pp_phrase.head_char_end + 1]
                        if is_high_confidence_head(pp_head):
                            # We regard the pp head as the real head of the argument.
                            if 'gene_is_head' in tags:
                                tags.remove('gene_is_head')
                            tags.add('gene_highconf_head')
                        if is_negative_head(pp_head):
                            tags.add('gene_negative_head')
                        if is_low_confidence_gene_head(pp_head):
                            tags.add('gene_negative_head')
                if trigger_text == '':
                    # No trigger argument: relation came from a relaxed rule.
                    tags.add('relaxed_rule')
                    mark_relax = 'RelaxedRule'
                if not valid_gene:
                    tags.add('invalid_gene')
                    invalid_gene_count += 1
                    # continue
                # Concatenate the sentences the arguments span, in order.
                sentence_text = []
                for i in sorted(sentence_index):
                    sentence = doc.sentence[i]
                    sentence_text.append(helper.text(sentence))
                sentence_text = ' '.join(sentence_text)
                if is_high_confidence_cooccurence(sentence_text):
                    tags.add('highconf_cooccur')
                if is_high_confidence_modifier(sentence_text):
                    tags.add('highconf_modifier')
                line = '\t'.join([doc.doc_id, relation_type,
                                  direct, mark_relax, trigger_text,
                                  mir_text, mark_mnh, mir_head, mir_phrase,
                                  gene_text, mark_gnh, gene_head, gene_phrase,
                                  sentence_text])
                # print(line)
                if 'highconf_trigger' in tags and 'null_arg_direct' in tags:
                    # If we have high confidence trigger, then we don't punish
                    # even it's extracted by null-argument rule.
                    tags.remove('null_arg_direct')
                # Write tags onto the ORIGINAL document and accumulate score.
                for tag in tags:
                    attr = original_doc.relation[relation.duid].attribute.add()
                    attr.key = 'tag'
                    attr.value = tag
                    if tag in tag_scores:
                        score += tag_scores[tag]
                # If scores are the same, prefer shorter sentence.
                score += 100.0 / len(sentence_text)
                attr = original_doc.relation[relation.duid].attribute.add()
                attr.key = 'score'
                attr.value = str(score)
                if len(tag_to_remove & tags) > 0:
                    logging.warning('Skip invalid relation' + '\t' +
                                    doc.doc_id + '\t' + relation.duid + '\t' +
                                    '|'.join(tag_to_remove & tags))
                    # del doc.relation[relation.duid]
                    # Note that we can't delete a map entry while looping in map.
                    # Instead record all ids to be removed, and remove them after
                    # the loop.
                    invalid_relation_ids.add(relation.duid)
                    continue
                valid_relation_lines[relation.duid] = line
            for vid, line in valid_relation_lines.items():
                result_file.write(line + '\n')
            # Delete invalid relations here.
            for ivid in invalid_relation_ids:
                del original_doc.relation[ivid]
            original_helper = DocHelper(original_doc)
            original_helper.dump_to_brat_file('test2/' + original_doc.doc_id + '.txt',
                                              'test2/' + original_doc.doc_id + '.ann',
                                              True)
            count += 1
            # if count == 1:
            #     break
            print(count, end='\r')
    result_file.close()
    print(count, invalid_gene_count, anti_mir_count)
# NOTE: fix_mirna below is intentionally disabled — it is kept inside a string
# literal as reference code that re-applied the anti-miRNA filter to already
# post-processed output.
'''
def fix_mirna(mirtex_dir):
count = 0
invalid_gene_count = 0
anti_mir_count = 0
request_iterator = load_mirtex_result(mirtex_dir)
for response in request_iterator:
for doc in response.document:
helper = DocHelper(doc)
for entity_id, entity in doc.entity.items():
helper.fill_entity_using_char_offset(entity)
for relation_id, relation in doc.relation.items():
relation_type = 'MiRNA2Gene'
for attr in relation.attribute:
if attr.key == 'direction' and attr.value == 'G2M':
relation_type = 'Gene2MiRNA'
tags = set()
for attr in relation.attribute:
if attr.key == 'tag':
tags.add(attr.value)
for arg in relation.argument:
entity = doc.entity[arg.entity_duid]
if (
arg.role == 'Theme' and relation_type == 'Gene2MiRNA') or \
(
arg.role == 'Agent' and relation_type == 'MiRNA2Gene') or \
arg.role == 'Arg1':
if not validate_mirna(entity.char_start, doc.text):
tags.add('anti_mirna')
anti_mir_count += 1
if len(tag_to_remove & tags) > 0:
print('Skip invalid relation:', doc.doc_id, relation.duid,
'|'.join(tag_to_remove & tags), file=sys.stderr)
del doc.relation[relation.duid]
continue
helper.dump_to_brat_file('test2/' + doc.doc_id + '.txt',
'test2/' + doc.doc_id + '.ann',
True)
count += 1
print(count, end='\r')
print(count, invalid_gene_count, anti_mir_count)
'''
if __name__ == '__main__':
    # Entry point: score the miRTex abstract results.
    post_process('data/abstracts_results')
    pass
    # test_run()
    # test_brat_writer()
    # fix_mirna('post_processed_mirtex')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment