Created
June 29, 2023 19:00
-
-
Save saaj/931b1455db85d193f66f86e7807c3df4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
"""Elasticsearch ``application/x-ndjson`` file loader.""" | |
import argparse | |
import fileinput | |
import itertools | |
import json | |
import logging | |
import sys | |
from urllib.error import HTTPError | |
from urllib.parse import urljoin | |
from urllib.request import urlopen, Request | |
try:
    from tqdm import tqdm
except ImportError:
    # tqdm is optional: fall back to a no-op progress wrapper.
    # A def instead of a lambda assignment (PEP 8 E731) so tracebacks
    # show a useful name.
    def tqdm(iterable, **kwargs):
        """Pass-through stand-in for ``tqdm``: ignores kwargs, returns *iterable*."""
        return iterable
def count_lines(filenames):
    """Return the number of non-blank lines across *filenames*.

    Lines consisting only of whitespace are not counted.
    """
    total = 0
    with fileinput.input(filenames) as stream:
        for raw in stream:
            if raw.strip():
                total += 1
    return total
def get_lines(filenames):
    """Yield stripped, non-blank lines from *filenames* with a progress bar.

    The files are read twice: a first pass counts the lines so ``tqdm``
    can display a total, then the second pass yields them.
    """
    total = count_lines(filenames)
    with fileinput.input(filenames) as stream:
        stripped = (raw.strip() for raw in stream)
        nonblank = (line for line in stripped if line)
        yield from tqdm(nonblank, total=total)
def chunk_for_bulk(seq, size=1024):
    """Group *seq* into Elasticsearch ``_bulk`` chunks of up to *size* docs.

    Each yielded chunk is a flat list alternating an ``{"index": {}}``
    action line with a document line, ready to be joined with newlines.
    """
    action = '{"index": {}}'
    source = iter(seq)
    # zip_longest over `size` references to one iterator slices the
    # stream into groups, padding the final group with None.
    for group in itertools.zip_longest(*[source] * size):
        chunk = []
        for doc in group:
            if doc is not None:  # skip the final group's padding
                chunk.append(action)
                chunk.append(doc)
        yield chunk
def load(filename: list, append: bool, elasticsearch_url: str, index: str):
    """Bulk-load nd-json documents from files into an Elasticsearch index.

    :param filename: list of nd-json file paths (singular name because it
        comes straight from the argparse positional).
    :param append: when False, delete the index first; when True, keep it.
    :param elasticsearch_url: base URL of the Elasticsearch server.
    :param index: name of the target index.
    """
    filenames = filename
    index_url = urljoin(elasticsearch_url, index)
    if not append:
        logging.info('Deleting %s', index_url)
        try:
            # `with` closes the response; the original leaked the
            # HTTP connection by discarding it unclosed.
            with urlopen(Request(index_url, method='DELETE')):
                pass
        except HTTPError as ex:
            # E.g. the index does not exist yet; log and carry on.
            logging.error(
                'Could not delete index %s', json.loads(ex.fp.read().decode())
            )
    else:
        logging.info('Appending to existing index, %s', index_url)
    logging.info('Loading documents into %s', index_url)
    for lines in chunk_for_bulk(get_lines(filenames)):
        try:
            req = Request(
                f'{index_url}/_bulk',
                '\n'.join(lines).encode() + b'\n',
                headers={'content-type': 'application/x-ndjson'}
            )
            with urlopen(req):  # close response, avoid connection leak
                pass
        except HTTPError as ex:
            # Report the failing chunk but keep loading the rest.
            logging.error(
                'Error\n%s\non\n> %s\n',
                json.loads(ex.fp.read().decode()),
                lines,
            )
if __name__ == '__main__':
    # Script entry point: parse CLI options and hand them to load() as
    # keyword arguments (dest names match load's parameters).
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
    cli = argparse.ArgumentParser(
        description='Elasticsearch nd-json bulk loader',
    )
    cli.add_argument(
        '--append', action='store_true',
        help='Do not remove existing index, but rather append to it.')
    cli.add_argument(
        '-i', '--index', default='logs',
        help='Index name to load into, by default: %(default)s.')
    cli.add_argument(
        '-e', '--elasticsearch-url', default='http://localhost:9200/',
        help='Base URL of Elasticsearch, by default: %(default)s.')
    cli.add_argument(
        'filename', nargs='+',
        help='nd-json filenames to load')
    arguments = cli.parse_args()
    load(**vars(arguments))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment