Skip to content

Instantly share code, notes, and snippets.

@h0hmj
Last active December 18, 2023 08:42
Show Gist options
  • Save h0hmj/c654515cc822735668dbfa2f6a848127 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import sqlite3 # if you use rqlite, you need to change db connection things
import yaml
import os
import logging
# Absolute path of curveadm's sqlite3 database file.
DBPATH = "/root/.curveadm/data/curveadm.db"
# Name of the cluster row (clusters.name) this script operates on.
CLUSTER_NAME = "test"
# Container image used for every service container created below.
IMAGE = "curveadm-harbor.org:8888/curve/curvebs:v1.2.7-rc3_47a5267"
# Host-side directory prefixes bind-mounted into the containers.
DATA_DIR_PREFIX = "/test/data"
LOG_DIR_PREFIX = "/test/logs"
class DBHandler:
    """Small helper for operating on the curveadm database.

    Related db schema::

        CREATE TABLE clusters (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            uuid TEXT NOT NULL,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            topology TEXT NULL,
            pool TEXT NULL,
            create_time DATE NOT NULL,
            current INTEGER DEFAULT 0
        );
        CREATE TABLE containers (
            id TEXT PRIMARY KEY,
            cluster_id INTEGER NOT NULL,
            container_id TEXT NOT NULL
        );
        CREATE TABLE hosts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data TEXT NOT NULL,
            lastmodified_time DATE NOT NULL
        );
    """

    def __init__(self, db_path, db_type="sqlite3"):
        """Open a connection to the database at ``db_path``.

        Only sqlite3 is supported; if you use rqlite, the connection
        handling here needs to change.

        Raises:
            ValueError: for an unsupported ``db_type`` (subclass of
                Exception, so existing broad handlers still catch it).
        """
        self.db_path = db_path
        if db_type == "sqlite3":
            self.db_connection = sqlite3.connect(self.db_path)
            self.db_cursor = self.db_connection.cursor()
        else:
            raise ValueError("db type not support")

    def execute(self, sql, params=()):
        """Execute ``sql`` and return the cursor.

        ``params`` is new and optional: passing values separately uses
        sqlite3 parameter substitution, avoiding quoting/injection issues.
        Existing ``execute(sql)`` call sites keep working unchanged.
        """
        logging.debug(f"execute sql: {sql} params: {params}")
        return self.db_cursor.execute(sql, params)

    def commit(self):
        """Commit the current transaction."""
        self.db_connection.commit()

    def close(self):
        """Close cursor and connection (new, optional; call when done)."""
        self.db_cursor.close()
        self.db_connection.close()
class CurveadmCluster:
    """In-place editor for one cluster row of the curveadm database.

    On construction it loads the cluster's id, uuid, and topology yaml
    from the ``clusters`` table; its operations keep ``clusters.topology``
    and the ``containers`` table consistent with each other.

    NOTE(review): every SQL statement here is built with f-strings.  The
    interpolated values come from the local db/config rather than end
    users, but parameterized queries would be safer — consider migrating.
    """

    def __init__(self, db_path, name):
        # Cluster name, used as the lookup key in clusters.name.
        self.name = name
        self.topo = None  # yaml: parsed topology, with ${var} substituted
        self.id = None  # clusters.id of this cluster
        self.uuid = None  # clusters.uuid; mixed into every service id hash
        self.hosts = None  # yaml; never populated in this file
        self.db_handler = DBHandler(db_path)
        # load cluster info from db
        result = self.db_handler.execute(
            f"SELECT id,uuid,topology FROM clusters WHERE name='{self.name}'"
        ).fetchone()
        if result is None:
            raise Exception("cluster not exist")
        else:
            self.id, self.uuid, topo_str = result
            topo_yaml = yaml.safe_load(topo_str)
            # processing variables: the topology may declare
            # global.variable entries referenced elsewhere as ${key};
            # substitute them textually, then re-parse.
            if "variable" in topo_yaml["global"]:
                variables = topo_yaml["global"]["variable"]
                for key in variables:
                    # replace ${key} to value
                    topo_str = topo_str.replace(f"${{{key}}}", variables[key])
            self.topo = yaml.safe_load(topo_str)

    def __generate_service_id(
        self, uuid, service_name, host, host_index, instance_index
    ):
        """Return the md5-hex service id curveadm derives for one instance.

        host_alias: host in curveadm hosts show, not hostname
        host_index: index of deploy host in the cluster
        instance_index: index of instance in one host
        service_id is the used in curveadm db containers table
        """
        # NOTE(review): local import kept as-is; hashlib could be imported
        # at module level.
        import hashlib

        m = hashlib.md5()
        # The hash input mirrors curveadm's own id scheme — do not reorder.
        s = f"{uuid}_{service_name}_{host}_{host_index}_{instance_index}"
        m.update(s.encode("utf-8"))
        return str(m.hexdigest())

    def __add_container(self, service_id, cluster_id, container_id="-"):
        """Insert a containers row keyed by the 12-char service id prefix.

        "-" marks a service whose container has not been created yet;
        ON CONFLICT DO NOTHING makes re-adding an existing row a no-op.
        """
        self.db_handler.execute(
            f'INSERT INTO containers (id, cluster_id, container_id) VALUES ("{service_id[:12]}", {cluster_id}, "{container_id}") ON CONFLICT (id) DO NOTHING'
        )
        self.db_handler.commit()

    def __remove_container(self, service_id):
        # Delete by the same 12-char prefix used at insert time.
        self.db_handler.execute(f'DELETE FROM containers WHERE id="{service_id[:12]}"')
        self.db_handler.commit()

    def __update_container(self, service_id, container_id):
        # Record the real docker container id for an existing row.
        self.db_handler.execute(
            f'UPDATE containers SET container_id="{container_id}" WHERE id="{service_id[:12]}"'
        )
        self.db_handler.commit()

    def __update_topo(self):
        """Persist self.topo back into clusters.topology.

        NOTE(review): this writes the *variable-substituted* topology, so
        the original ${...} placeholders are lost after the first update —
        confirm that is acceptable before using on a production db.
        """
        self.db_handler.execute(
            f"UPDATE clusters SET topology = '{yaml.safe_dump(self.topo)}' WHERE name = '{self.name}'"
        )
        self.db_handler.commit()

    def add_service(self, service_name, host_alias, instances_num, container_ids=None):
        """Append a deploy entry for `host_alias` and register its containers.

        Returns the new host's index in the deploy list, or None (no-op)
        if the host already carries this service.  If given,
        ``container_ids`` must hold one docker id per instance.
        """
        if container_ids is not None and len(container_ids) != instances_num:
            raise Exception("container_ids num not match instances_num")
        services = self.topo[f"{service_name}_services"]
        deploys = services["deploy"]
        host_index = len(deploys)
        for deploy in deploys:
            host = deploy["host"]
            if host_alias == host:
                # Host already deployed: bail out (implicitly returns None).
                return
        deploys.append({"host": host_alias, "instances": instances_num})
        # 1. update containers table
        for i in range(instances_num):
            service_id = self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            if container_ids is None:
                self.__add_container(service_id, self.id)
            else:
                self.__add_container(service_id, self.id, container_ids[i])
        # 2. update clusters table
        self.__update_topo()
        return host_index

    def update_container_id(self, service_id, container_id):
        """Public wrapper to set the docker container id for a service."""
        self.__update_container(service_id, container_id)

    def remove_service(self, service_name, host_alias):
        """Drop `host_alias` from the service's deploy list.

        Because service ids hash the host index, every host after the
        removed one shifts down by one; their containers rows are re-keyed
        accordingly.
        """
        host_index = self.get_host_index(service_name, host_alias)
        instances_num = self.get_instances_num(service_name, host_alias)
        services = self.topo[f"{service_name}_services"]
        deploys = services["deploy"]
        # 1. update containers table
        for i in range(instances_num):
            service_id = self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            self.__remove_container(service_id)
        # Re-key rows for hosts that shift left after the removal.
        for i in range(host_index + 1, len(deploys)):
            for j in range(instances_num):
                old_service_id = self.__generate_service_id(
                    self.uuid, service_name, deploys[i]["host"], i, j
                )
                new_service_id = self.__generate_service_id(
                    self.uuid, service_name, deploys[i]["host"], i - 1, j
                )
                container_id = self.get_container_id(old_service_id)
                # the primary key changed, need delete old one and insert new one
                self.__remove_container(old_service_id)
                self.__add_container(new_service_id, self.id, container_id)
        # 2. update cluster table
        deploys.pop(host_index)
        self.__update_topo()

    def generate_service_ids(self, service_name, host_alias, host_index, instances_num):
        """Return the list of service ids for all instances on one host."""
        service_ids = []
        for i in range(instances_num):
            service_id = self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            service_ids.append(service_id)
        return service_ids

    def get_container_id(self, service_id):
        """Return the docker container id for a service, or None if absent."""
        result = self.db_handler.execute(
            f'SELECT container_id FROM containers WHERE id="{service_id[:12]}"'
        ).fetchone()
        if result is None:
            return None
        else:
            return result[0]

    def set_topo(self, path):
        """Replace the cluster topology with the yaml file at ``path``."""
        with open(path, "r") as f:
            self.topo = yaml.safe_load(f.read())
        self.__update_topo()

    def get_host_index(self, service_name, host_alias):
        """Return the deploy-list index of `host_alias`, or None if absent."""
        services = self.topo[f"{service_name}_services"]
        deploys = services["deploy"]
        for i in range(len(deploys)):
            if deploys[i]["host"] == host_alias:
                return i
        return None

    def get_instances_num(self, service_name, host_alias):
        """Return the instance count for `host_alias` (default 1), or None
        if the host does not carry this service."""
        # fixme
        services = self.topo[f"{service_name}_services"]
        deploys = services["deploy"]
        for deploy in deploys:
            if deploy["host"] == host_alias:
                if "instances" in deploy:
                    return deploy["instances"]
                else:
                    return 1
        return None

    def get_etcd_ip_port(self):
        """Return (listen.ip, listen.port, listen.client_port) from the
        etcd service config section of the topology."""
        services = self.topo["etcd_services"]
        config = services["config"]
        return config["listen.ip"], config["listen.port"], config["listen.client_port"]
class ServiceManager:
    """Creates/removes curve service containers on remote hosts via ssh+docker.

    NOTE(review): commands are assembled by string interpolation and run
    through a remote shell (os.system / os.popen) with no quoting, so
    host/service_id/dir arguments must come from trusted configuration.
    """

    def __init__(self, image):
        # Full image reference used for every created container.
        self.image = image

    def __get_container_id(self, host, container_name):
        """Return the full docker container id of `container_name` on `host`
        ('' when grep finds nothing)."""
        cmd = f"ssh {host} sudo docker ps -a --no-trunc | grep {container_name} | awk '{{print $1}}'"
        return os.popen(cmd).read().strip()

    def __create_container(self, host, name, role, data_dir, log_dir, restart, prefix):
        """Create (not start) one service container; return its container id.

        host     -- ssh alias of the target machine
        name     -- container name and hostname, e.g. curvefs-mds-<id>
        role     -- value for the image's --role flag (also the in-container
                    data/log subdirectory)
        data_dir / log_dir -- host directories bind-mounted into the container
        restart  -- docker restart policy ("always" or "no")
        prefix   -- in-container path prefix ("curvefs" or "curvebs")
        """
        cmd = (
            f"ssh {host} sudo docker create --name={name} --hostname={name} "
            f"--env=LD_PRELOAD=/usr/local/lib/libjemalloc.so --volume={log_dir}:/{prefix}/{role}/logs "
            f"--volume={data_dir}:/{prefix}/{role}/data --network=host --privileged --restart={restart} "
            f"--add-host {name}:127.0.0.1 --runtime=runc {self.image} --role {role} --args="
        )
        os.system(cmd)
        return self.__get_container_id(host, name)

    def create_mds_container(self, host, service_id, data_dir, log_dir):
        """Create an mds container (restart=always)."""
        return self.__create_container(
            host, f"curvefs-mds-{service_id}", "mds", data_dir, log_dir, "always", "curvefs"
        )

    def create_etcd_container(self, host, service_id, data_dir, log_dir):
        """Create an etcd container (restart=always)."""
        return self.__create_container(
            host, f"curvefs-etcd-{service_id}", "etcd", data_dir, log_dir, "always", "curvefs"
        )

    def create_metaserver_container(self, host, service_id, data_dir, log_dir):
        """Create a metaserver container (restart=no)."""
        return self.__create_container(
            host, f"curvefs-metaserver-{service_id}", "metaserver", data_dir, log_dir, "no", "curvefs"
        )

    def create_snapshotclone_container(self, host, service_id, data_dir, log_dir):
        """Create a snapshotclone container (restart=no; note the curvebs
        naming/path prefix, unlike the curvefs services above)."""
        return self.__create_container(
            host, f"curvebs-snapshotclone-{service_id}", "snapshotclone", data_dir, log_dir, "no", "curvebs"
        )

    def remove_container(self, host, container_id):
        """Remove a container on `host`; docker rm only removes stopped ones."""
        # rm only stopped container
        cmd = f"ssh {host} sudo docker rm {container_id}"
        os.system(cmd)
def set_topo(path):
    """Testing helper: load the topology yaml at ``path`` into the
    configured test cluster and persist it to the db."""
    CurveadmCluster(DBPATH, CLUSTER_NAME).set_topo(path)
def get_etcd_init_conf(name, ip, peer_port, client_port, initial_cluster):
    """Print an etcd config for a member joining an existing cluster
    (initial-cluster-state: existing)."""
    conf_lines = [
        f"name: {name}",
        "data-dir: /curvefs/etcd/data",
        "wal-dir: /curvefs/etcd/data/wal",
        f"listen-peer-urls: http://{ip}:{peer_port}",
        f"listen-client-urls: http://{ip}:{client_port}",
        f"initial-advertise-peer-urls: http://{ip}:{peer_port}",
        f"advertise-client-urls: http://{ip}:{client_port}",
        f"initial-cluster: {initial_cluster}",
        "initial-cluster-state: existing",
    ]
    print("\n".join(conf_lines))
def add_service(service_name, host_alias, instances_num):
    """Add `instances_num` instances of `service_name` on `host_alias`.

    Registers the service in the curveadm db (topology + containers table),
    then creates the docker containers remotely and records their ids.
    No-op if the host already carries the service.

    Raises:
        Exception: for an unsupported service name.
    """
    cluster = CurveadmCluster(DBPATH, CLUSTER_NAME)
    service_manager = ServiceManager(IMAGE)
    if service_name not in [
        "mds",
        "etcd",
        "metaserver",
        "chunkserver",
        "snapshotclone",
    ]:
        raise Exception("service name not support")
    # 1. add service
    host_index = cluster.add_service(service_name, host_alias, instances_num)
    # BUG FIX: CurveadmCluster.add_service returns None when the host already
    # runs this service; the old code continued anyway, deriving bogus
    # service ids from host_index=None and creating stray containers.
    if host_index is None:
        logging.warning(f"{service_name} already deployed on {host_alias}, nothing to do")
        return
    service_ids = cluster.generate_service_ids(
        service_name, host_alias, host_index, instances_num
    )
    # 2. create containers; dispatch table replaces the duplicated if/elif
    # branches (data/log dirs follow the same <prefix>/<service><i> scheme).
    creators = {
        "mds": service_manager.create_mds_container,
        "etcd": service_manager.create_etcd_container,
        "metaserver": service_manager.create_metaserver_container,
        "snapshotclone": service_manager.create_snapshotclone_container,
    }
    creator = creators.get(service_name)
    if creator is None:
        # chunkserver: accepted in the db but has no container creator yet
        # (matches the original break-out behavior).
        return
    for i in range(instances_num):
        instance = creator(
            host_alias,
            service_ids[i][:12],
            f"{DATA_DIR_PREFIX}/{service_name}{i}",
            f"{LOG_DIR_PREFIX}/{service_name}{i}",
        )
        cluster.update_container_id(service_ids[i], instance)
def rm_service(service_name, host_alias):
    """Remove all instances of `service_name` from `host_alias`.

    Removes the (stopped) docker containers on the remote host first, then
    drops the service from the curveadm db topology/containers tables.

    Raises:
        Exception: for an unsupported service name, or when the host does
            not carry the service.
    """
    cluster = CurveadmCluster(DBPATH, CLUSTER_NAME)
    service_manager = ServiceManager(IMAGE)
    if service_name not in [
        "mds",
        "etcd",
        "metaserver",
        "chunkserver",
        "snapshotclone",
    ]:
        raise Exception("service name not support")
    # 1. remove containers
    host_index = cluster.get_host_index(service_name, host_alias)
    # BUG FIX: when the host was not deployed, host_index/instances_num were
    # None and the old code crashed with an opaque TypeError (range(None));
    # fail with a clear message instead.
    if host_index is None:
        raise Exception(f"{service_name} not deployed on host {host_alias}")
    instances_num = cluster.get_instances_num(service_name, host_alias)
    service_ids = cluster.generate_service_ids(
        service_name, host_alias, host_index, instances_num
    )
    for service_id in service_ids:
        # get_container_id slices to the 12-char key itself, so the full id
        # can be passed directly.
        instance = cluster.get_container_id(service_id)
        service_manager.remove_container(host_alias, instance)
    # 2. remove service
    cluster.remove_service(service_name, host_alias)
if __name__ == "__main__":
    import argparse

    # CLI: `service add|rm --service-name ... --host-alias ... [--instances-num N]`
    #      `etcd get-init-conf --name ... --ip ... --peer-port ... --client-port ... --initial-cluster ...`
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="subparser_name")
    parser_service = subparsers.add_parser("service")
    parser_service.add_argument("action", choices=["add", "rm"])
    parser_service.add_argument(
        "--service-name",
        choices=["mds", "etcd", "metaserver", "chunkserver", "snapshotclone"],
    )
    parser_service.add_argument("--host-alias")
    parser_service.add_argument("--instances-num", type=int)
    parser_etcd = subparsers.add_parser("etcd")
    parser_etcd.add_argument("action", choices=["get-init-conf"])
    parser_etcd.add_argument("--name")
    parser_etcd.add_argument("--ip")
    parser_etcd.add_argument("--peer-port")
    parser_etcd.add_argument("--client-port")
    parser_etcd.add_argument("--initial-cluster")
    args = parser.parse_args()
    if args.subparser_name == "service":
        # BUG FIX: missing options previously surfaced as TypeErrors deep in
        # the db code; fail early with a usage message instead.
        if args.service_name is None or args.host_alias is None:
            parser_service.error("--service-name and --host-alias are required")
        if args.action == "add":
            if args.instances_num is None:
                parser_service.error("--instances-num is required for 'add'")
            add_service(args.service_name, args.host_alias, args.instances_num)
        elif args.action == "rm":
            rm_service(args.service_name, args.host_alias)
    elif args.subparser_name == "etcd":
        if args.action == "get-init-conf":
            get_etcd_init_conf(
                args.name,
                args.ip,
                args.peer_port,
                args.client_port,
                args.initial_cluster,
            )
    else:
        parser.print_help()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment