Last active
December 18, 2023 08:42
-
-
Save h0hmj/c654515cc822735668dbfa2f6a848127 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
import hashlib
import logging
import os
import sqlite3  # if you use rqlite, you need to change db connection things

import yaml
# Deployment-specific settings used by the module-level helpers below.
# Path of the curveadm SQLite database file.
DBPATH = "/root/.curveadm/data/curveadm.db"
# Name of the cluster row (clusters.name) that the helpers operate on.
CLUSTER_NAME = "test"
# Image passed to `docker create` for every container this script creates.
IMAGE = "curveadm-harbor.org:8888/curve/curvebs:v1.2.7-rc3_47a5267"
# Directory prefixes; add_service appends "<service_name><instance index>".
DATA_DIR_PREFIX = "/test/data"
LOG_DIR_PREFIX = "/test/logs"
class DBHandler:
    """Small helper for operating on the curveadm database.

    Only the ``sqlite3`` backend is implemented; any other ``db_type`` is
    rejected in the constructor.

    Related db schema::

        sqlite> .schema clusters
        CREATE TABLE clusters (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            uuid TEXT NOT NULL,
            name TEXT NOT NULL UNIQUE,
            description TEXT,
            topology TEXT NULL,
            pool TEXT NULL,
            create_time DATE NOT NULL,
            current INTEGER DEFAULT 0
        );
        sqlite> .schema containers
        CREATE TABLE containers (
            id TEXT PRIMARY KEY,
            cluster_id INTEGER NOT NULL,
            container_id TEXT NOT NULL
        );
        sqlite> .schema hosts
        CREATE TABLE hosts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            data TEXT NOT NULL,
            lastmodified_time DATE NOT NULL
        );
    """

    def __init__(self, db_path, db_type="sqlite3"):
        """Open the database at ``db_path``.

        Raises:
            Exception: if ``db_type`` is anything other than ``"sqlite3"``.
        """
        self.db_path = db_path
        if db_type != "sqlite3":
            raise Exception("db type not support")
        self.db_connection = sqlite3.connect(self.db_path)
        self.db_cursor = self.db_connection.cursor()

    def execute(self, sql, params=()):
        """Run one SQL statement on the shared cursor and return the cursor.

        ``params`` is optional and is handed to ``sqlite3`` for placeholder
        binding, so callers can pass values without formatting them into the
        SQL text. Calling ``execute(sql)`` alone behaves as before.
        """
        if params:
            logging.debug("execute sql: %s params: %s", sql, params)
        else:
            logging.debug("execute sql: %s", sql)
        return self.db_cursor.execute(sql, params)

    def commit(self):
        """Commit the current transaction on the connection."""
        self.db_connection.commit()

    def close(self):
        """Close the cursor and the connection; uncommitted work is not committed here."""
        self.db_cursor.close()
        self.db_connection.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Only releases the connection; committing stays an explicit caller decision.
        self.close()
        return False
class CurveadmCluster:
    """One curveadm cluster loaded from the curveadm database.

    The constructor reads the cluster row by name and parses its topology
    yaml into ``self.topo``. The mutating methods keep two things in step:
    the ``deploy`` list of a ``<service_name>_services`` topology section and
    the matching rows of the ``containers`` table.
    """

    def __init__(self, db_path, name):
        """Load ``id``, ``uuid`` and ``topology`` of the cluster called ``name``.

        Raises:
            Exception: if no row in ``clusters`` has that name.
        """
        self.name = name
        self.topo = None  # parsed topology yaml
        self.id = None
        self.uuid = None
        self.hosts = None  # yaml
        self.db_handler = DBHandler(db_path)
        # load cluster info from db
        result = self.db_handler.execute(
            "SELECT id,uuid,topology FROM clusters "
            f"WHERE name={self.__quote(self.name)}"
        ).fetchone()
        if result is None:
            raise Exception("cluster not exist")
        self.id, self.uuid, topo_str = result
        self.topo = self.__expand_variables(topo_str)

    @staticmethod
    def __quote(value):
        """Render ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so arbitrary text (for example a
        dumped topology) cannot terminate the literal early. Single quotes are
        used because SQLite treats a double-quoted token as an identifier
        before falling back to a string.
        """
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def __expand_variables(topo_str):
        """Parse topology text after substituting ``${key}`` variables.

        Variables are read from ``global.variable`` of the topology itself.
        Returns ``None`` when the stored topology is NULL or empty.
        """
        if not topo_str:
            return None
        topo_yaml = yaml.safe_load(topo_str)
        variables = {}
        if isinstance(topo_yaml, dict):
            variables = (topo_yaml.get("global") or {}).get("variable") or {}
        for key, value in variables.items():
            # replace ${key} to value; yaml scalars may be ints/bools, so
            # stringify before handing them to str.replace
            topo_str = topo_str.replace(f"${{{key}}}", str(value))
        return yaml.safe_load(topo_str)

    def __generate_service_id(
        self, uuid, service_name, host, host_index, instance_index
    ):
        """Return the md5 hex digest that identifies one service instance.

        host: host in curveadm hosts show, not hostname
        host_index: index of deploy host in the cluster
        instance_index: index of instance in one host

        The first 12 characters of the result are used as the primary key of
        the containers table.
        """
        s = f"{uuid}_{service_name}_{host}_{host_index}_{instance_index}"
        return hashlib.md5(s.encode("utf-8")).hexdigest()

    def __add_container(self, service_id, cluster_id, container_id="-"):
        """Insert a containers row; an existing row with the same id is kept."""
        self.db_handler.execute(
            "INSERT INTO containers (id, cluster_id, container_id) VALUES "
            f"({self.__quote(service_id[:12])}, {cluster_id}, "
            f"{self.__quote(container_id)}) ON CONFLICT (id) DO NOTHING"
        )
        self.db_handler.commit()

    def __remove_container(self, service_id):
        """Delete the containers row keyed by the first 12 chars of ``service_id``."""
        self.db_handler.execute(
            f"DELETE FROM containers WHERE id={self.__quote(service_id[:12])}"
        )
        self.db_handler.commit()

    def __update_container(self, service_id, container_id):
        """Set ``container_id`` on the containers row of ``service_id``."""
        self.db_handler.execute(
            f"UPDATE containers SET container_id={self.__quote(container_id)} "
            f"WHERE id={self.__quote(service_id[:12])}"
        )
        self.db_handler.commit()

    def __update_topo(self):
        """Write ``self.topo`` back to the clusters row as dumped yaml."""
        self.db_handler.execute(
            f"UPDATE clusters SET topology = {self.__quote(yaml.safe_dump(self.topo))} "
            f"WHERE name = {self.__quote(self.name)}"
        )
        self.db_handler.commit()

    def add_service(self, service_name, host_alias, instances_num, container_ids=None):
        """Append ``host_alias`` to the deploy list of a service.

        Returns:
            The index of the new deploy entry, or ``None`` (nothing changed)
            when ``host_alias`` is already in the deploy list.

        Raises:
            Exception: if ``container_ids`` is given but its length differs
                from ``instances_num``.
        """
        if container_ids is not None and len(container_ids) != instances_num:
            raise Exception("container_ids num not match instances_num")
        deploys = self.topo[f"{service_name}_services"]["deploy"]
        if any(deploy["host"] == host_alias for deploy in deploys):
            return None
        host_index = len(deploys)
        deploys.append({"host": host_alias, "instances": instances_num})
        # 1. update containers table
        for i in range(instances_num):
            service_id = self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            if container_ids is None:
                self.__add_container(service_id, self.id)
            else:
                self.__add_container(service_id, self.id, container_ids[i])
        # 2. update clusters table
        self.__update_topo()
        return host_index

    def update_container_id(self, service_id, container_id):
        """Record ``container_id`` for an already registered service instance."""
        self.__update_container(service_id, container_id)

    def remove_service(self, service_name, host_alias):
        """Drop ``host_alias`` from a service and re-key the hosts after it.

        Service ids embed the host index, so every deploy entry behind the
        removed one moves down by one and its containers rows are rewritten.

        Raises:
            Exception: if ``host_alias`` is not in the deploy list.
        """
        host_index = self.get_host_index(service_name, host_alias)
        if host_index is None:
            raise Exception("service not exist on host")
        instances_num = self.get_instances_num(service_name, host_alias)
        deploys = self.topo[f"{service_name}_services"]["deploy"]
        # 1. update containers table
        for i in range(instances_num):
            service_id = self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            self.__remove_container(service_id)
        for i in range(host_index + 1, len(deploys)):
            host = deploys[i]["host"]
            # each shifted host has its own instance count, which may differ
            # from the count of the host being removed
            for j in range(deploys[i].get("instances", 1)):
                old_service_id = self.__generate_service_id(
                    self.uuid, service_name, host, i, j
                )
                new_service_id = self.__generate_service_id(
                    self.uuid, service_name, host, i - 1, j
                )
                container_id = self.get_container_id(old_service_id)
                # the primary key changed, need delete old one and insert new one
                self.__remove_container(old_service_id)
                if container_id is None:
                    # no old row: fall back to the default placeholder
                    self.__add_container(new_service_id, self.id)
                else:
                    self.__add_container(new_service_id, self.id, container_id)
        # 2. update cluster table
        deploys.pop(host_index)
        self.__update_topo()

    def generate_service_ids(self, service_name, host_alias, host_index, instances_num):
        """Return the full service ids of instances ``0..instances_num-1`` on a host."""
        return [
            self.__generate_service_id(
                self.uuid, service_name, host_alias, host_index, i
            )
            for i in range(instances_num)
        ]

    def get_container_id(self, service_id):
        """Return the stored container id for ``service_id``, or ``None`` if no row exists."""
        result = self.db_handler.execute(
            f"SELECT container_id FROM containers WHERE id={self.__quote(service_id[:12])}"
        ).fetchone()
        return None if result is None else result[0]

    def set_topo(self, path):
        """Replace the topology with the yaml file at ``path`` and persist it."""
        with open(path, "r") as f:
            self.topo = yaml.safe_load(f.read())
        self.__update_topo()

    def get_host_index(self, service_name, host_alias):
        """Return the position of ``host_alias`` in the deploy list, or ``None``."""
        deploys = self.topo[f"{service_name}_services"]["deploy"]
        for index, deploy in enumerate(deploys):
            if deploy["host"] == host_alias:
                return index
        return None

    def get_instances_num(self, service_name, host_alias):
        """Return the ``instances`` value of a host's deploy entry.

        A deploy entry without ``instances`` counts as 1; ``None`` is returned
        when the host is not in the deploy list.
        """
        # fixme
        deploys = self.topo[f"{service_name}_services"]["deploy"]
        for deploy in deploys:
            if deploy["host"] == host_alias:
                return deploy.get("instances", 1)
        return None

    def get_etcd_ip_port(self):
        """Return ``(listen.ip, listen.port, listen.client_port)`` from the etcd config."""
        config = self.topo["etcd_services"]["config"]
        return config["listen.ip"], config["listen.port"], config["listen.client_port"]
class ServiceManager:
    """Creates and removes service containers on remote hosts over ssh.

    Every operation shells out to ``ssh <host> sudo docker ...``. The create
    methods only run ``docker create``; they do not start the container.
    NOTE(review): there is no create method for chunkserver here - confirm
    whether that is intentional.
    """

    def __init__(self, image):
        """Remember the image passed to every ``docker create``."""
        self.image = image

    @staticmethod
    def _run(cmd):
        """Run a shell command and log an error when its exit status is non-zero."""
        status = os.system(cmd)
        if status != 0:
            logging.error("command failed (status %s): %s", status, cmd)
        return status

    def __get_container_id(self, host, container_name):
        """Return the first column of the ``docker ps -a`` lines matching the name.

        The grep and awk stages run locally on the ssh output. The result is
        an empty string when nothing matches.
        """
        cmd = f"ssh {host} sudo docker ps -a --no-trunc | grep {container_name} | awk '{{print $1}}'"
        # close the pipe deterministically instead of leaking it
        with os.popen(cmd) as pipe:
            return pipe.read().strip()

    def _build_create_cmd(
        self, host, project, role, restart, service_id, data_dir, log_dir
    ):
        """Build the ``docker create`` command line shared by all services.

        ``project`` is the name prefix and in-container root (``curvefs`` or
        ``curvebs``), ``role`` the service role, ``restart`` the docker
        restart policy.
        """
        name = f"{project}-{role}-{service_id}"
        return (
            f"ssh {host} sudo docker create --name={name} --hostname={name} "
            f"--env=LD_PRELOAD=/usr/local/lib/libjemalloc.so --volume={log_dir}:/{project}/{role}/logs "
            f"--volume={data_dir}:/{project}/{role}/data --network=host --privileged --restart={restart} "
            f"--add-host {name}:127.0.0.1 --runtime=runc {self.image} --role {role} --args="
        )

    def _create_container(
        self, host, project, role, restart, service_id, data_dir, log_dir
    ):
        """Create one container and return the id docker lists for its name."""
        self._run(
            self._build_create_cmd(
                host, project, role, restart, service_id, data_dir, log_dir
            )
        )
        return self.__get_container_id(host, f"{project}-{role}-{service_id}")

    def create_mds_container(self, host, service_id, data_dir, log_dir):
        """Create the ``curvefs-mds-<service_id>`` container (restart=always)."""
        return self._create_container(
            host, "curvefs", "mds", "always", service_id, data_dir, log_dir
        )

    def create_etcd_container(self, host, service_id, data_dir, log_dir):
        """Create the ``curvefs-etcd-<service_id>`` container (restart=always)."""
        return self._create_container(
            host, "curvefs", "etcd", "always", service_id, data_dir, log_dir
        )

    def create_metaserver_container(self, host, service_id, data_dir, log_dir):
        """Create the ``curvefs-metaserver-<service_id>`` container (restart=no)."""
        return self._create_container(
            host, "curvefs", "metaserver", "no", service_id, data_dir, log_dir
        )

    def create_snapshotclone_container(self, host, service_id, data_dir, log_dir):
        """Create the ``curvebs-snapshotclone-<service_id>`` container (restart=no)."""
        return self._create_container(
            host, "curvebs", "snapshotclone", "no", service_id, data_dir, log_dir
        )

    def remove_container(self, host, container_id):
        """Run ``docker rm`` for ``container_id`` on ``host``."""
        # rm only stopped container
        self._run(f"ssh {host} sudo docker rm {container_id}")
def set_topo(path):
    """Load the yaml file at ``path`` as the topology of the configured cluster.

    Testing aid: opens the cluster named CLUSTER_NAME in DBPATH and stores the
    new topology through ``CurveadmCluster.set_topo``.
    """
    CurveadmCluster(DBPATH, CLUSTER_NAME).set_topo(path)
def get_etcd_init_conf(name, ip, peer_port, client_port, initial_cluster):
    """Print an etcd config for a member whose cluster state is ``existing``.

    The peer and client urls are ``http://<ip>:<port>``; data and wal
    directories are fixed under /curvefs/etcd/data. Nothing is returned.
    """
    peer_url = f"http://{ip}:{peer_port}"
    client_url = f"http://{ip}:{client_port}"
    conf_lines = [
        f"name: {name}",
        "data-dir: /curvefs/etcd/data",
        "wal-dir: /curvefs/etcd/data/wal",
        f"listen-peer-urls: {peer_url}",
        f"listen-client-urls: {client_url}",
        f"initial-advertise-peer-urls: {peer_url}",
        f"advertise-client-urls: {client_url}",
        f"initial-cluster: {initial_cluster}",
        "initial-cluster-state: existing",
    ]
    print("\n".join(conf_lines))
def add_service(service_name, host_alias, instances_num):
    """Register a service on a host and create its containers.

    The database is updated first (deploy entry plus placeholder containers
    rows), then one container per instance is created on ``host_alias`` and
    its id is written back. For chunkserver only the database records are
    added, because no container creation is implemented for it here.

    Args:
        service_name: one of mds, etcd, metaserver, chunkserver, snapshotclone.
        host_alias: host to deploy on; also used as the ssh target.
        instances_num: number of instances; ``None`` is treated as 1.

    Raises:
        Exception: for an unsupported service name, or when the service is
            already deployed on ``host_alias``.
    """
    # validate before touching the database
    if service_name not in [
        "mds",
        "etcd",
        "metaserver",
        "chunkserver",
        "snapshotclone",
    ]:
        raise Exception("service name not support")
    if instances_num is None:
        instances_num = 1
    cluster = CurveadmCluster(DBPATH, CLUSTER_NAME)
    service_manager = ServiceManager(IMAGE)
    # 1. add service
    host_index = cluster.add_service(service_name, host_alias, instances_num)
    if host_index is None:
        # cluster.add_service changed nothing; creating containers now would
        # use ids that have no row in the containers table
        raise Exception("service already exist on host")
    service_ids = cluster.generate_service_ids(
        service_name, host_alias, host_index, instances_num
    )
    # 2. create containers
    creators = {
        "mds": service_manager.create_mds_container,
        "etcd": service_manager.create_etcd_container,
        "metaserver": service_manager.create_metaserver_container,
        "snapshotclone": service_manager.create_snapshotclone_container,
    }
    create_container = creators.get(service_name)
    if create_container is None:
        return
    for i, service_id in enumerate(service_ids):
        instance = create_container(
            host_alias,
            service_id[:12],
            f"{DATA_DIR_PREFIX}/{service_name}{i}",
            f"{LOG_DIR_PREFIX}/{service_name}{i}",
        )
        cluster.update_container_id(service_id, instance)
def rm_service(service_name, host_alias):
    """Remove a service's containers from a host and unregister it.

    Containers are removed with ``docker rm`` first, then the deploy entry
    and containers rows are dropped from the database. Instances whose stored
    container id is missing, empty or the ``-`` placeholder are skipped,
    since there is no container to remove.

    Raises:
        Exception: for an unsupported service name, or when the service is
            not deployed on ``host_alias``.
    """
    # validate before touching the database
    if service_name not in [
        "mds",
        "etcd",
        "metaserver",
        "chunkserver",
        "snapshotclone",
    ]:
        raise Exception("service name not support")
    cluster = CurveadmCluster(DBPATH, CLUSTER_NAME)
    service_manager = ServiceManager(IMAGE)
    # 1. remove containers
    host_index = cluster.get_host_index(service_name, host_alias)
    if host_index is None:
        raise Exception("service not exist on host")
    instances_num = cluster.get_instances_num(service_name, host_alias)
    service_ids = cluster.generate_service_ids(
        service_name, host_alias, host_index, instances_num
    )
    for service_id in service_ids:
        instance = cluster.get_container_id(service_id[:12])
        if instance and instance != "-":
            service_manager.remove_container(host_alias, instance)
    # 2. remove service
    cluster.remove_service(service_name, host_alias)
if __name__ == "__main__":
    # Command line entry point:
    #   service add|rm --service-name NAME --host-alias HOST [--instances-num N]
    #   etcd get-init-conf --name .. --ip .. --peer-port .. --client-port .. --initial-cluster ..
    import argparse

    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest="subparser_name")

    parser_service = subparsers.add_parser("service")
    parser_service.add_argument("action", choices=["add", "rm"])
    # Both are required: a missing value would otherwise reach the database
    # topology and the ssh command line as None.
    parser_service.add_argument(
        "--service-name",
        choices=["mds", "etcd", "metaserver", "chunkserver", "snapshotclone"],
        required=True,
    )
    parser_service.add_argument("--host-alias", required=True)
    # only used by "add"
    parser_service.add_argument("--instances-num", type=int, default=1)

    parser_etcd = subparsers.add_parser("etcd")
    parser_etcd.add_argument("action", choices=["get-init-conf"])
    # every value is printed into the generated config, so none may be omitted
    parser_etcd.add_argument("--name", required=True)
    parser_etcd.add_argument("--ip", required=True)
    parser_etcd.add_argument("--peer-port", required=True)
    parser_etcd.add_argument("--client-port", required=True)
    parser_etcd.add_argument("--initial-cluster", required=True)

    args = parser.parse_args()
    if args.subparser_name == "service":
        if args.action == "add":
            add_service(args.service_name, args.host_alias, args.instances_num)
        elif args.action == "rm":
            rm_service(args.service_name, args.host_alias)
    elif args.subparser_name == "etcd":
        if args.action == "get-init-conf":
            get_etcd_init_conf(
                args.name,
                args.ip,
                args.peer_port,
                args.client_port,
                args.initial_cluster,
            )
    else:
        parser.print_help()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment