Last active
July 2, 2018 16:15
-
-
Save seut/988a74c98fe9d44f33e228f4a5d500de to your computer and use it in GitHub Desktop.
crate health check script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys
from typing import Optional

from crate.client.connection import Connection
from crate.client.cursor import Cursor
from crate.client.exceptions import ConnectionError, ProgrammingError
# Shard counts from sys.shards, grouped by table, schema, partition,
# routing state, primary flag and relocating node.
STMT_HEALTH = (
    'select table_name, schema_name, partition_ident, routing_state,'
    '"primary", relocating_node, count(*) from sys.shards'
    ' group by 1, 2, 3, 4, 5, 6'
)

# Configured shard count of every non-partitioned user table.
STMT_TABLE_CONFIG = (
    "select table_name, table_schema, number_of_shards from information_schema.tables"
    " where table_schema not in('sys', 'information_schema', 'pg_catalog') and partitioned_by is null"
)

# Configured shard count of every partition of the partitioned user tables.
STMT_PARTITION_CONFIG = (
    "select table_name, schema_name, partition_ident, number_of_shards"
    " from information_schema.table_partitions"
    " where schema_name not in('sys', 'information_schema', 'pg_catalog')"
)
class ShardsInfo:
    """Running shard counters for a single table or partition.

    All counters start at zero as class-level defaults; ``+=`` on an
    instance rebinds the attribute on that instance only.
    """

    # Primary shards whose routing state is STARTED or RELOCATING.
    activePrimaries = 0
    # All primary shards, whatever their routing state.
    primaries = 0
    # Shards in routing state UNASSIGNED (that were not counted as active primaries).
    unassigned = 0
    # Shards in routing state INITIALIZING that have no relocating node.
    replicating = 0
class TablePartitionIdent(object):
    """Hashable key identifying a table, or one partition of a table.

    Two identifiers are equal when table name, schema and partition
    identifier are all equal, which makes instances usable as dict keys.
    """

    def __init__(self, table_name: str, table_schema: str, partition_ident: Optional[str]):
        self.table_name = table_name
        self.table_schema = table_schema
        self.partition_ident = partition_ident

    def _key(self) -> tuple:
        """Return the tuple that equality and hashing are based on."""
        return (self.table_name, self.table_schema, self.partition_ident)

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, TablePartitionIdent):
            # Let Python try the reflected comparison; ``==`` against a
            # foreign type still evaluates to False in the end.
            return NotImplemented
        return self._key() == o._key()

    def __hash__(self) -> int:
        # Hash the tuple rather than the concatenated strings, so that
        # ("ab", "c") and ("a", "bc") do not systematically collide.
        return hash(self._key())

    def __repr__(self) -> str:
        return (f"{type(self).__name__}({self.table_name!r}, "
                f"{self.table_schema!r}, {self.partition_ident!r})")
def is_active_shard(routing_state: str) -> bool:
    """Return True when *routing_state* is STARTED or RELOCATING (case-insensitive)."""
    return routing_state.upper() in ("STARTED", "RELOCATING")
def collect_shard_info(shards_info: ShardsInfo, | |
routing_state: str, | |
primary: str, | |
shard_count: str, | |
relocating_node: str) -> None: | |
if primary is True: | |
shards_info.primaries += shard_count | |
if is_active_shard(routing_state) is True and primary is True: | |
shards_info.activePrimaries += shard_count | |
elif routing_state.upper() == "UNASSIGNED": | |
shards_info.unassigned += shard_count | |
elif routing_state.upper() == "INITIALIZING" and relocating_node is None: | |
shards_info.replicating += shard_count | |
def execute(cursor: "Cursor", statement: str) -> bool:
    """Run *statement* on *cursor* and report whether it succeeded.

    Connection and programming errors are reported on stderr instead of
    being raised, so callers must check the return value before fetching.

    :param cursor: cursor to execute the statement on
    :param statement: SQL statement to run
    :return: True on success, False when an error was reported
    """
    try:
        cursor.execute(statement)
    except ConnectionError:
        # NOTE(review): this must resolve to the crate client's
        # ConnectionError imported at the top of the file; the builtin of
        # the same name is not what the client raises -- confirm.
        print('Could not connect to the CrateDB server(s);'
              ' pass the server address as the first argument.', file=sys.stderr)
    except ProgrammingError as e:
        # Fall back to str(e) in case the exception carries no usable
        # ``message`` attribute.
        print(getattr(e, "message", None) or str(e), file=sys.stderr)
    else:
        return True
    return False
def process_shards_result(result: list) -> dict:
    """Fold grouped shard rows into one ShardsInfo per table/partition.

    Each row is indexed as (table_name, schema_name, partition_ident,
    routing_state, primary, relocating_node, count).

    :return: dict mapping TablePartitionIdent to its accumulated ShardsInfo
    """
    info_by_ident = {}
    for row in result:
        key = TablePartitionIdent(row[0], row[1], row[2])
        info = info_by_ident.get(key)
        if info is None:
            # First row seen for this table/partition.
            info = ShardsInfo()
        collect_shard_info(info, row[3], row[4], row[6], row[5])
        info_by_ident[key] = info
    return info_by_ident
def process_tables_result(result: list, tables: dict) -> None:
    """Record the shard count of each table row in *tables*.

    Rows are indexed as (table_name, table_schema, number_of_shards); the
    key uses an empty string as partition identifier.
    """
    tables.update(
        (TablePartitionIdent(row[0], row[1], ''), row[2]) for row in result
    )
def process_partitions_result(result: list, tables: dict) -> None:
    """Record the shard count of each partition row in *tables*.

    Rows are indexed as (table_name, schema_name, partition_ident,
    number_of_shards).
    """
    tables.update(
        (TablePartitionIdent(row[0], row[1], row[2]), row[3]) for row in result
    )
def print_tables_health(tables: dict, tables_config: dict) -> None:
    """Print one semicolon-separated health line per table/partition.

    :param tables: maps an identifier (with ``table_name``, ``table_schema``
        and ``partition_ident`` attributes) to its shard counters
    :param tables_config: maps the same identifiers to the configured
        number of shards

    Health is RED when configured shards lack an active primary, YELLOW
    when shards are unassigned or still replicating, GREEN otherwise.
    """
    print("table_name; table_schema; partition_ident; health; missing_shards; underreplicated_shards")
    for ident, shards_info in tables.items():
        configured_shards = tables_config.get(ident)
        if configured_shards is None:
            # No configuration row for this table/partition: fall back to
            # the observed primary count instead of failing on None - int.
            # NOTE(review): assumes the observed primaries approximate the
            # configured shard count -- confirm.
            configured_shards = shards_info.primaries
        missing = max(0, configured_shards - shards_info.activePrimaries)
        # Missing primaries are reported separately, so subtract them here.
        underreplicated = max(0, shards_info.unassigned + shards_info.replicating - missing)
        if missing > 0:
            health = "RED"
        elif underreplicated > 0:
            health = "YELLOW"
        else:
            health = "GREEN"
        print(f"{ident.table_name}; {ident.table_schema}; {ident.partition_ident};"
              f" {health}; {missing}; {underreplicated}")
def _fetch_rows(cursor, statement):
    """Execute *statement* and return all rows; exit with status 1 on failure."""
    if not execute(cursor, statement):
        # execute() has already reported the error on stderr.
        sys.exit(1)
    return cursor.fetchall()


def main():
    """Query the cluster and print the health of every table and partition.

    The optional first command-line argument is passed to ``Connection`` as
    the server(s) to connect to; without it ``None`` is passed. The process
    exits with status 1 as soon as one of the statements fails.
    """
    servers = sys.argv[1] if len(sys.argv) > 1 else None

    connection = Connection(servers)
    try:
        cursor = connection.cursor()
        try:
            tables_config = dict()
            process_tables_result(_fetch_rows(cursor, STMT_TABLE_CONFIG), tables_config)
            process_partitions_result(_fetch_rows(cursor, STMT_PARTITION_CONFIG), tables_config)
            shard_rows = _fetch_rows(cursor, STMT_HEALTH)
            print_tables_health(process_shards_result(shard_rows), tables_config)
        finally:
            # Release the cursor even when a statement fails or we exit early.
            cursor.close()
    finally:
        connection.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment