Skip to content

Instantly share code, notes, and snippets.

@DDuarte
Created August 20, 2020 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DDuarte/9e93ee42a7e39b96c69d1d452ab18576 to your computer and use it in GitHub Desktop.
Shrink existing ElasticSearch indexes, keep index name (using aliases)
import argparse
import logging
import typing as t
from elasticsearch import Elasticsearch # type: ignore
from tqdm.auto import tqdm # type: ignore
# Loose alias for any JSON-serializable value returned by the Elasticsearch client.
JSON = t.Union[str, int, float, bool, None, t.Dict[str, t.Any], t.List[t.Any]]
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def setup_args() -> argparse.Namespace:
    """Parse and return the command-line arguments for the shrink script."""
    cli = argparse.ArgumentParser(
        description="Shrink existing indices.",
        epilog="https://www.elastic.co/guide/en/elasticsearch/reference/6.8/indices-shrink-index.html",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # positional arguments
    cli.add_argument("url", help="URL of the ElasticSearch cluster")
    cli.add_argument("node", help="name of node to temporarily relocate to")
    cli.add_argument("indices", nargs="+", help="an index to shrink, wildcards will be expanded")
    # shrink targets
    cli.add_argument("--shards", type=int, default=1, help="number of shards to shrink to")
    cli.add_argument("--replicas", type=int, default=1, help="number of replicas")
    # TLS / transport options
    cli.add_argument(
        "--ca-certs",
        default="/etc/ssl/certs/ca-certificates.crt",
        help="path to CA certificate bundle",
    )
    cli.add_argument(
        "--no-verify-certs",
        action="store_true",
        default=False,
        help="disable certificate verification",
    )
    cli.add_argument("--timeout", type=int, default=300, help="timeout for all requests")
    return cli.parse_args()
def setup_es(url: str, no_verify_certs: bool, ca_certs: str, timeout: int) -> Elasticsearch:
    """Build an Elasticsearch client for *url*.

    Certificate verification is on unless *no_verify_certs* is set; the CA
    bundle is only passed along when verification is actually enabled.
    """
    verify = not no_verify_certs
    return Elasticsearch(
        [url],
        verify_certs=verify,
        ca_certs=ca_certs if verify else None,
        timeout=timeout,
    )
def expand_indices(es: Elasticsearch, indices: t.List[str], shards: int) -> t.Set[str]:
    """Expand the given index patterns and keep the shrinkable ones.

    An index qualifies only when its current primary-shard count is greater
    than the target *shards* — the shrink API can only reduce shard count.

    :param es: client used to list indices via the cat API
    :param indices: index names or wildcard patterns
    :param shards: target number of primary shards
    :return: set of concrete index names eligible for shrinking
    """
    eligible: t.Set[str] = set()
    for pattern in indices:
        listing = es.cat.indices(pattern, h="index,pri").split("\n")
        for row in listing:
            if not row:
                continue  # cat output ends with a blank line
            name, primaries = row.split()
            if int(primaries) > shards:  # can only shrink to less shards
                eligible.add(name)
    return eligible
def route_block_index(es: Elasticsearch, index: str, node: str) -> JSON:
    """Pin every shard of *index* to *node* and block writes.

    Both settings are shrink-API preconditions: all primaries must live on a
    single node and the index must be read-only.
    """
    pinned = {
        "index.routing.allocation.require._name": node,
        "index.blocks.write": "true",
    }
    return es.indices.put_settings(
        {"settings": pinned},
        index=index,
        preserve_existing="true",
    )
def wait_for(
    es: Elasticsearch, wait_for_no_relocating_shards: t.Optional[bool] = None, wait_for_status: t.Optional[str] = None
) -> JSON:
    """Block until the cluster reaches the requested health condition.

    Retries forever on any error — connection resets are expected while shards
    relocate — but now logs each failure instead of swallowing it silently, so
    a persistent problem (bad URL, auth failure) is visible to the operator.

    :param es: client used to poll cluster health
    :param wait_for_no_relocating_shards: if set, wait until relocation settles
    :param wait_for_status: if set, wait for this cluster status (e.g. "green")
    :return: the cluster health response
    """
    kwargs: t.Dict[str, t.Any] = {}
    if wait_for_status is not None:
        kwargs["wait_for_status"] = wait_for_status
    if wait_for_no_relocating_shards is not None:
        # the REST API expects the string "true"/"false", not a Python bool
        kwargs["wait_for_no_relocating_shards"] = str(wait_for_no_relocating_shards).lower()
    while True:
        try:
            return es.cluster.health(**kwargs, timeout='300s', master_timeout='300s')
        except Exception:  # ConnectionReset and friends
            # previously a silent infinite retry; log so repeated failures show up
            logger.exception("cluster health request failed, retrying")
            continue
def shrink_index(es: Elasticsearch, index: str, new_index: str, shards: int, replicas: int) -> JSON:
    """Shrink *index* into *new_index* with the given shard/replica counts.

    Copies settings from the source, enables best_compression, and clears the
    temporary routing/write-block settings so the new index is writable and
    free to allocate anywhere.
    """
    target_settings = {
        "index.number_of_replicas": replicas,
        "index.number_of_shards": shards,
        "index.codec": "best_compression",
        # None removes the temporary settings applied before shrinking
        "index.routing.allocation.require._name": None,
        "index.blocks.write": None,
    }
    return es.indices.shrink(
        index,
        new_index,
        params={"copy_settings": "true"},
        body={"settings": target_settings},
        wait_for_active_shards=shards * replicas,
    )
def delete_index(es: Elasticsearch, index: str) -> JSON:
    """Delete *index* and return the API response."""
    return es.indices.delete(index=index)
def put_alias_index(es: Elasticsearch, index: str, alias: str) -> JSON:
    """Create *alias* pointing at *index* and return the API response."""
    return es.indices.put_alias(index=index, name=alias)
if __name__ == "__main__":
    args = setup_args()
    es = setup_es(args.url, args.no_verify_certs, args.ca_certs, args.timeout)
    indices = expand_indices(es, args.indices, args.shards)
    logger.info("Indices: %s", indices)
    pbar = tqdm(indices)
    for index in pbar:
        pbar.set_description(index)
        new_index = "shrink_" + index
        # Shrink preconditions: all primaries on one node, writes blocked.
        # The message used to hard-code "01"; report the actual target node.
        pbar.write(f"Routing to {args.node} and disabling writes")
        res = route_block_index(es, index, args.node)
        pbar.write(f"{res}")
        pbar.write("Waiting for relocation")
        res = wait_for(es, wait_for_no_relocating_shards=True)
        pbar.write(f"{res}")
        pbar.write("Shrinking index")
        res = shrink_index(es, index, new_index, args.shards, args.replicas)
        pbar.write(f"{res}")
        pbar.write("Waiting for status green")
        # docs say yellow is enough; wait for green since we are deleting old index right after
        res = wait_for(es, wait_for_status="green")
        pbar.write(f"{res}")
        pbar.write("Deleting old index")
        res = delete_index(es, index)
        pbar.write(f"{res}")
        # point the old name at the shrunken index so existing callers keep working
        pbar.write("Creating alias to old index name")
        res = put_alias_index(es, new_index, index)
        pbar.write(f"{res}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment