Last active
June 28, 2022 23:43
-
-
Save zobayer1/4fbc400acc6f66ede846c933de1eeb80 to your computer and use it in GitHub Desktop.
Elasticsearch bulk API usage example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import json | |
import requests | |
class ElasticBulk(object):
    """Index documents into Elasticsearch in fixed-size batches via the bulk API."""

    def __init__(self, host, index, doc_type="_doc", session=None, bulk_size=1000):
        """Instantiate an ElasticBulk object and derive the bulk endpoint URL.

        Args:
            host (str): Elasticsearch host url. Example: ``http://localhost:9200``.
                A trailing slash is tolerated.
            index (str): Elasticsearch index name. Example: ``my_index``.
            doc_type (str, optional): Elasticsearch document type. Default: ``_doc``.
                Pass ``None`` (or an empty string) to target the typeless
                ``<index>/_bulk`` endpoint.
            session (:obj:`Session`, optional): Http request session object.
                If not provided, a new session is created for this instance.
            bulk_size (int, optional): Number of documents sent per bulk request.
                Default: ``1000``.

        Raises:
            ValueError: If ``bulk_size`` is smaller than 1.
        """
        if bulk_size < 1:
            raise ValueError(f"bulk_size must be a positive integer, got {bulk_size!r}")
        # The session is created here rather than in the signature: a default
        # argument is evaluated once, so every instance would share one session.
        self.session = session if session is not None else requests.session()
        base_url = f"{host.rstrip('/')}/{index}"
        if doc_type:
            self.bulk_url = f"{base_url}/{doc_type}/_bulk"
        else:
            self.bulk_url = f"{base_url}/_bulk"
        self.bulk_size = bulk_size
        self.headers = {"Content-Type": "application/x-ndjson"}

    def upload_bulk(self, generator_fn):
        """Upload every document produced by ``generator_fn`` in batches.

        Documents are buffered and posted whenever ``bulk_size`` of them have
        accumulated; a final, smaller batch is posted for the remainder.
        Nothing is sent when the generator yields no documents.

        Args:
            generator_fn (callable): A zero-argument callable returning an
                iterable of JSON-serializable documents.

        Returns:
            bool: ``True`` once all batches were posted without an HTTP error.

        Raises:
            Whatever ``response.raise_for_status()`` raises when a bulk request
            returns an HTTP error status.
        """
        pending = []
        for doc in generator_fn():
            pending.append(doc)
            if len(pending) >= self.bulk_size:
                self._post_batch(pending)
                pending.clear()
        if pending:
            self._post_batch(pending)
        return True

    def _post_batch(self, docs):
        """Serialize ``docs`` as NDJSON index actions and post them in one request."""
        action_line = json.dumps({"index": {}})
        lines = []
        for doc in docs:
            # Each document is preceded by its action line.
            lines.append(action_line)
            lines.append(json.dumps(doc))
        # The body ends with a newline after the last line.
        payload = "\n".join(lines) + "\n"
        response = self.session.post(url=self.bulk_url, data=payload, headers=self.headers)
        # NOTE(review): only the HTTP status is checked here; the response body
        # is not inspected for per-document failures.
        response.raise_for_status()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Since version 8.0, Elasticsearch changed the bulk API endpoint from the typed `<target>/<type>/_bulk` to the typeless `<target>/_bulk`. See "Migrating to Elasticsearch 8.0". So, the following line should be conditional based on the version of Elasticsearch: