Skip to content

Instantly share code, notes, and snippets.

@cosimo
Last active June 2, 2022 15:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cosimo/e0f93b4375ebff38472d4238882e6020 to your computer and use it in GitHub Desktop.
Save cosimo/e0f93b4375ebff38472d4238882e6020 to your computer and use it in GitHub Desktop.
Check if any elasticsearch index replica shards are out-of-sync compared to the relative primary (used to rectify some network partition-caused outdated shard problems)
#!/usr/bin/env python3
# encoding: utf-8
"""
Identify out-of-sync replica shards for an Elasticsearch index
--------------------------------------------------------------
This program will output a list of all shards for a given index, identifying
the ones that are out-of-sync with respect to the primary shard.
Usage:
curl -u"elastic:<password>" "http://localhost:9200/<index-name>/_stats?level=shards" \
| ./elasticsearch_out_of_sync_shards.py
Using the Elasticsearch index stats API endpoint (`/<index-name>/_stats?level=shards`)
it's possible to detect whether any of the replica shards have fallen out of sync.
This can happen in case of network partitions. This small utility was helpful
to me to quickly identify which replica shards needed resync.
A resync can be triggered while the cluster is online by using the [cluster
reroute](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/cluster-reroute.html#cluster-reroute)
API. For instance:
curl -u"elastic:<password>" "http://localhost:9200/_cluster/reroute?dry_run" \
-H "Content-type: application/json" \
--data-raw '{"commands":[{"cancel":{"index":"my-index", "shard":99, "node":"node-id"}}]}'
(Remove the `?dry_run` string to make the command effective)
Sample output:
$ ./elasticsearch_out_of_sync_shards.py < shards_stats.json
folders_v1:0:oyhesxZMRPiTaW0K_cfxog:P STARTED 15072236 15072236
folders_v1:0:zCiGRjfgQ8m-vDeKipXrhg:r STARTED 15072236 15072236 ✓
folders_v1:1:tpepCgDURnKPvmU6ZhFCbA:P STARTED 15083765 15083765
folders_v1:1:VjY5foFtRWWSY389kcG0nw:r STARTED 15083765 15083765 ✓
folders_v1:2:ufzD8kJRSTui0COU_cagWA:P STARTED 15090238 15090238
folders_v1:2:tpepCgDURnKPvmU6ZhFCbA:r STARTED 15090238 15090238 ✓
folders_v1:3:zCiGRjfgQ8m-vDeKipXrhg:P STARTED 15073510 15073510
folders_v1:3:tpepCgDURnKPvmU6ZhFCbA:r STARTED 15073510 15073510 ✓
folders_v1:4:zCiGRjfgQ8m-vDeKipXrhg:P STARTED 15099499 15099499
folders_v1:4:tpepCgDURnKPvmU6ZhFCbA:r STARTED 15099499 15099499 ✓
folders_v1:5:ufzD8kJRSTui0COU_cagWA:P STARTED 15062453 15062453
folders_v1:5:tpepCgDURnKPvmU6ZhFCbA:r STARTED 15062453 15062453 ✓
folders_v1:6:ufzD8kJRSTui0COU_cagWA:P STARTED 15070971 15070971
folders_v1:6:zCiGRjfgQ8m-vDeKipXrhg:r STARTED 15070971 15070971 ✓
folders_v1:7:oyhesxZMRPiTaW0K_cfxog:P STARTED 15098029 15098029
folders_v1:7:zCiGRjfgQ8m-vDeKipXrhg:r STARTED 15098029 15098029 ✓
folders_v1:8:oyhesxZMRPiTaW0K_cfxog:P STARTED 15089210 15089210
folders_v1:8:ufzD8kJRSTui0COU_cagWA:r STARTED 15089210 15089210 ✓
folders_v1:9:VjY5foFtRWWSY389kcG0nw:P STARTED 15067502 15067502
folders_v1:9:ufzD8kJRSTui0COU_cagWA:r STARTED 15067502 15067502 ✓
folders_v1:10:VjY5foFtRWWSY389kcG0nw:P STARTED 15086870 15086870
folders_v1:10:tpepCgDURnKPvmU6ZhFCbA:r STARTED 15086870 15086870 ✓
folders_v1:11:VjY5foFtRWWSY389kcG0nw:P STARTED 15075503 15075503
folders_v1:11:oyhesxZMRPiTaW0K_cfxog:r STARTED 15075503 15075503 ✓
folders_v1:12:VjY5foFtRWWSY389kcG0nw:P STARTED 15100723 15100723
folders_v1:12:oyhesxZMRPiTaW0K_cfxog:r STARTED 15100723 15100723 ✓
folders_v1:13:VjY5foFtRWWSY389kcG0nw:P STARTED 15069387 15069387
folders_v1:13:zCiGRjfgQ8m-vDeKipXrhg:r STARTED 15069387 15069387 ✓
folders_v1:14:oyhesxZMRPiTaW0K_cfxog:P STARTED 15061633 15061633
folders_v1:14:ufzD8kJRSTui0COU_cagWA:r STARTED 15061633 15061633 ✓
"""
import json
import sys
def parse_shards_data(index_stats: dict) -> dict:
    """
    Group primary/replica shard allocation records by shard number.

    Expects the already-parsed JSON document returned by the Elasticsearch
    index stats API endpoint (`/<index-name>/_stats?level=shards`).

    Returns a dict mapping shard number (a string, as found in the stats
    document) to the list of allocation records for that shard. Each record
    carries a human-readable "id" (index:shard:node:role), the index name,
    and the raw "routing" and "seq_no" sub-documents.
    """
    grouped: dict = {}
    for index_name, index_data in index_stats["indices"].items():
        for shard_num, allocations in index_data["shards"].items():
            for allocation in allocations:
                # e.g. {"state": "STARTED", "primary": false,
                #       "node": "zCiGRjfgQ8m-vDeKipXrhg",
                #       "relocating_node": null}
                routing = allocation["routing"]
                # e.g. {"max_seq_no": 89, "local_checkpoint": 89,
                #       "global_checkpoint": 89}
                seq_no = allocation["seq_no"]
                role = "P" if routing["primary"] else "r"
                record = {
                    "id": ":".join((index_name, shard_num, routing["node"], role)),
                    "index": index_name,
                    "routing": routing,
                    "seq_no": seq_no,
                }
                grouped.setdefault(shard_num, []).append(record)
    return grouped
def find_primary_shard(shards_list):
    """
    Return the primary shard record from *shards_list*, or None if absent.

    *shards_list* is one of the per-shard-number lists produced by
    `parse_shards_data()`.
    """
    return next((shard for shard in shards_list
                 if shard["routing"]["primary"]), None)
def verify_shards(shards: dict) -> None:
    """
    Print every shard allocation and flag out-of-sync replicas.

    For each shard number, prints one tab-separated line per allocation
    (primary first) with the record id, routing state, max_seq_no and
    global_checkpoint. Replica lines end with "✓" when both sequence
    numbers match the primary's, "x" otherwise; the primary line ends
    with a blank marker.

    Fixes over the original version:
    - A shard group without a primary (e.g. while the primary is
      unassigned) is reported and skipped instead of crashing with a
      TypeError on `None['seq_no']`.
    - The `assert max_seq_no > 0` checks were removed: they were stripped
      under `python -O`, and they wrongly rejected legitimate shards —
      an empty shard reports max_seq_no == -1 and a shard with a single
      operation reports 0.
    """
    def primary_first(allocation: dict) -> bool:
        # False sorts before True, so the primary is printed first.
        return not allocation["routing"]["primary"]

    for shard_num in shards:
        allocations = shards[shard_num]
        primary = next(
            (a for a in allocations if a["routing"]["primary"]), None)
        if primary is None:
            print(f"{shard_num}: no primary shard found, skipping")
            continue
        primary_max_seq_no = int(primary["seq_no"]["max_seq_no"])
        primary_global_checkpoint = int(primary["seq_no"]["global_checkpoint"])
        found_replicas = False
        for a in sorted(allocations, key=primary_first):
            state = a["routing"]["state"]
            is_primary = a["routing"]["primary"]
            max_seq_no = int(a["seq_no"]["max_seq_no"])
            global_checkpoint = int(a["seq_no"]["global_checkpoint"])
            if is_primary:
                # In-sync is only meaningful for replicas.
                in_sync = None
            else:
                in_sync = (max_seq_no == primary_max_seq_no
                           and global_checkpoint == primary_global_checkpoint)
                found_replicas = True
            print(f"{a['id']}\t{state}\t{max_seq_no}\t{global_checkpoint}"
                  f"\t{' ' if in_sync is None else '✓' if in_sync else 'x'}")
        if not found_replicas:
            print(" ^ (no replicas?)")
if __name__ == "__main__":
index_stats = json.load(sys.stdin)
if "indices" not in index_stats:
raise ValueError("Trying to read index stats, but found no 'indices' key")
shards: dict = parse_shards_data(index_stats)
verify_shards(shards)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment