Skip to content

Instantly share code, notes, and snippets.

@tobigue
Last active December 11, 2015 16:13
Show Gist options
  • Save tobigue/3950f2497fdd95b95b4b to your computer and use it in GitHub Desktop.
Save tobigue/3950f2497fdd95b95b4b to your computer and use it in GitHub Desktop.
Elasticsearch reindexing command line script
from __future__ import print_function
import argparse
import sys
import time
from copy import deepcopy
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import reindex
# Command-line interface: two required index names plus optional
# connection/tuning flags.
# Fix: the original two adjacent string literals concatenated to
# "...elasticsearchindex to another..." (missing space).
parser = argparse.ArgumentParser(
    description='Script to reindex all documents from an elasticsearch '
                'index to another elasticsearch index.')
parser.add_argument('source_index',
                    help='Name of the elasticsearch source index.')
parser.add_argument('target_index',
                    help='Name of the elasticsearch target index.')
parser.add_argument('--source_host',
                    default="localhost:9200",
                    help='Name of the elasticsearch source cluster.')
parser.add_argument('--target_host',
                    default="localhost:9200",
                    help='Name of the elasticsearch target cluster.')
parser.add_argument('--chunk_size', type=int, default=500,
                    help='Number of docs in one chunk sent to es')
# NOTE(review): elasticsearch scroll durations are time strings like "5m";
# this argument is an int -- confirm the unit (seconds?) where it is consumed.
parser.add_argument('--scroll', type=int, default=500,
                    help='Specify how long a consistent view of the '
                         'index should be maintained for scrolled search')
args = parser.parse_args()
# Select every document in the source index.
query = {
    "query": {
        "match_all": {}
    }
}

# Reuse a single client when both endpoints point at the same host,
# otherwise open a second connection for the target cluster.
source_client = Elasticsearch([args.source_host])
target_client = (
    source_client
    if args.source_host == args.target_host
    else Elasticsearch([args.target_host])
)

# Gather the numbers shown in the confirmation prompt: cluster names and
# primary-shard document counts on both sides.
source_cluster_name = source_client.info()["cluster_name"]
stats = source_client.indices.stats(index=args.source_index)
source_index_size = stats["_all"]["primaries"]["docs"]["count"]

target_cluster_name = target_client.info()["cluster_name"]
try:
    stats = target_client.indices.stats(index=args.target_index)
    target_index_size = stats["_all"]["primaries"]["docs"]["count"]
except NotFoundError:
    # Target index does not exist yet; this placeholder is displayed
    # verbatim in the prompt instead of a document count.
    target_index_size = "[INDEX NOT EXIST]"
# Extend the match-all query with a terms aggregation over ``_type`` so the
# confirmation prompt can show a per-type document breakdown.
aggs_query = deepcopy(query)
aggs_query["aggs"] = {"_type": {"terms": {"field": "_type"}}}

# size=0: we only want totals and aggregation buckets, no hits payload.
query_stats = source_client.search(
    index=args.source_index,
    body=aggs_query,
    size=0,
)

type_buckets = query_stats["aggregations"]["_type"]["buckets"]
types_info = '\n'.join(
    " type '{key}': {doc_count}".format(**bucket) for bucket in type_buckets
)
# Show a summary of the planned reindex and require an explicit [ENTER]
# before anything is written to the target cluster.
# Fix: ``raw_input`` exists only on Python 2; fall back to ``input`` on
# Python 3, where input() reads a line without evaluating it. (The file's
# ``from __future__ import print_function`` signals Python 3 compatibility
# is intended.)
try:
    _confirm = raw_input
except NameError:  # Python 3
    _confirm = input

_confirm("""
GETTING READY TO REINDEX:
=========================
FROM (source):
Host: {source_host} (Cluster name: {source_cluster_name})
Index: {source_index} ({source_index_size} documents)
WHAT:
{total_hits} documents (in batches of {chunk_size} documents)
{types_info}
TO (target):
Host: {target_host} (Cluster name: {target_cluster_name})
Index: {target_index} ({target_index_size} documents)
Press [CTRL-C] to abort or [ENTER] to proceed.
""".format(
    chunk_size=args.chunk_size,
    source_cluster_name=source_cluster_name,
    source_host=args.source_host,
    source_index=args.source_index,
    source_index_size=source_index_size,
    target_cluster_name=target_cluster_name,
    target_host=args.target_host,
    target_index=args.target_index,
    target_index_size=target_index_size,
    total_hits=query_stats["hits"]["total"],
    types_info=types_info,
))

# Short 3..1 countdown after confirmation, leaving a last chance to CTRL-C;
# "\r" rewrites the same terminal line each second.
for i in range(3, 0, -1):
    sys.stdout.write("%s\r" % i)
    sys.stdout.flush()
    time.sleep(1)
print("Started reindexing...")
start = datetime.now()
# Stream all matching documents from source to target via scan+bulk.
# Fix: ``args.scroll`` was parsed but never passed to reindex(), so the
# user's setting was silently ignored. The helper's ``scroll`` parameter
# expects an elasticsearch duration string; the int argument is interpreted
# as seconds (e.g. 500 -> "500s") to stay backward compatible.
successful, errors = reindex(
    client=source_client,
    source_index=args.source_index,
    target_index=args.target_index,
    target_client=target_client,
    query=query,
    chunk_size=args.chunk_size,
    scroll="%ds" % args.scroll,
    scan_kwargs={"size": args.chunk_size},
)
print("Finished (took: %s)" % (datetime.now() - start))
print("Successfully indexed documents:", successful)
print("Errors:", errors)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment