Skip to content

Instantly share code, notes, and snippets.

@seut
Last active July 2, 2018 16:15
Show Gist options
  • Save seut/988a74c98fe9d44f33e228f4a5d500de to your computer and use it in GitHub Desktop.
Save seut/988a74c98fe9d44f33e228f4a5d500de to your computer and use it in GitHub Desktop.
crate health check script
#!/usr/bin/env python
import sys
from typing import Optional

from crate.client.connection import Connection
from crate.client.cursor import Cursor
from crate.client.exceptions import ProgrammingError
# Shard counts from sys.shards, grouped by table, schema, partition,
# routing state, primary flag and relocating node.
STMT_HEALTH = (
    'select table_name, schema_name, partition_ident, routing_state,'
    '"primary", relocating_node, count(*) from sys.shards'
    ' group by 1, 2, 3, 4, 5, 6'
)

# number_of_shards of every non-partitioned table outside the listed
# system schemas.
STMT_TABLE_CONFIG = (
    "select table_name, table_schema, number_of_shards from information_schema.tables"
    " where table_schema not in('sys', 'information_schema', 'pg_catalog') and partitioned_by is null"
)

# number_of_shards of every table partition outside the listed system schemas.
STMT_PARTITION_CONFIG = (
    "select table_name, schema_name, partition_ident, number_of_shards"
    " from information_schema.table_partitions"
    " where schema_name not in('sys', 'information_schema', 'pg_catalog')"
)
class ShardsInfo(object):
    """Shard counters accumulated for one table or table partition.

    All counters are class-level defaults of zero. Because ints are
    immutable, ``+=`` on an instance rebinds the attribute on that instance
    only and leaves the class default untouched.
    """

    # Primary shards whose routing state counts as active.
    activePrimaries: int = 0
    # All primary shards, regardless of routing state.
    primaries: int = 0
    # Shards reported with routing state UNASSIGNED.
    unassigned: int = 0
    # Shards reported as INITIALIZING without a relocating node.
    replicating: int = 0
class TablePartitionIdent(object):
    """Hashable identity of a table or of one table partition.

    Instances are used as dictionary keys, so equality and hashing are both
    derived from the same ``(table_name, table_schema, partition_ident)``
    tuple.

    Args:
        table_name: Name of the table.
        table_schema: Schema the table lives in.
        partition_ident: Partition identifier; callers in this file pass an
            empty string for non-partitioned tables. ``None`` is accepted
            and compares unequal to the empty string.
    """

    def __init__(self, table_name: str, table_schema: str, partition_ident: Optional[str]):
        self.table_name = table_name
        self.table_schema = table_schema
        self.partition_ident = partition_ident

    def _key(self) -> tuple:
        """Return the tuple that defines this object's identity."""
        return (self.table_name, self.table_schema, self.partition_ident)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, TablePartitionIdent):
            # Let Python try the reflected comparison; ``==`` against an
            # unrelated type still evaluates to False.
            return NotImplemented
        return self._key() == o._key()

    def __hash__(self) -> int:
        # Hash the tuple rather than a concatenated string so that field
        # boundaries matter: ("ab", "c") and ("a", "bc") no longer collide.
        return hash(self._key())

    def __repr__(self) -> str:
        return (f"{type(self).__name__}({self.table_name!r}, "
                f"{self.table_schema!r}, {self.partition_ident!r})")
def is_active_shard(routing_state: str) -> bool:
    """Tell whether a shard routing state counts as active.

    The comparison is case-insensitive; STARTED and RELOCATING are the two
    active states.
    """
    return routing_state.upper() in ("STARTED", "RELOCATING")
def collect_shard_info(shards_info: ShardsInfo,
                       routing_state: str,
                       primary: bool,
                       shard_count: int,
                       relocating_node: Optional[str]) -> None:
    """Add one grouped shard row to the counters in *shards_info*.

    Args:
        shards_info: Counter object updated in place.
        routing_state: Routing state of the shards in this group; compared
            case-insensitively.
        primary: Whether the shards in this group are primaries.
        shard_count: Number of shards in this group.
        relocating_node: Relocating node of the shards, or None when there
            is none.

    Primaries are always added to ``primaries``. After that exactly one of
    the following applies, in this order: active primaries are added to
    ``activePrimaries``; UNASSIGNED shards to ``unassigned``; INITIALIZING
    shards without a relocating node to ``replicating``. Any other
    combination (for example an active replica) changes nothing further.
    """
    state = routing_state.upper()

    if primary:
        shards_info.primaries += shard_count

    if primary and is_active_shard(routing_state):
        shards_info.activePrimaries += shard_count
    elif state == "UNASSIGNED":
        shards_info.unassigned += shard_count
    elif state == "INITIALIZING" and relocating_node is None:
        shards_info.replicating += shard_count
def execute(cursor: Cursor, statement: str) -> bool:
    """Run *statement* on *cursor* and report failures on stderr.

    Args:
        cursor: Cursor the statement is executed on.
        statement: SQL statement text.

    Returns:
        True when the statement executed, False when a connection error or a
        ProgrammingError was caught and reported. Other exceptions propagate.
    """
    # NOTE(review): the name ``ConnectionError`` at module level is Python's
    # builtin, because only ProgrammingError is imported from the client.
    # crate-python ships its own ConnectionError class; it is imported here
    # under an alias so both are handled. Confirm the class hierarchy against
    # the installed client version.
    from crate.client.exceptions import ConnectionError as CrateConnectionError

    try:
        cursor.execute(statement)
    except (ConnectionError, CrateConnectionError):
        # NOTE(review): this hint mentions a ``\connect`` command, which this
        # script does not offer (servers come from argv) - consider rewording.
        print('Use \\connect <server> to connect to one or more servers first.', file=sys.stderr)
    except ProgrammingError as e:
        # ``message`` may be missing on the exception; fall back to str(e)
        # instead of raising AttributeError inside the handler.
        print(getattr(e, "message", None) or str(e), file=sys.stderr)
    else:
        return True
    return False
def process_shards_result(result: list) -> dict:
    """Fold grouped shard rows into one ShardsInfo per table/partition.

    Each row is indexed as: 0 table name, 1 schema name, 2 partition ident,
    3 routing state, 4 primary flag, 5 relocating node, 6 shard count.

    Returns:
        A dict mapping TablePartitionIdent to its accumulated ShardsInfo.
    """
    info_by_ident = {}
    for row in result:
        key = TablePartitionIdent(row[0], row[1], row[2])
        # Reuse the counters already collected for this table/partition,
        # or start from a fresh zeroed ShardsInfo.
        info = info_by_ident.setdefault(key, ShardsInfo())
        collect_shard_info(
            info,
            routing_state=row[3],
            primary=row[4],
            relocating_node=row[5],
            shard_count=row[6],
        )
    return info_by_ident
def process_tables_result(result: list, tables: dict) -> None:
    """Record the configured shard count of each non-partitioned table.

    Every row is indexed as: 0 table name, 1 table schema, 2 number of
    shards. The entry is stored in *tables* under a TablePartitionIdent with
    an empty partition ident.
    """
    tables.update(
        (TablePartitionIdent(row[0], row[1], ''), row[2])
        for row in result
    )
def process_partitions_result(result: list, tables: dict) -> None:
    """Record the configured shard count of each table partition.

    Every row is indexed as: 0 table name, 1 schema name, 2 partition ident,
    3 number of shards. The entry is stored in *tables* under the matching
    TablePartitionIdent.
    """
    tables.update(
        (TablePartitionIdent(row[0], row[1], row[2]), row[3])
        for row in result
    )
def print_tables_health(tables: dict, tables_config: dict) -> None:
    """Print one semicolon-separated health line per table/partition.

    Args:
        tables: Maps an ident (with ``table_name``, ``table_schema`` and
            ``partition_ident`` attributes) to its shard counters
            (``activePrimaries``, ``primaries``, ``unassigned``,
            ``replicating``).
        tables_config: Maps the same idents to their configured number of
            shards.

    Health is RED when primaries are missing, YELLOW when only
    under-replicated shards exist, GREEN otherwise.
    """
    print("table_name; table_schema; partition_ident; health; missing_shards; underreplicated_shards")
    for ident, shards_info in tables.items():
        configured_shards = tables_config.get(ident)
        if configured_shards is None:
            # No configuration entry for this ident (e.g. the table changed
            # between the queries). Fall back to the primaries seen in the
            # shard rows so the row is still reported instead of crashing
            # with a TypeError on ``None - int``.
            configured_shards = shards_info.primaries

        missing = max(0, configured_shards - shards_info.activePrimaries)
        # Shards already counted as missing primaries are not reported a
        # second time as under-replicated.
        underreplicated = max(0, shards_info.unassigned + shards_info.replicating - missing)

        if missing > 0:
            health = "RED"
        elif underreplicated > 0:
            health = "YELLOW"
        else:
            health = "GREEN"

        print(f"{ident.table_name}; {ident.table_schema}; {ident.partition_ident};"
              f" {health}; {missing}; {underreplicated}")
def _fetch_all(cursor, statement):
    """Execute *statement* and return all rows; exit with status 1 on failure."""
    if not execute(cursor, statement):
        # execute() has already reported the error on stderr. Fetching from
        # a cursor whose statement failed would only add a second error.
        sys.exit(1)
    return cursor.fetchall()


def main():
    """Print the health of all tables and partitions of a cluster.

    The optional first command-line argument is handed to ``Connection`` as
    the servers value; without it ``None`` is passed. The process exits with
    status 1 when one of the statements cannot be executed.
    """
    servers = sys.argv[1] if len(sys.argv) > 1 else None
    connection = Connection(servers)
    try:
        cursor = connection.cursor()
        try:
            tables_config = dict()
            process_tables_result(_fetch_all(cursor, STMT_TABLE_CONFIG), tables_config)
            process_partitions_result(_fetch_all(cursor, STMT_PARTITION_CONFIG), tables_config)
            shard_rows = _fetch_all(cursor, STMT_HEALTH)
            print_tables_health(process_shards_result(shard_rows), tables_config)
        finally:
            # Release the cursor even when a query or the report fails.
            cursor.close()
    finally:
        connection.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment