Skip to content

Instantly share code, notes, and snippets.

@soldni
Created March 13, 2015 02:56
Show Gist options
  • Save soldni/91c5499c8cf33ca3caa8 to your computer and use it in GitHub Desktop.
Save soldni/91c5499c8cf33ca3caa8 to your computer and use it in GitHub Desktop.
Migrate an index from one Elasticsearch cluster to another.
# built-in modules
import json
from argparse import ArgumentParser
from time import time as now
# installed modules
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from elasticsearch.exceptions import ConflictError
# Command-line interface. Each endpoint is given as
# "address port index [username password]", either via -s/-d or via a JSON
# file (-j) holding "src" and "dst" lists; the JSON file, when given,
# overrides -s/-d.
ag = ArgumentParser()
ag.add_argument('-s', '--source', nargs='+', dest='src', default=None,
                help='address port index username password')
ag.add_argument('-d', '--destination', nargs='+', dest='dst', default=None,
                help='address port index username password')
ag.add_argument('-j', '--json-config', dest='config', default=None,
                help='load configuration from json file')
opts = ag.parse_args()
if opts.config is not None:
    # `with` guarantees the handle is closed (the old py2-only `file()`
    # call was never closed).
    with open(opts.config) as config_file:
        j = json.load(config_file)
    opts.src = j['src']
    opts.dst = j['dst']
# Fail early with a usage message rather than a TypeError (missing endpoint)
# or IndexError (username without password) further down the script.
for endpoint_name, endpoint_spec in (('source', opts.src),
                                     ('destination', opts.dst)):
    if (endpoint_spec is None or len(endpoint_spec) < 3
            or len(endpoint_spec) == 4):
        ag.error('{} must be: address port index [username password]'
                 .format(endpoint_name))
print("[status] source: http://{}:{}/{}".format(*opts.src[:3]))
print("[status] destination: http://{}:{}/{}".format(*opts.dst[:3]))


def _make_client(spec):
    """Build an Elasticsearch client from an endpoint spec.

    `spec` is the list "address port index [username password]"; the index
    element is not used here. When credentials are present they are passed
    as a "username:password" string via `http_auth`.
    """
    kwargs = {'host': spec[0], 'port': int(spec[1])}
    if len(spec) > 3:
        kwargs['http_auth'] = '{}:{}'.format(*spec[3:])
    return Elasticsearch(**kwargs)


client_src = _make_client(opts.src)
client_dst = _make_client(opts.dst)
# If the destination index already exists, ask before clobbering it.
# An empty answer counts as "no" (the default shown in [y/N]).
target_index = opts.dst[2]
if client_dst.indices.exists(target_index):
    answer = None
    # Keep asking until we get 'y', 'n', or just <enter>.
    while answer not in ('y', 'n', ''):
        answer = raw_input('index exists, overwrite? [y/N]: ').strip().lower()
    if answer != 'y':
        print('[status] aborting.')
        exit()
    client_dst.indices.delete(target_index)
# Recreate the destination index with the source index's mappings, settings
# and warmers. The indices.get_* APIs wrap their result as
# {<index name>: {<section>: ...}}, so each response is unwrapped first; the
# create-index body expects the plural keys 'mappings' and 'warmers'.


def _index_section(response, index, section):
    """Return ``response[<index>][section]`` from an ``indices.get_*`` call.

    Falls back to the only entry when `index` is not a key of the response
    (e.g. it is an alias resolving to a single index). Returns {} when the
    response or the section is empty.

    Raises:
        ValueError: if `index` resolved to several concrete indices.
    """
    per_index = response.get(index)
    if per_index is None:
        if not response:
            return {}
        if len(response) != 1:
            raise ValueError(
                '{!r} resolved to several indices ({}); pass a concrete '
                'index name'.format(index, ', '.join(sorted(response))))
        per_index = next(iter(response.values()))
    return per_index.get(section) or {}


src_mapping = _index_section(
    client_src.indices.get_mapping(opts.src[2]), opts.src[2], 'mappings')
src_settings = _index_section(
    client_src.indices.get_settings(opts.src[2]), opts.src[2], 'settings')
src_warmer = _index_section(
    client_src.indices.get_warmer(opts.src[2]), opts.src[2], 'warmers')

# Drop settings the cluster assigns itself when an index is created; they
# describe the source index, not the new one.
# NOTE(review): assumes nested (non-flat) settings, i.e. {'index': {...}},
# which is what get_settings returns without flat_settings — confirm.
if isinstance(src_settings.get('index'), dict):
    src_settings = dict(src_settings)
    index_part = dict(src_settings['index'])
    for generated_key in ('uuid', 'version', 'creation_date', 'provided_name'):
        index_part.pop(generated_key, None)
    src_settings['index'] = index_part

create_body = {'settings': src_settings}
if src_mapping:
    create_body['mappings'] = src_mapping
if src_warmer:
    create_body['warmers'] = src_warmer
client_dst.indices.create(opts.dst[2], body=create_body)
print('[status] mapping, settings, and warmer successfully migrated.')
# Copy every document of the source index into the destination index,
# keeping _id and _type. Documents already present in the destination
# (ConflictError on create) are counted as duplicates instead of overwritten.
q_body = {
    "query": {"match_all": {}}
}
it = scan(client_src, query=q_body, index=opts.src[2])
created = duplicate = 0
stats = client_src.indices.stats(opts.src[2])
# NOTE(review): index-stats docs.count may count nested documents too, so it
# can differ from the number of hits scan() yields; progress is approximate.
total = stats['_all']['primaries']['docs']['count']
print('[status] {:,} documents to transfer.'.format(total))
start = now()
for doc in it:
    try:
        client_dst.create(index=opts.dst[2], body=doc['_source'],
                          id=doc['_id'], doc_type=doc['_type'])
    except ConflictError:
        duplicate += 1
    else:
        created += 1
    proc = created + duplicate
    # Report only when another whole percent is crossed. Floor division keeps
    # this correct on Python 3; the total > 0 guard avoids ZeroDivisionError
    # on an empty source index.
    if total > 0 and (proc * 100) // total > ((proc - 1) * 100) // total:
        print('[status] {:.2%} done ({:,} docs - {:.2f} min elapsed).'
              .format(float(proc) / total, proc, (now() - start) / 60))
# Reached once the scan is exhausted (the old exit() made this unreachable);
# elapsed time is a difference of timestamps, converted to hours.
print('[status] done; total time: {:.2f} h;'.format((now() - start) / 3600))
print(' migrated: {} - duplicates: {}.'.format(created, duplicate))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment