Skip to content

Instantly share code, notes, and snippets.

@loganlinn
Created November 11, 2020 11:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loganlinn/72315449d13d94d6fde71fb7643db55c to your computer and use it in GitHub Desktop.
Save loganlinn/72315449d13d94d6fde71fb7643db55c to your computer and use it in GitHub Desktop.
import argparse
import logging
import os
import sys
from inspect import cleandoc
from collections import namedtuple
import clickhouse_driver
# Explicit check rather than `assert`, which is skipped under `python -O`.
if sys.version_info < (3,):
    raise RuntimeError("this script requires Python 3")

# Named after the module; unlike a lookup of `__main__.__file__`, this also
# works when `__main__` has no file (e.g. `python -c` or an interactive session).
logger = logging.getLogger(__name__)
def expand_macro(client, macro):
    """Return the server-side value of the macro called *macro*.

    Evaluates ``getMacro`` on the server behind *client* and returns the
    first column of the first result row.
    """
    result_rows = client.execute("SELECT getMacro(%(macro)s)", {"macro": macro})
    first_row = result_rows[0]
    return first_row[0]
def get_cluster_hosts(client, cluster):
    """Return the fully-qualified domain name of every host in *cluster*.

    Runs ``fqdn()`` on each replica via the ``clusterAllReplicas`` table
    function. The plain ``cluster()`` table function contacts only one
    replica per shard, so replicas would be silently missing from the result
    (and from everything that later iterates over these hosts).

    NOTE(review): ``clusterAllReplicas`` must be supported by the ClickHouse
    server version in use -- confirm for the deployed cluster.

    Args:
        client: connection used to run the query.
        cluster: cluster name as configured on the server.

    Returns:
        list of host names, one per returned row.
    """
    rows = client.execute(
        "SELECT fqdn() FROM clusterAllReplicas(%(cluster)s, system.one)",
        {"cluster": cluster},
    )
    return [row[0] for row in rows]
# Result of get_create_distributed_objects:
#   names          -- database names, then "database.table" names
#   create_queries -- the matching CREATE ... IF NOT EXISTS statements, same order
DistributedObjects = namedtuple("DistributedObjects", ["names", "create_queries"])

# Databases that own a Distributed table or hold the table it points at.
# `{cluster_tables}` is replaced with a remote(...) call over system.tables.
_CREATE_DB_QUERY_TEMPLATE = """
SELECT
DISTINCT database AS name,
concat('CREATE DATABASE IF NOT EXISTS "', name, '"') AS create_query
FROM
(
SELECT DISTINCT arrayJoin([database, extract(engine_full, 'Distributed\\\\([^,]+, *\\\'?([^,\\\']+)\\\'?, *[^,]+')]) database
FROM {cluster_tables} tables
WHERE engine = 'Distributed'
SETTINGS skip_unavailable_shards = 1
)"""

# Distributed tables (order 2) plus the tables they point at (order 1), so that
# targets are created before the Distributed tables referencing them.
_CREATE_TABLES_QUERY_TEMPLATE = """
SELECT DISTINCT
concat(database, '.', name) AS name,
replaceRegexpOne(create_table_query, 'CREATE (TABLE|VIEW|MATERIALIZED VIEW)', 'CREATE \\\\1 IF NOT EXISTS')
FROM
(
SELECT
database,
name,
create_table_query,
2 AS order
FROM {cluster_tables} tables
WHERE engine = 'Distributed'
SETTINGS skip_unavailable_shards = 1
UNION ALL
SELECT
extract(engine_full, 'Distributed\\\\([^,]+, *\\\'?([^,\\\']+)\\\'?, *[^,]+') AS database,
extract(engine_full, 'Distributed\\\\([^,]+, [^,]+, *\\\'?([^,\\\\\\\')]+)') AS name,
t.create_table_query,
1 AS order
FROM {cluster_tables} tables
LEFT JOIN (
SELECT DISTINCT database, name, create_table_query
FROM {cluster_tables}
SETTINGS skip_unavailable_shards = 1
) t USING (database, name)
WHERE engine = 'Distributed' AND t.create_table_query != ''
SETTINGS skip_unavailable_shards = 1
) tables
ORDER BY order
"""


def _execute_columns(client, query, ncolumns):
    """Run *query* with ``columnar=True`` and return its columns as tuples.

    A query matching no rows may come back as an empty list rather than
    *ncolumns* empty columns (NOTE(review): observed clickhouse_driver
    behaviour -- confirm for the driver version in use). That would make
    tuple unpacking at the call site fail, so empty columns are substituted.
    """
    columns = client.execute(query, columnar=True)
    if not columns:
        return tuple(() for _ in range(ncolumns))
    return tuple(tuple(column) for column in columns)


def get_create_distributed_objects(client, hosts):
    """Collect the CREATE statements needed to recreate Distributed tables.

    Reads ``system.tables`` from every host in *hosts* through ``remote()``
    (with ``skip_unavailable_shards = 1``). It looks for tables using the
    ``Distributed`` engine, the tables they point at, and the databases
    involved.

    Args:
        client: object with ``execute(query, columnar=True)`` (e.g. a
            ``clickhouse_driver.Client``) used to run the discovery queries.
        hosts: host names joined into the ``remote()`` address list.

    Returns:
        DistributedObjects(names, create_queries):

        * ``names``: database names followed by ``database.table`` names.
        * ``create_queries``: the matching ``CREATE ... IF NOT EXISTS``
          statements, in the same order.

        Databases come first. Among tables, the targets of Distributed
        tables precede the Distributed tables themselves. Both fields are
        empty tuples when nothing is found.
    """
    cluster_tables = "remote('{}', system, tables)".format(",".join(hosts))
    db_names, create_dbs = _execute_columns(
        client, _CREATE_DB_QUERY_TEMPLATE.format(cluster_tables=cluster_tables), 2
    )
    table_names, create_tables = _execute_columns(
        client, _CREATE_TABLES_QUERY_TEMPLATE.format(cluster_tables=cluster_tables), 2
    )
    return DistributedObjects(db_names + table_names, create_dbs + create_tables)
def get_client(host_or_url):
    """Build a ``clickhouse_driver.Client`` from a bare host name or a DSN.

    Values whose text starts with ``clickhouse://`` are passed unchanged to
    ``Client.from_url``. Anything else is converted to ``str`` and used as
    the ``host`` argument.
    """
    as_text = str(host_or_url)
    if not as_text.startswith("clickhouse://"):
        return clickhouse_driver.Client(host=as_text)
    return clickhouse_driver.Client.from_url(host_or_url)
def main():
    """Create the Distributed tables (and their dependencies) on every cluster host.

    By default this is a dry run: each statement is only logged at DEBUG
    level. With ``--execute`` the statements are run on every host.

    Returns:
        0 (the process exit status); failures propagate as exceptions.
    """
    args = parse_args()
    setup_logging(args.verbosity)

    # Only the first host on the command line is used to discover the cluster.
    seed_client = get_client(args.hosts[0])
    try:
        cluster_name = args.cluster or expand_macro(seed_client, "cluster")
        cluster_hosts = get_cluster_hosts(seed_client, cluster_name)
        names, create_queries = get_create_distributed_objects(seed_client, cluster_hosts)
    finally:
        seed_client.disconnect()

    logger.info("Executing CREATE commands for: %s", names)
    for cluster_host in cluster_hosts:
        if not args.execute:
            for query in create_queries:
                logger.debug("SKIP: host=%s query=%s", cluster_host, query)
            continue
        # NOTE(review): cluster hosts are connected by bare host name, so
        # credentials/port from a clickhouse:// URL given on the command line
        # are not carried over -- confirm defaults are sufficient.
        host_client = get_client(cluster_host)
        try:
            for query in create_queries:
                # TODO retry attempts
                host_client.execute(query)
        finally:
            host_client.disconnect()
    return 0
def parse_args(argv=None):
    """Parse the command line.

    Args:
        argv: list of arguments to parse. ``None`` (the default) means
            ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with:

        * ``execute``: bool.
        * ``cluster``: str or None.
        * ``hosts``: non-empty list of str.
        * ``verbosity``: int, incremented by each ``-v`` and set to -1
          by ``-q``.
    """
    parser = argparse.ArgumentParser(description="Manage schema of ClickHouse cluster.")
    parser.add_argument(
        "--execute",
        action="store_true",
        help="run the CREATE statements on every host "
        "(default: dry run, statements are only logged at DEBUG level)",
    )
    parser.add_argument(
        "--cluster",
        help="cluster name (default: the 'cluster' macro of the first host)",
    )
    parser.add_argument(
        "hosts",
        nargs="+",
        help="host name or clickhouse:// URL; the first one is used to discover the cluster",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        dest="verbosity",
        help="Verbosity (-v, -vv, etc)",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_const",
        const=-1,
        default=0,
        dest="verbosity",
        help="quiet output (show errors only)",
    )
    return parser.parse_args(argv)
def _parse_loglevel(value):
    """Return the numeric logging level for *value* (e.g. ``"INFO"`` or ``"20"``).

    Raises:
        ValueError: if *value* is neither an integer nor a level name.
    """
    text = value.strip()
    try:
        return int(text)
    except ValueError:
        pass
    level = getattr(logging, text.upper(), None)
    # getattr on the logging module also finds functions, classes and string
    # constants (e.g. BASIC_FORMAT); only integer attributes are levels.
    if not isinstance(level, int):
        raise ValueError(
            "invalid LOGLEVEL {!r}: expected a level name such as INFO or an integer".format(value)
        )
    return level


def setup_logging(verbosity):
    """Configure root logging from ``$LOGLEVEL`` and the -v/-q verbosity.

    The base level comes from the ``LOGLEVEL`` environment variable, which
    may be a level name or an integer; the default is ``WARNING``. Each
    verbosity step lowers the threshold by 10, capped at two steps. A
    negative verbosity (``-q`` gives -1) raises it by 10.

    Raises:
        ValueError: if ``LOGLEVEL`` is not a recognised level.
    """
    base_loglevel = _parse_loglevel(os.getenv("LOGLEVEL", "WARNING"))
    verbosity = min(verbosity, 2)
    loglevel = base_loglevel - (verbosity * 10)
    logging.basicConfig(level=loglevel, format="[%(levelname)s] %(message)s")
# Script entry point. Uses sys.exit rather than the `exit` helper, which is
# injected by the `site` module and is missing under `python -S` or in frozen apps.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment