Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
build_ES_pubtator_index: Building ElasticSearch Index of PubTator
#!/usr/bin/env python3
"""
Parse the PubTator raw data file, create the ElasticSearch index, and populate
the corpus (index: 'pubtator').
You can download PubTator dataset from the following link:
ftp://ftp.ncbi.nlm.nih.gov/pub/lu/PubTator/bioconcepts2pubtator_offsets.gz
Place the file in 'data/' and run this code. Don't need to decompress the file.
We assume that ElasticSearch is properly installed and accessible via its API
channel. We use python elasticsearch client library.
"""
import sys
import re
import logging
import gzip
from pathlib import Path
import coloredlogs
from elasticsearch import Elasticsearch as ES
from elasticsearch.helpers import bulk
logger = logging.getLogger(__name__)
# Runtime configuration: ES connection, target index name, and bulk batch size.
conf = {
'es_index': 'pubtator',  # name of the target ES index
'es_host': 'localhost',  # ES server host
'es_port': 9200,  # ES HTTP API port
'batch_size': 10000,  # number of docs accumulated per bulk request
}
# Patterns for PubTator "offsets" format lines:
#   <PMID>|t|<title>   and   <PMID>|a|<abstract>
# Compiled once at module level instead of on every call — the original
# recompiled both patterns per document, once for each of the millions of
# records in the corpus.
_P_TITLE = re.compile(r'^((\d+)\|[t]\|)(.*)')
_P_BODY = re.compile(r'^((\d+)\|[a]\|)(.*)')


def create_action(actions, doc, es_index=None):
    """Build one ES bulk 'index' action from a PubTator record.

    The action is appended to ``actions`` in place (nothing is returned).

    Parameters
    ----------
    actions : list
        Accumulator of bulk actions; the new action is appended to it.
    doc : list of str
        Lines of one PubTator record: a title line (``PMID|t|...``), an
        abstract line (``PMID|a|...``), and zero or more annotation lines.
        Lines matching neither pattern are treated as annotations.
    es_index : str, optional
        Target index name. Defaults to ``conf['es_index']`` for backward
        compatibility with the original behavior.
    """
    doc_id = title = body = None
    annotations = []
    for line in doc:
        m = _P_TITLE.match(line)
        if m:  # Title line also carries the document id (PMID)
            doc_id = m.group(2)
            title = m.group(3)
            continue
        m = _P_BODY.match(line)
        if m:  # Abstract/body line
            body = m.group(3)
            continue
        # Any other line is an annotation line
        annotations.append(line)
    actions.append({
        '_op_type': 'index',
        '_index': conf['es_index'] if es_index is None else es_index,
        '_type': '_doc',
        '_id': doc_id,
        '_source': {
            '_title': title,
            '_body': body,
            '_annotations': '\n'.join(annotations),
        },
    })
def run_indexing(es, actions):
    """Send a batch of bulk actions to ElasticSearch.

    Parameters
    ----------
    es : Elasticsearch
        Connected client instance.
    actions : list
        Bulk actions as produced by :func:`create_action`.

    Returns
    -------
    int
        Number of successfully indexed documents; 0 when the bulk request
        fails. (The original implicitly returned ``None`` on failure, which
        made the caller's ``counter[1] += succ`` raise ``TypeError``.)
    """
    try:
        resp = bulk(es, actions)
    except Exception:
        # logger.exception records the full traceback, not just str(ex)
        logger.exception('Error in run_indexing')
        return 0
    else:
        # bulk() returns (num_successes, list_of_errors)
        return resp[0]
def main():
    """Create the 'pubtator' ES index and bulk-load the PubTator corpus."""
    # Logger
    coloredlogs.install(level='INFO', logger=logger)

    # Instantiate ES client and check connection status
    es = ES([{'host': conf['es_host'], 'port': conf['es_port']}])
    if es.ping():
        logger.info("ES Client connected")
        logger.debug(es.info())
    else:
        logger.error("Couldn't connect to the ES server. Terminating...")
        # Exit non-zero: the original sys.exit() reported success (status 0)
        # on this error path.
        sys.exit(1)

    # Create an index (1 shard / 0 replicas: single-node bulk-load setup)
    index_name = conf['es_index']
    index_settings = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 0
        },
        'mappings': {  # https://is.gd/wVaD8I
            'properties': {
                'title': {'type': 'text'},
                'abstract': {'type': 'text'},
                'annotations': {'type': 'text'},
            }
        }
    }
    try:
        if not es.indices.exists(index=index_name):
            es.indices.create(index=index_name, body=index_settings)
            logger.info('ES index {} created'.format(index_name))
        else:
            logger.info('ES index {} already exists. '
                        'Skip creating an index...'.format(index_name))
    except Exception as ex:
        # Best-effort: bulk indexing below can still auto-create the index.
        logger.error('failed to create an ES index: {}'.format(str(ex)))

    # Read pubtator datafile (gzip text mode; no need to decompress first)
    file_pubtator = Path(__file__).resolve().parents[1] / \
        'data/bioconcepts2pubtator_offsets.gz'
    with gzip.open(file_pubtator, 'rt') as f:
        doc = []  # lines of the record currently being read
        counter = [0, 0, 0]  # [num_attempts, docs_indexed, batch_counter]
        actions = []
        for line in f:
            line = line.rstrip()
            if line:
                doc.append(line)
                continue
            # A blank line terminates one PubTator record
            create_action(actions, doc)
            doc = []
            counter[2] += 1
            if counter[2] >= conf['batch_size']:
                counter[0] += len(actions)
                # BUGFIX: the original overwrote the bulk result with
                # `succ = len(actions)  # debug`, inflating the indexed
                # count even when requests failed. `or 0` also tolerates a
                # None return from run_indexing on error.
                succ = run_indexing(es, actions) or 0
                counter[1] += succ
                print('docs read: {}, docs indexed: {}, batch size: {}'
                      ''.format(*counter), end='\r', flush=True)
                counter[2] = 0
                actions = []
        if doc:
            # BUGFIX: flush the final record when the file does not end
            # with a blank line (the original silently dropped it).
            create_action(actions, doc)
        if actions:  # Run the remaining partial batch
            counter[0] += len(actions)
            counter[1] += run_indexing(es, actions) or 0

    # Print out the statistics
    logger.info(f"Number of docs read: {counter[0]}")
    logger.info(f"Number of docs indexed: {counter[1]}")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment