@andr1an
Last active November 16, 2021 16:47
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Dumps Elasticsearch index; handy for backing up Kibana."""
import sys
import argparse
import json
import logging

import requests

logger = logging.getLogger(__name__)


def parse_elasticsearch_response(response):
    """Checks an Elasticsearch response for errors and returns its JSON body."""
    response.raise_for_status()
    data = response.json()
    # Fail loudly on partial results: a timed-out query or failed/skipped
    # shards would mean a silently incomplete dump.
    assert data['timed_out'] is False
    assert data['_shards']['failed'] == 0
    assert data['_shards']['skipped'] == 0
    assert data['_shards']['successful'] == data['_shards']['total']
    return data


def get_documents(dump_index, elasticsearch_url='http://localhost:9200',
                  scroll_timeout='3m'):
    """Yields every document in an index, paging through the scroll API."""
    session = requests.Session()
    # Open a scroll context; each page returns at most 100 documents.
    response = session.post('{}/{}/_search'.format(elasticsearch_url,
                                                   dump_index),
                            params={'scroll': scroll_timeout},
                            json={'size': 100})
    data = parse_elasticsearch_response(response)
    scroll_id = data['_scroll_id']
    total_count = data['hits']['total']
    hits = data['hits'].get('hits', [])
    hits_count = len(hits)
    real_count = hits_count
    logger.debug('Total docs: %s, got: %s', total_count, hits_count)
    while hits:
        for doc in hits:
            yield doc
        response = session.post('{}/_search/scroll'.format(elasticsearch_url),
                                json={'scroll': scroll_timeout,
                                      'scroll_id': scroll_id})
        data = parse_elasticsearch_response(response)
        # Elasticsearch may return a new scroll ID on any page, so always
        # carry the most recent one into the next request.
        scroll_id = data['_scroll_id']
        assert total_count == data['hits']['total']
        hits = data['hits'].get('hits', [])
        hits_count = len(hits)
        real_count += hits_count
        logger.debug('Total docs: %s, got: %s', total_count, hits_count)
    if real_count != total_count:
        logger.warning('Total docs: %s, but got only %s!', total_count,
                       real_count)
    else:
        logger.info('Got total of %s docs', real_count)
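

# Note: the count handling above assumes a pre-7.x Elasticsearch, where
# hits.total is a plain integer. In 7.x+ it became an object like
# {'value': N, 'relation': 'eq'}. A minimal sketch of a version-agnostic
# helper (a hypothetical addition, not part of the original gist):
def extract_total_count(data):
    """Returns the total hit count for both old and new response shapes."""
    total = data['hits']['total']
    if isinstance(total, dict):  # Elasticsearch >= 7.x
        return total['value']
    return total  # Elasticsearch < 7.x: plain integer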


def parse_args(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser()
    parser.add_argument('-U', '--elasticsearch-url',
                        default='http://localhost:9200')
    parser.add_argument('-i', '--dump-index', default='.kibana')
    parser.add_argument('-t', '--scroll-timeout', default='3m')
    parser.add_argument('-o', '--out-file', type=argparse.FileType('w'),
                        default='-')
    parser.add_argument('--debug', action='store_true')
    return parser.parse_args(argv)


def main():
    args = parse_args()
    logger.debug('Args: %s', args)
    if not args.out_file.isatty():
        logger.info('Writing to %s', args.out_file.name)
    logger.info('Dumping index %s from %s', args.dump_index,
                args.elasticsearch_url)
    for doc in get_documents(dump_index=args.dump_index,
                             elasticsearch_url=args.elasticsearch_url,
                             scroll_timeout=args.scroll_timeout):
        json.dump(doc, args.out_file)
        # One document per line (NDJSON), so the dump stays machine-readable
        # instead of running all JSON objects together.
        args.out_file.write('\n')
    logger.info('Dump finished')


if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s %(message)s',
        datefmt='[%Y-%m-%d %H:%M:%S]',
        level=logging.DEBUG if '--debug' in sys.argv else logging.INFO)
    logging.getLogger('requests').setLevel(logging.WARNING)
    main()
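
A minimal usage sketch, assuming the script is saved as dump_index.py (a
hypothetical name; the page above does not show the file name):

    # Dump the default .kibana index from a local Elasticsearch to a file
    python dump_index.py -o kibana_dump.ndjson

    # Dump another index from a remote cluster; -o defaults to stdout,
    # and debug logs go to stderr (es.example.com is a placeholder host)
    python dump_index.py -U http://es.example.com:9200 -i myindex --debug > myindex.ndjson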
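
Not part of the gist itself: since each dumped hit keeps its metadata and
_source, and the script writes one document per line, a rough sketch of
replaying such a dump through the bulk API could look like this (assumes a
pre-7.x cluster that still accepts _type; restore_documents is a hypothetical
helper, not an API of the script above):

import json
import requests

def restore_documents(dump_path, elasticsearch_url='http://localhost:9200'):
    """Replays an NDJSON dump through the Elasticsearch bulk API."""
    actions = []
    with open(dump_path) as dump_file:
        for line in dump_file:
            doc = json.loads(line)
            # Re-create each document under its original index, type, and id.
            actions.append(json.dumps({'index': {'_index': doc['_index'],
                                                 '_type': doc['_type'],
                                                 '_id': doc['_id']}}))
            actions.append(json.dumps(doc['_source']))
    response = requests.post('{}/_bulk'.format(elasticsearch_url),
                             data='\n'.join(actions) + '\n',
                             headers={'Content-Type': 'application/x-ndjson'})
    response.raise_for_status()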