-
-
Save kaplun/73273e0afa044d992a68a2b5db212683 to your computer and use it in GitHub Desktop.
Small ES cli tool to remap an index
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import json | |
import re | |
import time | |
import logging | |
import click | |
from elasticsearch import Elasticsearch | |
from elasticsearch.helpers import reindex | |
# Default Elasticsearch node (host, port); only the host is used as the
# --connect-url default below.
DEFAULT_NODE = ('localhost', 9200)
# Extracts the offending field name from ES mapper error messages,
# e.g. "mapper [authors.full_name]" -> "authors.full_name".
_ERROR1_RE = re.compile(u'mapper \[(?P<field_name>[^]]+)\]')
# Per-field cache of the user's answer to "fix this bad field?" so each
# field name is asked about at most once per run.
_BAD_FIELDS_ACK_RESPONSES = {}
# Per-error-type cache of the user's answer to "try to fix this kind of
# error?" so each error type is asked about at most once per run.
_TRY_TO_FIX_RESPONSES = {}
def _extract_bad_field(error_str):
    """Pull the offending field name out of an ES mapper error message.

    :param error_str: the ``reason`` string from an ES error response.
    :returns: the field name captured by ``_ERROR1_RE``.
    :raises Exception: when the message does not match the known
        ``mapper [field.name]`` pattern.
    """
    found = _ERROR1_RE.search(error_str)
    if found is None:
        raise Exception('Unable to extract bad field from %s' % error_str)
    return found.groupdict().get('field_name')
def _fix_bad_field(index, recid, bad_field, cli): | |
keys = bad_field.split('.') | |
original_record = cli.get(index, recid) | |
cur_elem = original_record['_source'] | |
for key in keys[:-2]: | |
cur_elem = cur_elem[key] | |
cur_elem.pop(keys[-2]) | |
return original_record | |
def _handle_illegal_argument_exception(
    index_from, index_to, cli, recid, error, yesall=False,
):
    """Recover from an illegal_argument_exception by dropping the field the
    mapper rejected and re-indexing the cleaned record into ``index_to``.

    The per-field answer is cached in ``_BAD_FIELDS_ACK_RESPONSES`` so the
    user is prompted at most once per field name.
    """
    field_name = _extract_bad_field(error['caused_by']['reason'])
    if field_name not in _BAD_FIELDS_ACK_RESPONSES:
        answer = True if yesall else click.confirm(
            'Do you want me to try to automatically fix bad field %s? I '
            'might delete it' % field_name
        )
        _BAD_FIELDS_ACK_RESPONSES[field_name] = answer
    if not _BAD_FIELDS_ACK_RESPONSES[field_name]:
        return None
    fixed_record = _fix_bad_field(
        index=index_from,
        recid=recid,
        bad_field=field_name,
        cli=cli,
    )
    cli.index(
        index=index_to,
        doc_type=fixed_record['_type'],
        body=fixed_record['_source'],
    )
    click.echo('Fixed record %s' % recid)
    return None
def _try_to_migrate(index_from, index_to, cli, recid, error, yesall=False):
    """Dispatch a failed record to its ``_handle_<error type>`` fixer.

    The per-error-type answer is cached in ``_TRY_TO_FIX_RESPONSES`` so the
    user is prompted at most once per error type.

    :param recid: id of the record that failed to reindex.
    :param error: ES error dict; ``error['caused_by']['type']`` selects the
        handler function looked up in this module's globals.
    :param yesall: when True, assume "yes" instead of prompting.
    :returns: whatever the handler returns, or None when skipped.
    """
    err_type = error['caused_by']['type']
    if err_type not in _TRY_TO_FIX_RESPONSES:
        if yesall:
            _TRY_TO_FIX_RESPONSES[err_type] = True
        else:
            _TRY_TO_FIX_RESPONSES[err_type] = click.confirm(
                'Record %s has an error type %s, do you want me to try to fix '
                'this kind of errors?' % (recid, err_type)
            )
    if not _TRY_TO_FIX_RESPONSES[err_type]:
        return None
    # Handlers are discovered by naming convention, e.g.
    # _handle_illegal_argument_exception.
    handler = globals().get('_handle_' + err_type)
    if handler is None:
        # click.echo instead of the python2-only ``print`` statement, for
        # consistency with the rest of the file (and py3 compatibility).
        click.echo(
            "I don't know how to handle %s, skipping record %s" % (
                err_type,
                recid,
            )
        )
        return None
    return handler(
        index_from=index_from,
        index_to=index_to,
        cli=cli,
        recid=recid,
        error=error,
        yesall=yesall,
    )
@click.command()
@click.argument('index_from')
@click.argument('index_to')
@click.argument('recid')
@click.argument('error_type')
@click.argument('error_message')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def force_migrate_record(
    index_from, index_to, recid, error_type, error_message, connect_url,
):
    """Force the migration of one record given its reindexing error details."""
    cli = Elasticsearch([connect_url], verify_certs=False)
    # Rebuild the error structure ES would have reported for this record,
    # so the regular migration machinery can be reused.
    rebuilt_error = {
        'caused_by': {
            'type': error_type,
            'reason': error_message,
        }
    }
    _try_to_migrate(
        index_from=index_from,
        index_to=index_to,
        cli=cli,
        recid=recid,
        error=rebuilt_error,
        yesall=True,
    )
@click.command()
@click.argument('index_from')
@click.argument('index_to')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
@click.option(
    '-k',
    '--chunk',
    help='Size of the chunks to use',
    default=500,
)
@click.option(
    '-a',
    '--autofix',
    help='Try to fix any failed record after copying them over.',
    default=False,
)
def copy_index(index_from, index_to, connect_url, chunk, autofix):
    """Copy every record of ``index_from`` into ``index_to``.

    Failed records are either fixed (interactively, or automatically when
    --autofix is set) or dumped to ``errors.json`` for later processing.
    """
    # NOTE(review): --autofix takes a boolean *value* here; is_flag=True
    # would be more conventional but would change the CLI interface.
    started_at = time.time()
    cli = Elasticsearch([connect_url], verify_certs=False)
    copied, failures = reindex(
        client=cli,
        source_index=index_from,
        target_index=index_to,
        query=None,
        target_client=None,
        chunk_size=chunk,
        scroll='5m',
        bulk_kwargs={
            'raise_on_error': False,
            'stats_only': False,
        }
    )
    elapsed = time.time() - started_at
    click.echo(
        'Reindexed %s records in %ds' % (copied, elapsed)
    )
    click.echo(
        '%s records per second' % (copied / elapsed)
    )
    click.echo('Failed records: %d' % len(failures))
    if not failures:
        return
    if autofix or click.confirm(
        'Do you want me to try to fix the failed records?'
    ):
        for err_record in failures:
            _try_to_migrate(
                index_from=index_from,
                index_to=index_to,
                cli=cli,
                recid=err_record['index']['_id'],
                error=err_record['index']['error'],
                yesall=bool(autofix),
            )
    else:
        error_dump = json.dumps([err['index'] for err in failures])
        with open('errors.json', 'w') as errors_fd:
            errors_fd.write(error_dump)
        click.echo(
            'Written errors list in errors.json file (in case you want '
            'process them later).'
        )
@click.command()
@click.argument('name')
@click.option(
    '-m',
    '--mapping',
    default=None,
    help='Mapping file for the index.',
)
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def create_index(name, mapping, connect_url):
    """Create index ``name``, optionally using a mapping read from a file."""
    cli = Elasticsearch([connect_url], verify_certs=False)
    body = ''
    if mapping is not None:
        with open(mapping) as mapping_fd:
            body = mapping_fd.read()
    cli.indices.create(
        index=name,
        body=body,
    )
@click.command()
@click.argument('name')
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
def delete_index(name, connect_url):
    """Delete index ``name`` from the cluster."""
    client = Elasticsearch([connect_url], verify_certs=False)
    client.indices.delete(index=name)
@click.command()
@click.argument('name')
@click.option(
    '-m',
    '--mapping',
    default=None,
    help='Mapping file for the index.',
)
@click.option(
    '-c',
    '--connect-url',
    help='Server to connect to, in the form http://user:pass@server:port',
    default=DEFAULT_NODE[0],
)
@click.option(
    '-a',
    '--autofix',
    help='Try to fix any failed record after copying them over.',
    default=False,
)
def remap(name, mapping, connect_url, autofix):
    """Recreate index ``name`` with a new mapping, preserving its data.

    The data is copied to a temporary index, the original index is dropped
    and recreated with the new mapping, and the data is copied back.  Every
    destructive step asks for confirmation first; aliases are preserved.

    :raises click.UsageError: when no mapping file is given.
    """
    # NOTE(review): the --autofix option is accepted but currently unused
    # by this command.
    if mapping is None:
        # Fail fast with a clear message instead of the TypeError that
        # open(None) used to raise.
        raise click.UsageError('A mapping file is required (use -m).')
    # Read the mapping up front so a bad path aborts before any cluster
    # operation happens.
    with open(mapping) as mapping_fd:
        body = mapping_fd.read()
    cli = Elasticsearch([connect_url], verify_certs=False)
    tmp_index = 'remapping_tmp_' + name
    # list() so this also works on python3, where .keys() is a view.
    aliases = list(
        cli.indices.get_alias(
            index=name
        ).get(
            name, {}
        ).get(
            'aliases', {}
        ).keys()
    )
    cli.indices.delete(index=tmp_index, ignore=[400, 404])
    cli.indices.create(index=tmp_index)
    for alias in aliases:
        cli.indices.put_alias(index=tmp_index, name=alias)
    click.echo(
        'Created temporary index, will start dumping the data from the old '
        'one'
    )
    click.confirm('Do you want to continue?', abort=True)
    reindex(
        client=cli,
        source_index=name,
        target_index=tmp_index,
        query=None,
        target_client=None,
        chunk_size=500,
        scroll='5m',
    )
    click.echo('Populated temporary index, will delete original index')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.delete(name)
    click.echo('Deleted original index, will recreate with the new mapping')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.create(
        index=name,
        body=body,
    )
    for alias in aliases:
        cli.indices.put_alias(index=name, name=alias)
    click.echo(
        'Recreated original index, will repopulate with the data from the '
        'temporary one'
    )
    click.confirm('Do you want to continue?', abort=True)
    reindex(
        client=cli,
        source_index=tmp_index,
        target_index=name,
        query=None,
        target_client=None,
        chunk_size=500,
        scroll='5m',
    )
    click.echo('Original index repopulated, will remove the temporary index')
    click.confirm('Do you want to continue?', abort=True)
    cli.indices.delete(tmp_index)
    click.echo('Done')
@click.group()
def cli_main():
    """Entry point grouping all the index-management subcommands."""
    pass
def main():
    """Register every subcommand on the click group and run the CLI."""
    logging.captureWarnings(True)
    commands = (
        create_index,
        copy_index,
        delete_index,
        remap,
        force_migrate_record,
    )
    for command in commands:
        cli_main.add_command(command)
    cli_main()
# Run the CLI only when executed as a script (not when imported).
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment