Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@kaplun
Forked from david-caro/es_cli.py
Created April 11, 2017 21:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kaplun/73273e0afa044d992a68a2b5db212683 to your computer and use it in GitHub Desktop.
Small ES cli tool to remap an index
#!/usr/bin/env python
# encoding: utf-8
import json
import re
import time
import logging
import click
from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex
DEFAULT_NODE = ('localhost', 9200)
_ERROR1_RE = re.compile(u'mapper \[(?P<field_name>[^]]+)\]')
_BAD_FIELDS_ACK_RESPONSES = {}
_TRY_TO_FIX_RESPONSES = {}
def _extract_bad_field(error_str):
match = _ERROR1_RE.search(error_str)
if match:
return match.groupdict().get('field_name')
raise Exception('Unable to extract bad field from %s' % error_str)
def _fix_bad_field(index, recid, bad_field, cli):
keys = bad_field.split('.')
original_record = cli.get(index, recid)
cur_elem = original_record['_source']
for key in keys[:-2]:
cur_elem = cur_elem[key]
cur_elem.pop(keys[-2])
return original_record
def _handle_illegal_argument_exception(
    index_from, index_to, cli, recid, error, yesall=False,
):
    """Fix one record that failed reindexing with illegal_argument_exception.

    Parses the bad field out of the error, asks the user once per field
    (answers are cached in ``_BAD_FIELDS_ACK_RESPONSES``), then strips
    the field from the source record and indexes the result into
    *index_to*.

    :param yesall: when true, skip the prompt and assume "yes".
    :returns: None
    """
    bad_field = _extract_bad_field(error['caused_by']['reason'])
    # Ask at most once per distinct field name; remember the answer.
    if bad_field not in _BAD_FIELDS_ACK_RESPONSES:
        _BAD_FIELDS_ACK_RESPONSES[bad_field] = yesall or click.confirm(
            'Do you want me to try to automatically fix bad field %s? I '
            'might delete it' % bad_field
        )
    if not _BAD_FIELDS_ACK_RESPONSES[bad_field]:
        return None
    fixed = _fix_bad_field(
        index=index_from,
        recid=recid,
        bad_field=bad_field,
        cli=cli,
    )
    cli.index(
        index=index_to,
        doc_type=fixed['_type'],
        body=fixed['_source'],
    )
    click.echo('Fixed record %s' % recid)
    return None
def _try_to_migrate(index_from, index_to, cli, recid, error, yesall=False):
    """Dispatch a failed record to a ``_handle_<error_type>`` fixer, if any.

    Asks the user once per error type whether this kind of error should
    be fixed at all (answers cached in ``_TRY_TO_FIX_RESPONSES``), then
    looks up a module-level handler named after the error type.

    :param error: dict with an ES-style ``caused_by`` entry
        (``{'caused_by': {'type': ..., 'reason': ...}}``).
    :param yesall: when true, skip the prompt and assume "yes".
    :returns: the handler's return value, or None when skipped/unknown.
    """
    err_type = error['caused_by']['type']
    if err_type not in _TRY_TO_FIX_RESPONSES:
        if yesall:
            _TRY_TO_FIX_RESPONSES[err_type] = True
        else:
            _TRY_TO_FIX_RESPONSES[err_type] = click.confirm(
                'Record %s has an error type %s, do you want me to try to fix '
                'this kind of errors?' % (recid, err_type)
            )
    if not _TRY_TO_FIX_RESPONSES[err_type]:
        return None
    fn_name = '_handle_' + err_type
    handler = globals().get(fn_name)
    if handler is None:
        # Was a Python 2 ``print`` statement; click.echo keeps the output
        # consistent with the rest of the tool and is Python 3 compatible.
        click.echo(
            "I don't know how to handle %s, skipping record %s" % (
                err_type,
                recid,
            )
        )
        return None
    return handler(
        index_from=index_from,
        index_to=index_to,
        cli=cli,
        recid=recid,
        error=error,
        yesall=yesall,
    )
@click.command()
@click.argument('index_from')
@click.argument('index_to')
@click.argument('recid')
@click.argument('error_type')
@click.argument('error_message')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def force_migrate_record(
    index_from, index_to, recid, error_type, error_message, connect_url,
):
    # Rebuild an ES-style error dict from the CLI arguments and run the
    # fixer pipeline non-interactively (yesall=True) for one record.
    es_client = Elasticsearch([connect_url], verify_certs=False)
    synthetic_error = {
        'caused_by': {
            'type': error_type,
            'reason': error_message,
        }
    }
    _try_to_migrate(
        index_from=index_from,
        index_to=index_to,
        cli=es_client,
        recid=recid,
        error=synthetic_error,
        yesall=True,
    )
@click.command()
@click.argument('index_from')
@click.argument('index_to')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
@click.option(
    '-k',
    '--chunk',
    help='Size of the chunks to use',
    default=500,
)
@click.option(
    '-a',
    '--autofix',
    # Bug fix: without is_flag=True click treats -a as an option that
    # *requires* a boolean value, while the help text describes a flag.
    is_flag=True,
    help='Try to fix any failed record after copying them over.',
    default=False,
)
def copy_index(index_from, index_to, connect_url, chunk, autofix):
    """Copy all documents from INDEX_FROM to INDEX_TO.

    Reports throughput, then either retries the failed records through
    the interactive fixers or dumps them to ``errors.json`` for later
    processing.
    """
    start_time = time.time()
    cli = Elasticsearch([connect_url], verify_certs=False)
    # raise_on_error=False so one bad record does not abort the whole
    # copy; reindex() then returns (n_copied, [failed bulk items]).
    ok_count, failed = reindex(
        client=cli,
        source_index=index_from,
        target_index=index_to,
        query=None,
        target_client=None,
        chunk_size=chunk,
        scroll='5m',
        bulk_kwargs={
            'raise_on_error': False,
            'stats_only': False,
        }
    )
    elapsed = time.time() - start_time
    click.echo('Reindexed %s records in %ds' % (ok_count, elapsed))
    # Guard: a tiny/empty index can finish in (rounded) zero seconds.
    if elapsed > 0:
        click.echo('%s records per second' % (ok_count / elapsed))
    click.echo('Failed records: %d' % len(failed))
    if failed and (
        autofix or click.confirm(
            'Do you want me to try to fix the failed records?'
        )
    ):
        for err_record in failed:
            _try_to_migrate(
                index_from=index_from,
                index_to=index_to,
                cli=cli,
                recid=err_record['index']['_id'],
                error=err_record['index']['error'],
                yesall=bool(autofix),
            )
    elif failed:
        # User declined the fix-up: persist the raw error items instead.
        with open('errors.json', 'w') as errors_fd:
            json.dump([err['index'] for err in failed], errors_fd)
        click.echo(
            'Written errors list in errors.json file (in case you want '
            'process them later).'
        )
@click.command()
@click.argument('name')
@click.option(
    '-m',
    '--mapping',
    default=None,
    help='Mapping file for the index.',
)
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def create_index(name, mapping, connect_url):
    # Create index NAME; when -m is given, the file's content is sent
    # verbatim as the index creation body (mappings/settings).
    es_client = Elasticsearch([connect_url], verify_certs=False)
    body = ''
    if mapping is not None:
        with open(mapping) as mapping_fd:
            body = mapping_fd.read()
    es_client.indices.create(index=name, body=body)
@click.command()
@click.argument('name')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def delete_index(name, connect_url):
    # Delete the index called NAME on the given server.
    es_client = Elasticsearch([connect_url], verify_certs=False)
    es_client.indices.delete(index=name)
@click.command()
@click.argument('name')
@click.option(
    '-m',
    '--mapping',
    default=None,
    help='Mapping file for the index.',
)
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
@click.option(
    '-a',
    '--autofix',
    help='Try to fix any failed record after copying them over.',
    default=False,
)
def remap(name, mapping, connect_url, autofix):
    """Re-create index NAME with a new mapping, preserving its data.

    The data is copied to a temporary index, NAME is dropped and
    re-created from the mapping file, and the data is copied back.
    Every destructive step asks for confirmation first.

    NOTE(review): ``autofix`` is accepted but currently unused here.
    """
    # Bug fix: --mapping defaulted to None and the original crashed on
    # open(None). Fail fast with a clear message, and read the mapping
    # before touching any index so a bad path cannot interrupt midway.
    if mapping is None:
        raise click.UsageError('A mapping file (-m/--mapping) is required.')
    with open(mapping) as mapping_fd:
        body = mapping_fd.read()
    cli = Elasticsearch([connect_url], verify_certs=False)
    tmp_index = 'remapping_tmp_' + name
    # Remember the aliases of the original index so they can be carried
    # over to the temporary index and to the re-created one.
    aliases = cli.indices.get_alias(
        index=name
    ).get(
        name, {}
    ).get(
        'aliases', {}
    ).keys()
    cli.indices.delete(index=tmp_index, ignore=[400, 404])
    cli.indices.create(index=tmp_index)
    for alias in aliases:
        cli.indices.put_alias(index=tmp_index, name=alias)
    click.echo(
        'Created temporary index, will start dumping the data from the old '
        'one'
    )
    click.confirm('Do you want to continue?', abort=True)
    reindex(
        client=cli,
        source_index=name,
        target_index=tmp_index,
        query=None,
        target_client=None,
        chunk_size=500,
        scroll='5m',
    )
    click.echo('Populated temporary index, will delete original index')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.delete(name)
    click.echo('Deleted original index, will recreate with the new mapping')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.create(
        index=name,
        body=body,
    )
    for alias in aliases:
        cli.indices.put_alias(index=name, name=alias)
    click.echo(
        'Recreated original index, will repopulate with the data from the '
        'temporary one'
    )
    click.confirm('Do you want to continue?', abort=True)
    reindex(
        client=cli,
        source_index=tmp_index,
        target_index=name,
        query=None,
        target_client=None,
        chunk_size=500,
        scroll='5m',
    )
    click.echo('Original index repopulated, will remove the temporary index')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.delete(tmp_index)
    click.echo('Done')
@click.group()
def cli_main():
    # Root click group; sub-commands are attached in main() before it
    # runs. (Deliberately no docstring: it would become --help text.)
    pass
def main():
    """Wire the sub-commands into the group and run the CLI."""
    # Route urllib3/ES TLS warnings through logging instead of stderr.
    logging.captureWarnings(True)
    for command in (
        create_index,
        copy_index,
        delete_index,
        remap,
        force_migrate_record,
    ):
        cli_main.add_command(command)
    cli_main()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment