Created
August 20, 2020 17:12
-
-
Save DDuarte/9e93ee42a7e39b96c69d1d452ab18576 to your computer and use it in GitHub Desktop.
Shrink existing Elasticsearch indices while keeping the original index names (via aliases)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse
import logging
import time
import typing as t

from elasticsearch import Elasticsearch  # type: ignore
from tqdm.auto import tqdm  # type: ignore
# Loose alias for any value that can appear in a decoded JSON document.
JSON = t.Union[
    str,
    int,
    float,
    bool,
    None,
    t.Dict[str, t.Any],
    t.List[t.Any],
]

# Configure the root logger at INFO level; the module logger is named after
# this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_args(argv: t.Optional[t.Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the shrink script.

    Args:
        argv: Argument list to parse, without the program name. ``None``
            (the default) lets argparse read ``sys.argv[1:]``, which is what
            the script does when run from the shell; passing a list allows
            the parser to be driven programmatically.

    Returns:
        Namespace with ``url``, ``node``, ``indices``, ``shards``,
        ``replicas``, ``ca_certs``, ``no_verify_certs`` and ``timeout``.

    Raises:
        SystemExit: argparse prints a usage message and exits when arguments
            are missing or malformed, when ``--shards`` is below 1 or when
            ``--replicas`` is negative.
    """
    parser = argparse.ArgumentParser(
        description="Shrink existing indices.",
        epilog="https://www.elastic.co/guide/en/elasticsearch/reference/6.8/indices-shrink-index.html",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("url", help="URL of the ElasticSearch cluster")
    parser.add_argument("node", help="name of node to temporarily relocate to")
    parser.add_argument("indices", nargs="+", help="an index to shrink, wildcards will be expanded")
    parser.add_argument("--shards", type=int, default=1, help="number of shards to shrink to")
    parser.add_argument("--replicas", type=int, default=1, help="number of replicas")
    parser.add_argument(
        "--ca-certs",
        default="/etc/ssl/certs/ca-certificates.crt",
        help="path to CA certificate bundle",
    )
    parser.add_argument(
        "--no-verify-certs",
        action="store_true",
        default=False,
        help="disable certificate verification",
    )
    parser.add_argument("--timeout", type=int, default=300, help="timeout for all requests")

    args = parser.parse_args(argv)

    # Reject values that can never produce a valid target index before any
    # request is sent to the cluster.
    if args.shards < 1:
        parser.error("--shards must be at least 1")
    if args.replicas < 0:
        parser.error("--replicas must not be negative")
    return args
def setup_es(url: str, no_verify_certs: bool, ca_certs: str, timeout: int) -> Elasticsearch:
    """Build an Elasticsearch client for a single cluster URL.

    Certificate verification is enabled unless ``no_verify_certs`` is set;
    when it is disabled the CA bundle path is not handed to the client either.
    ``timeout`` is passed through to the client unchanged.
    """
    verify = not no_verify_certs
    bundle = ca_certs if verify else None
    return Elasticsearch([url], verify_certs=verify, ca_certs=bundle, timeout=timeout)
def expand_indices(es: Elasticsearch, indices: t.List[str], shards: int) -> t.Set[str]:
    """Resolve index patterns to the concrete indices that can be shrunk.

    Each pattern is expanded through the ``_cat/indices`` API (columns
    ``index`` and ``pri``). An index is kept only when it has more primary
    shards than ``shards`` and its primary count is a multiple of ``shards``;
    the shrink API requires the target shard count to be a factor of the
    source shard count, so other indices would fail after already having
    been relocated and write-blocked.

    Args:
        es: Client used to query ``_cat/indices``.
        indices: Index names or wildcard patterns.
        shards: Target number of primary shards; must be at least 1.

    Returns:
        Set of concrete index names eligible for shrinking.

    Raises:
        ValueError: If ``shards`` is smaller than 1.
    """
    if shards < 1:
        raise ValueError(f"shards must be a positive integer, got {shards}")

    log = logging.getLogger(__name__)
    shrinkable: t.Set[str] = set()
    for pattern in indices:
        for row in es.cat.indices(pattern, h="index,pri").splitlines():
            columns = row.split()
            if len(columns) != 2 or not columns[1].isdigit():
                # Blank line, or a row without a usable primary-shard count
                # (presumably how some versions list closed indices - TODO
                # confirm); nothing to shrink either way.
                continue
            name, primaries = columns[0], int(columns[1])
            if primaries <= shards:  # can only shrink to fewer shards
                continue
            if primaries % shards:
                log.warning(
                    "Skipping %s: %d primary shards cannot be shrunk to %d (target must be a factor)",
                    name,
                    primaries,
                    shards,
                )
                continue
            shrinkable.add(name)
    return shrinkable
def route_block_index(es: Elasticsearch, index: str, node: str) -> JSON:
    """Require the shards of ``index`` to live on ``node`` and block writes.

    Both settings are applied in a single update-settings request and the
    client's response is returned.

    NOTE(review): the request is sent with ``preserve_existing="true"``; per
    the Elasticsearch update-settings documentation this leaves settings that
    already have a value untouched - confirm that is intended for indices that
    already carry an allocation requirement or write-block setting.
    """
    settings = {
        "index.routing.allocation.require._name": node,
        "index.blocks.write": "true",
    }
    return es.indices.put_settings({"settings": settings}, index=index, preserve_existing="true")
def wait_for(
    es: Elasticsearch,
    wait_for_no_relocating_shards: t.Optional[bool] = None,
    wait_for_status: t.Optional[str] = None,
    *,
    retry_delay: float = 1.0,
    max_retries: t.Optional[int] = None,
) -> JSON:
    """Block until the cluster health request reports the requested state.

    The health API is called with a 300 second ``timeout`` and
    ``master_timeout``. A failing request (connection reset, client timeout,
    ...) is logged and retried after a short pause rather than being
    swallowed silently in a tight loop.

    Args:
        es: Client used to call the cluster health API.
        wait_for_no_relocating_shards: When not ``None``, forwarded as the
            lower-case string ``"true"`` / ``"false"``.
        wait_for_status: When not ``None``, forwarded as the health status to
            wait for (for example ``"green"``).
        retry_delay: Seconds to sleep between failed attempts.
        max_retries: Maximum number of retries after the first failed
            attempt. ``None`` (the default) retries indefinitely, which is
            the historical behaviour.

    Returns:
        The response of the first successful health request.

    Raises:
        Exception: The last error raised by the client, once ``max_retries``
            retries have been used up.
    """
    kwargs: t.Dict[str, t.Any] = {}
    if wait_for_status is not None:
        kwargs["wait_for_status"] = wait_for_status
    if wait_for_no_relocating_shards is not None:
        kwargs["wait_for_no_relocating_shards"] = str(wait_for_no_relocating_shards).lower()

    log = logging.getLogger(__name__)
    failures = 0
    while True:
        try:
            return es.cluster.health(**kwargs, timeout="300s", master_timeout="300s")
        except Exception as exc:  # ConnectionReset and friends
            failures += 1
            if max_retries is not None and failures > max_retries:
                raise
            log.warning("Cluster health request failed (attempt %d), retrying: %r", failures, exc)
            if retry_delay > 0:
                # Avoid hammering a cluster that is already struggling.
                time.sleep(retry_delay)
def shrink_index(es: Elasticsearch, index: str, new_index: str, shards: int, replicas: int) -> JSON:
    """Shrink ``index`` into ``new_index`` with the given shard layout.

    The source settings are copied (``copy_settings``), after which the
    target gets the requested shard/replica counts and the
    ``best_compression`` codec. The allocation requirement and the write
    block are set to ``None`` (JSON ``null``) so the values copied from the
    source index are cleared on the target.

    Args:
        es: Client used to call the shrink API.
        index: Name of the source index.
        new_index: Name of the target index to create.
        shards: Number of primary shards of the target index.
        replicas: Number of replicas of the target index.

    Returns:
        The response of the shrink request.
    """
    target_settings = {
        "index.number_of_replicas": replicas,
        "index.number_of_shards": shards,
        "index.codec": "best_compression",
        "index.routing.allocation.require._name": None,
        "index.blocks.write": None,
    }
    return es.indices.shrink(
        index,
        new_index,
        params={"copy_settings": "true"},
        body={"settings": target_settings},
        # ``wait_for_active_shards`` counts copies *per shard* and may not
        # exceed ``number_of_replicas + 1``. The former ``shards * replicas``
        # was 0 for replica-less targets and larger than the allowed maximum
        # for e.g. 4 shards / 1 replica. Wait for every copy instead.
        wait_for_active_shards=replicas + 1,
    )
def delete_index(es: Elasticsearch, index: str) -> JSON:
    """Delete ``index`` and hand back the client's response."""
    response = es.indices.delete(index=index)
    return response
def put_alias_index(es: Elasticsearch, index: str, alias: str) -> JSON:
    """Create the alias ``alias`` pointing at ``index`` and return the response."""
    response = es.indices.put_alias(index=index, name=alias)
    return response
def main() -> None:
    """Shrink every matching index and re-expose it under its old name.

    For each index returned by ``expand_indices`` the steps are, in order:
    route all shards to the chosen node and block writes, wait until no
    shards are relocating, shrink into ``shrink_<index>``, wait for a green
    cluster, delete the original index and finally create an alias with the
    original name that points at the shrunken index. Every response is
    written through the progress bar so it does not break the bar rendering.
    """
    args = setup_args()
    es = setup_es(args.url, args.no_verify_certs, args.ca_certs, args.timeout)

    indices = expand_indices(es, args.indices, args.shards)
    logger.info("Indices: %s", indices)

    pbar = tqdm(indices)
    for index in pbar:
        pbar.set_description(index)
        new_index = "shrink_" + index

        # Report the node actually passed on the command line; the message
        # used to hard-code "01".
        pbar.write(f"Routing to {args.node} and disabling writes")
        res = route_block_index(es, index, args.node)
        pbar.write(f"{res}")

        pbar.write("Waiting for relocation")
        res = wait_for(es, wait_for_no_relocating_shards=True)
        pbar.write(f"{res}")

        pbar.write("Shrinking index")
        res = shrink_index(es, index, new_index, args.shards, args.replicas)
        pbar.write(f"{res}")

        pbar.write("Waiting for status green")
        # docs say yellow is enough; wait for green since we are deleting old index right after
        res = wait_for(es, wait_for_status="green")
        pbar.write(f"{res}")

        pbar.write("Deleting old index")
        res = delete_index(es, index)
        pbar.write(f"{res}")

        pbar.write("Creating alias to old index name")
        res = put_alias_index(es, new_index, index)
        pbar.write(f"{res}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment