Skip to content

Instantly share code, notes, and snippets.

@zobayer1
Last active June 28, 2022 23:43
Show Gist options
  • Save zobayer1/4fbc400acc6f66ede846c933de1eeb80 to your computer and use it in GitHub Desktop.
Save zobayer1/4fbc400acc6f66ede846c933de1eeb80 to your computer and use it in GitHub Desktop.
Elasticsearch bulk API usage example
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import requests
class ElasticBulk:
    """Index documents into Elasticsearch in batches through the ``_bulk`` API."""

    def __init__(self, host, index, doc_type="_doc", session=None, bulk_size=1000):
        """Instantiates an ElasticBulk object, sets default values

        Args:
            host (str): Elasticsearch host url. Example: ``http://localhost:9200``.
                A trailing ``/`` is ignored.
            index (str): Elasticsearch index name. Example: ``my_index``.
            doc_type (str, optional): Elasticsearch document type. Default: ``_doc``,
                giving ``{host}/{index}/_doc/_bulk``. Pass ``None`` (or an empty
                string) to use the typeless ``{host}/{index}/_bulk`` endpoint, which
                is the only form accepted by Elasticsearch 8.0 and later.
            session (:obj:`requests.Session`, optional): Http request session object.
                If not provided, a new session object is created for this instance.
            bulk_size (int, optional): Number of documents sent per bulk request.
                Default: ``1000``.

        Raises:
            ValueError: If ``bulk_size`` is less than 1.
        """
        if bulk_size < 1:
            raise ValueError(f'bulk_size must be a positive integer, got {bulk_size!r}')
        # Create the default session per instance. A ``requests.session()`` default
        # argument would be evaluated once at class definition and shared by all instances.
        self.session = session if session is not None else requests.session()
        base_url = host.rstrip('/')
        if doc_type:
            self.bulk_url = f'{base_url}/{index}/{doc_type}/_bulk'
        else:
            self.bulk_url = f'{base_url}/{index}/_bulk'
        self.bulk_size = bulk_size
        self.headers = {'Content-Type': 'application/x-ndjson'}

    def upload_bulk(self, generator_fn):
        """Uploads documents in bulk

        Each document is sent as an ``{"index": {}}`` action line followed by the
        document itself. A request is posted every ``bulk_size`` documents, and any
        remainder is posted at the end. No request is made if nothing is yielded.

        Args:
            generator_fn (callable): Zero-argument callable, typically a generator
                function, returning an iterable of JSON-serializable documents.

        Returns:
            bool: ``True`` once every batch has been posted.

        Raises:
            requests.HTTPError: If a bulk request returns an HTTP error status.

        Note:
            Only the HTTP status of each request is checked. The bulk API can report
            individual item failures in the response body while still returning a
            success status; such failures are not detected here.
        """
        batch = []
        for doc in generator_fn():
            batch.append(doc)
            if len(batch) == self.bulk_size:
                self._post_bulk(batch)
                batch = []
        if batch:
            self._post_bulk(batch)
        return True

    def _post_bulk(self, docs):
        """POST one batch of documents to ``self.bulk_url`` as an NDJSON body."""
        lines = []
        for doc in docs:
            lines.append(json.dumps({"index": {}}))
            lines.append(json.dumps(doc))
        # The bulk body must be newline-delimited and end with a final newline.
        bulk_data = "\n".join(lines) + "\n"
        response = self.session.post(url=self.bulk_url, data=bulk_data, headers=self.headers)
        response.raise_for_status()
@zobayer1
Copy link
Author

zobayer1 commented Nov 7, 2020

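Test (assumes the class above is saved as es_bulk.py and an Elasticsearch node is reachable at http://localhost:9200):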

from es_bulk import ElasticBulk


def gen_data():
    """Yield six sample car documents, one freshly built dict per call."""
    records = (
        {"car_make": "Chevrolet", "car_model": "Traverse", "car_year": 2009, "car_color": "Yellow", "made_in": "Guinea-Bissau"},
        {"car_make": "Mitsubishi", "car_model": "Eclipse", "car_year": 1996, "car_color": "Khaki", "made_in": "Luxembourg"},
        {"car_make": "Ford", "car_model": "Lightning", "car_year": 1994, "car_color": "Teal", "made_in": "China"},
        {"car_make": "Mercedes-Benz", "car_model": "Sprinter 2500", "car_year": 2012, "car_color": "Yellow", "made_in": "Colombia"},
        {"car_make": "Nissan", "car_model": "Maxima", "car_year": 2002, "car_color": "Yellow", "made_in": "Kazakhstan"},
        {"car_make": "Chrysler", "car_model": "Pacifica", "car_year": 2006, "car_color": "Crimson", "made_in": "China"},
    )
    yield from records


# Target a local Elasticsearch node and push the sample documents into "my_index".
es_bulk = ElasticBulk(host="http://localhost:9200", index="my_index")
es_bulk.upload_bulk(gen_data)

@syedarehaq
Copy link

As of version 8.0, Elasticsearch no longer accepts the typed bulk endpoint <target>/<type>/_bulk; only the typeless <target>/_bulk endpoint remains (see "Migrating to Elasticsearch 8.0"). The following line should therefore depend on the Elasticsearch version in use:

self.bulk_url = f'{host}/{index}/{doc_type}/_bulk'

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment