Last active
March 28, 2023 08:36
-
-
Save vagetablechicken/f6f0a6a6ec0a4b18b0547dff21ad3618 to your computer and use it in GitHub Desktop.
template fix to inspect offline ls
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright 2021 4Paradigm | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
import argparse | |
import textwrap | |
from diagnostic_tool.connector import Connector | |
from diagnostic_tool.dist_conf import read_conf | |
from diagnostic_tool.conf_validator import ( | |
DistConfValidator, | |
ClusterConfValidator, | |
) | |
from diagnostic_tool.log_analyzer import LogAnalyzer | |
from diagnostic_tool.collector import Collector | |
import diagnostic_tool.server_checker as checker | |
from absl import app | |
from absl import flags | |
from absl.flags import argparse_flags | |
from absl import logging # --verbosity --log_dir | |
# only some sub cmd needs dist file | |
flags.DEFINE_string( | |
"conf_file", | |
"", | |
"Cluster config file, supports two styles: yaml and hosts.", | |
short_name="f", | |
) | |
flags.DEFINE_bool( | |
"local", | |
False, | |
"If set, all server in config file will be treated as local server, skip ssh.", | |
) | |
flags.DEFINE_string("collect_dir", "/tmp/diag_collect", "...") | |
def check_version(version_map: dict): | |
# cluster must have nameserver, so we use nameserver version to be the right version | |
version = version_map["nameserver"][0][1] | |
flag = True | |
for role, servers in version_map.items(): | |
for endpoint, cur_version in servers: | |
if cur_version != version: | |
logging.warning( | |
f"version mismatch. {role} {endpoint} version {cur_version} != {version}" | |
) | |
flag = False | |
return version, flag | |
def status(args): | |
"""use OpenMLDB Python SDK to connect OpenMLDB""" | |
conn = Connector() | |
status_checker = checker.StatusChecker(conn) | |
assert status_checker.check_components(), "some components is offline" | |
# --diff with dist conf file, conf_file is required | |
if args.diff: | |
assert flags.FLAGS.conf_file, "need --conf_file" | |
print( | |
"only check components in conf file, if cluster has more components, ignore them" | |
) | |
dist_conf = read_conf(flags.FLAGS.conf_file) | |
assert status_checker.check_startup( | |
dist_conf | |
), f"not all components in conf file are online, check the previous output" | |
print(f"all components in conf file are online") | |
def inspect(args): | |
insepct_online(args) | |
inspect_offline(args) | |
def insepct_online(args): | |
"""show table status""" | |
conn = Connector() | |
# scan all db include system db | |
fails = [] | |
rs = conn.execfetch("show table status like '%';") | |
rs.sort(key=lambda x: x[0]) | |
print(f"inspect {len(rs)} online tables(including system tables)") | |
for t in rs: | |
if t[13]: | |
print(f"unhealthy table {t[2]}.{t[1]}:\n {t[:13]}") | |
# sqlalchemy truncated ref https://github.com/sqlalchemy/sqlalchemy/commit/591e0cf08a798fb16e0ee9b56df5c3141aa48959 | |
# so we print warnings alone | |
print(f"full warnings:\n{t[13]}") | |
fails.append(f"{t[2]}.{t[1]}") | |
assert not fails, f"unhealthy tables: {fails}" | |
print(f"all tables are healthy") | |
def inspect_offline(args): | |
"""scan jobs status, show job log if failed""" | |
assert checker.StatusChecker(Connector()).offline_support() | |
conn = Connector() | |
jobs = conn.execfetch("SHOW JOBS") | |
# TODO some failed jobs are known, what if we want skip them? | |
print(f"inspect {len(jobs)} offline jobs") | |
fails = [] | |
# jobs sorted by id | |
jobs.sort(key=lambda x: x[0]) | |
if args.ls: | |
print("all offline jobs:") | |
for row in jobs: | |
print(" ".join([str(x) for x in row])) | |
return | |
# only FINAL_STATE "finished", "failed", "killed", "lost" | |
final_failed = ["failed", "killed", "lost"] | |
for row in jobs: | |
if row[2].lower() in final_failed: | |
fails.append(" ".join([str(x) for x in row])) | |
# DO NOT try to print rs in execfetch, it's too long | |
std_output = conn.execfetch(f"SHOW JOBLOG {row[0]}") | |
# log rs schema is FORMAT_STRING_KEY | |
assert len(std_output) == 1 and len(std_output[0]) == 1 | |
print(f"{row[0]}-{row[1]} failed, job log:\n{std_output[0][0]}") | |
fails_total = "\n".join(fails) | |
assert not fails, f"failed jobs:\n{fails_total}" | |
print("all offline final jobs are finished") | |
def test_sql(args): | |
conn = Connector() | |
status_checker = checker.StatusChecker(conn) | |
if not status_checker.check_components(): | |
logging.warning("some server is unalive, be careful") | |
tester = checker.SQLTester(conn) | |
tester.setup() | |
print("test online") | |
tester.online() | |
if status_checker.offline_support(): | |
print("test offline") | |
tester.offline() | |
else: | |
print("no taskmanager, can't test offline") | |
tester.teardown() | |
print("all test passed") | |
def static_check(args): | |
assert flags.FLAGS.conf_file, "static check needs dist conf file" | |
if not (args.version or args.conf or args.log): | |
print("at least one arg to check, check `openmldb_tool static-check -h`") | |
return | |
dist_conf = read_conf(flags.FLAGS.conf_file) | |
# the deploy path of servers may be flags.default_dir, we won't check if it's valid here. | |
assert DistConfValidator(dist_conf).validate(), "conf file is invalid" | |
collector = Collector(dist_conf) | |
if args.version: | |
versions = collector.collect_version() | |
print(f"version:\n{versions}") # TODO pretty print | |
version, ok = check_version(versions) | |
assert ok, f"all servers version should be {version}" | |
print(f"version check passed, all {version}") | |
if args.conf: | |
collector.pull_config_files(flags.FLAGS.collect_dir) | |
# config validate, read flags.FLAGS.collect_dir/<server-name>/conf | |
if dist_conf.is_cluster(): | |
assert ClusterConfValidator(dist_conf, flags.FLAGS.collect_dir).validate() | |
else: | |
assert False, "standalone unsupported" | |
if args.log: | |
collector.pull_log_files(flags.FLAGS.collect_dir) | |
# log check, read flags.FLAGS.collect_dir/logs | |
# glog parse & java log | |
LogAnalyzer(dist_conf, flags.FLAGS.collect_dir).run() | |
def parse_arg(argv): | |
"""parser definition, absl.flags + argparse""" | |
parser = argparse_flags.ArgumentParser( | |
formatter_class=argparse.RawTextHelpFormatter | |
) | |
# use args.header returned by parser.parse_args | |
subparsers = parser.add_subparsers(help="OpenMLDB Tool") | |
# sub status | |
status_parser = subparsers.add_parser( | |
"status", help="check the OpenMLDB server status" | |
) | |
status_parser.add_argument( | |
"--diff", | |
action="store_true", | |
help="check if all endpoints in conf are in cluster. If set, need to set `--conf_file`", | |
) # TODO action support in all python 3.x? | |
status_parser.set_defaults(command=status) | |
# sub inspect | |
inspect_parser = subparsers.add_parser( | |
"inspect", | |
help="Inspect online and offline. Use `inspect [online/offline]` to inspect one.", | |
) | |
# inspect online & offline | |
inspect_parser.set_defaults(command=inspect) | |
inspect_sub = inspect_parser.add_subparsers() | |
# inspect online | |
online = inspect_sub.add_parser("online", help="only inspect online table") | |
online.set_defaults(command=insepct_online) | |
# inspect offline | |
offline = inspect_sub.add_parser( | |
"offline", help="only inspect offline jobs, check the job log" | |
) | |
offline.add_argument("-ls", action="store_true", help="list all offline jobs in order") | |
offline.set_defaults(command=inspect_offline) | |
# sub test | |
test_parser = subparsers.add_parser( | |
"test", | |
help="Do simple create&insert&select test in online, select in offline(if taskmanager exists)", | |
) | |
test_parser.set_defaults(command=test_sql) | |
# sub static-check | |
static_check_parser = subparsers.add_parser( | |
"static-check", | |
help=textwrap.dedent( | |
""" \ | |
Static check on remote host, version/conf/log, -h to show the arguments, --conf_file is required. | |
Use -VCL to check all. | |
You can check version or config before cluster running. | |
If servers are remote, need Passwordless SSH Login. | |
""" | |
), | |
) | |
static_check_parser.add_argument( | |
"--version", "-V", action="store_true", help="check version" | |
) | |
static_check_parser.add_argument( | |
"--conf", "-C", action="store_true", help="check conf" | |
) | |
static_check_parser.add_argument( | |
"--log", "-L", action="store_true", help="check log" | |
) | |
static_check_parser.set_defaults(command=static_check) | |
def help(args): | |
parser.print_help() | |
parser.set_defaults(command=help) | |
args = parser.parse_args(argv[1:]) | |
tool_flags = { | |
k: [flag.serialize() for flag in v] | |
for k, v in flags.FLAGS.flags_by_module_dict().items() | |
if "diagnostic_tool" in k | |
} | |
logging.debug(f"args:{args}, flags: {tool_flags}") | |
return args | |
def main(args): | |
# TODO: adjust args, e.g. if conf_file, we can get zk addr from conf file, no need to --cluster | |
# run the command | |
print(f"diagnosing cluster {Connector().address()}") | |
args.command(args) | |
def run(): | |
app.run(main, flags_parser=parse_arg) | |
if __name__ == "__main__": | |
app.run(main, flags_parser=parse_arg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment