@saaj · Created June 29, 2023 19:00
#!/usr/bin/python3
"""Elasticsearch ``application/x-ndjson`` file loader."""
import argparse
import fileinput
import itertools
import json
import logging
from urllib.error import HTTPError
from urllib.parse import urljoin
from urllib.request import urlopen, Request

try:
    from tqdm import tqdm
except ImportError:
    # Fall back to a no-op pass-through when tqdm is not installed
    tqdm = lambda v, **kwargs: v


def count_lines(filenames):
    """Count non-blank lines across the input files (for the progress bar total)."""
    with fileinput.input(filenames) as fp:
        return sum(1 for _ in filter(bool, map(str.strip, fp)))


def get_lines(filenames):
    """Yield stripped, non-blank lines from the input files with progress reporting."""
    line_count = count_lines(filenames)
    with fileinput.input(filenames) as fp:
        for line in tqdm(filter(bool, map(str.strip, fp)), total=line_count):
            yield line


def chunk_for_bulk(seq, size=1024):
    """Group document lines into chunks of ``size``, interleaving each document
    with an ``{"index": {}}`` action line as the Bulk API expects.
    """
    action_line_chunks = (
        list(itertools.chain.from_iterable(
            zip(
                itertools.repeat('{"index": {}}'),
                # drop the None padding that zip_longest adds to the last chunk
                [i for i in c if i is not None],
            )
        ))
        for c in itertools.zip_longest(*[iter(seq)] * size)
    )
    return action_line_chunks
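# A minimal illustration of what chunk_for_bulk produces (example values, not
# part of the original gist): for seq = ['{"a": 1}', '{"b": 2}'] and size=2 it
# yields a single chunk
#     ['{"index": {}}', '{"a": 1}', '{"index": {}}', '{"b": 2}']
# which load() below joins with newlines, plus a trailing newline, into one
# _bulk request body.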
def load(filename: list, append: bool, elasticsearch_url: str, index: str):
    filenames = filename
    index_url = urljoin(elasticsearch_url, index)

    if not append:
        # Recreate the index from scratch: delete it first (a failure, e.g. a
        # missing index, is logged but not fatal)
        logging.info('Deleting %s', index_url)
        try:
            urlopen(Request(index_url, method='DELETE'))
        except HTTPError as ex:
            logging.error(
                'Could not delete index %s', json.loads(ex.fp.read().decode())
            )
    else:
        logging.info('Appending to existing index, %s', index_url)

    logging.info('Loading documents into %s', index_url)
    for lines in chunk_for_bulk(get_lines(filenames)):
        try:
            # The Bulk API expects newline-delimited JSON terminated by a newline
            req = Request(
                f'{index_url}/_bulk',
                '\n'.join(lines).encode() + b'\n',
                headers={'content-type': 'application/x-ndjson'},
            )
            urlopen(req)
        except HTTPError as ex:
            logging.error(
                'Error\n%s\non\n> %s\n',
                json.loads(ex.fp.read().decode()),
                lines,
            )


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

    parser = argparse.ArgumentParser(
        description='Elasticsearch nd-json bulk loader',
    )
    parser.add_argument(
        '--append',
        action='store_true',
        help='Do not remove the existing index, but rather append to it.',
    )
    parser.add_argument(
        '-i', '--index',
        default='logs',
        help='Index name to load into, by default: %(default)s.',
    )
    parser.add_argument(
        '-e', '--elasticsearch-url',
        default='http://localhost:9200/',
        help='Base URL of Elasticsearch, by default: %(default)s.',
    )
    parser.add_argument(
        'filename',
        nargs='+',
        help='nd-json filenames to load',
    )

    load(**vars(parser.parse_args()))
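
A quick usage sketch, not part of the gist itself: it assumes the script above is saved as esload.py (hypothetical name), that Elasticsearch is reachable at the default http://localhost:9200/, and uses an illustrative index name demo and input file sample.ndjson.

import json
import subprocess

# Write a tiny sample ndjson file: one JSON document per line.
docs = [{'message': 'hello', 'level': 'INFO'}, {'message': 'boom', 'level': 'ERROR'}]
with open('sample.ndjson', 'w') as fp:
    for doc in docs:
        fp.write(json.dumps(doc) + '\n')

# Equivalent to running: python3 esload.py --index demo sample.ndjson
subprocess.run(['python3', 'esload.py', '--index', 'demo', 'sample.ndjson'], check=True)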