#!/usr/bin/env python3
"""
Parse the PubTator raw data file, create an ElasticSearch index, and populate
the corpus (index: 'pubtator').
You can download the PubTator dataset from the following link:
ftp://ftp.ncbi.nlm.nih.gov/pub/lu/PubTator/bioconcepts2pubtator_offsets.gz
Place the file in 'data/' and run this code. There is no need to decompress
the file. We assume that ElasticSearch is properly installed and accessible
via its API channel. We use the python elasticsearch client library.
"""
# Standard library
import gzip
import logging
import re
import sys
from pathlib import Path

# Third-party
import coloredlogs
from elasticsearch import Elasticsearch as ES
from elasticsearch.helpers import bulk

# Module-wide logger
logger = logging.getLogger(__name__)

# Runtime configuration: ES connection details and bulk-indexing batch size
conf = {
    'es_index': 'pubtator',
    'es_host': 'localhost',
    'es_port': 9200,
    'batch_size': 10000,
}
# Patterns for the PubTator line types, compiled once at import time because
# create_action() runs once per document over a very large corpus.
p_title = re.compile(r'^((\d+)\|[t]\|)(.*)')
p_body = re.compile(r'^((\d+)\|[a]\|)(.*)')


def create_action(actions, doc):
    """Build a single ES bulk 'index' action from one PubTator record.

    Parameters
    ----------
    actions : list
        List of bulk action dicts; the new action is appended (mutated).
    doc : list of str
        Lines of one PubTator record: a title line ('<pmid>|t|<title>'),
        an abstract line ('<pmid>|a|<abstract>'), and zero or more
        annotation lines.
    """
    # NOTE: '_type' is omitted — it is deprecated in ES 7, and the mapping
    # created in main() is typeless. The '_source' field names now match the
    # mapping declared in main() ('title'/'abstract'/'annotations'); the
    # previous '_'-prefixed names bypassed that mapping and were dynamically
    # mapped instead.
    act = {
        '_op_type': 'index',
        '_index': conf['es_index'],
        '_id': None,
        '_source': {
            'title': None,
            'abstract': None,
            'annotations': None
        }
    }
    annotations = []
    for line in doc:
        m = p_title.match(line)
        if m:  # Title line; also carries the PMID used as the document id
            act['_id'] = m.group(2)
            act['_source']['title'] = m.group(3)
            continue
        m = p_body.match(line)
        if m:  # Abstract line
            act['_source']['abstract'] = m.group(3)
            continue
        # Anything else is an annotation line
        annotations.append(line)
    act['_source']['annotations'] = '\n'.join(annotations)
    actions.append(act)
def run_indexing(es, actions):
    """Send a batch of bulk actions to ElasticSearch.

    Parameters
    ----------
    es : Elasticsearch
        Connected client instance.
    actions : list
        Bulk action dicts as built by create_action().

    Returns
    -------
    int
        Number of successfully indexed documents; 0 if the bulk request
        failed. (The original fell through to an implicit None on error,
        which made the caller's `counter[1] += succ` raise TypeError.)
    """
    try:
        resp = bulk(es, actions)
    except Exception as ex:
        # helpers.bulk raises on transport errors / rejected batches;
        # report and treat the whole batch as failed.
        logger.error('Error in run_indexing: %s', ex)
        return 0
    else:
        # helpers.bulk returns a (num_successes, errors) tuple.
        return resp[0]
def main():
    """Stream the gzipped PubTator offsets file into an ElasticSearch index.

    Connects to ES, creates the 'pubtator' index if it does not exist, then
    reads blank-line-separated documents from the raw datafile and indexes
    them in batches of conf['batch_size'].
    """
    # Logger
    coloredlogs.install(level='INFO', logger=logger)
    # Instantiate ES client and check connection status
    es = ES([{'host': conf['es_host'], 'port': conf['es_port']}])
    if es.ping():
        logger.info("ES Client connected")
        logger.debug(es.info())
    else:
        logger.error("Couldn't connect to the ES server. Terminating...")
        sys.exit(1)  # non-zero status: the script could not do its job
    # Create an index (single shard, no replicas: a local bulk-load setup)
    index_name = conf['es_index']
    index_settings = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 0
        },
        'mappings': {  # https://is.gd/wVaD8I
            'properties': {
                'title': {'type': 'text'},
                'abstract': {'type': 'text'},
                'annotations': {'type': 'text'},
            }
        }
    }
    try:
        if not es.indices.exists(index=index_name):
            es.indices.create(index=index_name, body=index_settings)
            logger.info('ES index {} created'.format(index_name))
        else:
            logger.info('ES index {} already exists. '
                        'Skip creating an index...'.format(index_name))
    except Exception as ex:
        logger.error('failed to create an ES index: {}'.format(str(ex)))
    # Read pubtator datafile; records are separated by blank lines.
    file_pubtator = Path(__file__).resolve().parents[1] / \
        'data/bioconcepts2pubtator_offsets.gz'
    with gzip.open(file_pubtator, 'rt') as f:
        doc = []
        counter = [0, 0, 0]  # [docs_read, docs_indexed, batch_counter]
        actions = []
        for line in f:
            line = line.rstrip()
            if len(line) > 0:
                doc.append(line)
                continue
            # Blank line: the current document is complete.
            create_action(actions, doc)
            doc = []
            counter[2] += 1
            if counter[2] >= conf['batch_size']:
                counter[0] += len(actions)
                # NOTE: a leftover `succ = len(actions)  # debug` line used to
                # overwrite the real bulk result here, inflating the indexed
                # count. Guard with `or 0` in case run_indexing returns None.
                succ = run_indexing(es, actions) or 0
                counter[1] += succ
                print('docs read: {}, docs indexed: {}, batch size: {}'
                      ''.format(*counter), end='\r', flush=True)
                counter[2] = 0
                actions = []
        if doc:
            # Flush a trailing document when the file has no final blank line.
            create_action(actions, doc)
        if actions:  # Run the rest of actions
            counter[0] += len(actions)
            succ = run_indexing(es, actions) or 0
            counter[1] += succ
    # Print out the statistics
    logger.info(f"Number of docs read: {counter[0]}")
    logger.info(f"Number of docs indexed: {counter[1]}")