@romanegloo
Created September 5, 2019 19:31
build_ES_pubtator_index: Building an ElasticSearch Index of PubTator
#!/usr/bin/env python3
"""
Parse the PubTator raw data file, create an ElasticSearch index, and populate
the corpus (index: 'pubtator').

The PubTator dataset can be downloaded from the following link:
ftp://ftp.ncbi.nlm.nih.gov/pub/lu/PubTator/bioconcepts2pubtator_offsets.gz

Place the file in 'data/' and run this script; the file does not need to be
decompressed. We assume that ElasticSearch is properly installed and reachable
via its API, and we use the Python elasticsearch client library.
"""
import sys
import re
import logging
import gzip
from pathlib import Path
import coloredlogs
from elasticsearch import Elasticsearch as ES
from elasticsearch.helpers import bulk
logger = logging.getLogger(__name__)
conf = {
    'es_index': 'pubtator',
    'es_host': 'localhost',
    'es_port': 9200,
    'batch_size': 10000,
}


def create_action(actions, doc):
    """Build a single ES bulk-API action that indexes one document."""
    act = {
        '_op_type': 'index',
        '_index': conf['es_index'],
        '_type': '_doc',
        '_id': None,
        '_source': {
            'title': None,
            'abstract': None,
            'annotations': None
        }
    }
    # Title and abstract lines look like "<pmid>|t|..." and "<pmid>|a|..."
    p_title = re.compile(r'^((\d+)\|t\|)(.*)')
    p_body = re.compile(r'^((\d+)\|a\|)(.*)')
    annotations = []
    for line in doc:
        m = p_title.match(line)
        if m:  # Title
            act['_id'] = m.group(2)
            act['_source']['title'] = m.group(3)
            continue
        m = p_body.match(line)
        if m:  # Abstract (body)
            act['_source']['abstract'] = m.group(3)
            continue
        # Annotation lines
        annotations.append(line)
    act['_source']['annotations'] = '\n'.join(annotations)
    actions.append(act)


def run_indexing(es, actions):
    """Send a batch of bulk actions to ES; return the number of successes."""
    try:
        resp = bulk(es, actions)
    except Exception as ex:
        logger.error('Error in run_indexing: ' + str(ex))
        return 0
    else:
        return resp[0]


def main():
    # Logger
    coloredlogs.install(level='INFO', logger=logger)

    # Instantiate ES client and check connection status
    es = ES([{'host': conf['es_host'], 'port': conf['es_port']}])
    if es.ping():
        logger.info("ES Client connected")
        logger.debug(es.info())
    else:
        logger.error("Couldn't connect to the ES server. Terminating...")
        sys.exit()

    # Create an index
    index_name = conf['es_index']
    index_settings = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 0
        },
        'mappings': {  # https://is.gd/wVaD8I
            'properties': {
                'title': {'type': 'text'},
                'abstract': {'type': 'text'},
                'annotations': {'type': 'text'},
            }
        }
    }
    try:
        if not es.indices.exists(index_name):
            es.indices.create(index=index_name, body=index_settings)
            logger.info('ES index {} created'.format(index_name))
        else:
            logger.info('ES index {} already exists. '
                        'Skip creating an index...'.format(index_name))
    except Exception as ex:
        logger.error('failed to create an ES index: {}'.format(str(ex)))

    # Read the PubTator data file
    file_pubtator = Path(__file__).resolve().parents[1] / \
        'data/bioconcepts2pubtator_offsets.gz'
    with gzip.open(file_pubtator, 'rt') as f:
        doc = []
        counter = [0, 0, 0]  # [num_attempts, docs_indexed, batch_counter]
        actions = []
        for line in f:
            line = line.rstrip()
            if len(line) > 0:
                doc.append(line)
            else:
                # A blank line marks the end of a document
                create_action(actions, doc)
                doc = []
                counter[2] += 1
                if counter[2] >= conf['batch_size']:
                    counter[0] += len(actions)
                    succ = run_indexing(es, actions)
                    counter[1] += succ
                    print('docs read: {}, docs indexed: {}, batch size: {}'
                          ''.format(*counter), end='\r', flush=True)
                    counter[2] = 0
                    actions = []
        if actions:  # Index the remaining actions
            counter[0] += len(actions)
            succ = run_indexing(es, actions)
            counter[1] += succ

    # Print out the statistics
    logger.info(f"Number of docs read: {counter[0]}")
    logger.info(f"Number of docs indexed: {counter[1]}")


if __name__ == '__main__':
    main()
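
# Usage sketch (assumptions: the script file name matches the gist title,
# build_ES_pubtator_index.py, and it lives one directory below the project
# root so that '../data/bioconcepts2pubtator_offsets.gz' resolves):
#
#   $ python build_ES_pubtator_index.py
#
# A quick sanity check after indexing, using the same elasticsearch client
# ('<pmid>' is a placeholder for any PubMed ID present in the corpus):
#
#   >>> from elasticsearch import Elasticsearch
#   >>> es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
#   >>> es.count(index='pubtator')             # total number of indexed docs
#   >>> es.get(index='pubtator', id='<pmid>')  # fetch one document by its PMID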