Created
December 22, 2021 09:33
-
-
Save QQGoblin/0419f0fc2b83bd53904bbe53119f42d3 to your computer and use it in GitHub Desktop.
【Patroni】patroni启动流程
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Patroni(AbstractPatroniDaemon):
    """Patroni daemon: startup sequence and per-heartbeat cycle.

    The constructor wires together the DCS client, the watchdog, the
    PostgreSQL wrapper, the REST API server and the HA logic.  ``run``
    starts the REST API and hands control to the parent daemon loop, which
    calls ``_run_cycle`` once per heartbeat.
    """

    def __init__(self, config):
        """Create every component the daemon needs.

        ``load_dynamic_configuration`` uses ``self.dcs`` and
        ``self.watchdog``, so both are created before it is called.  The
        remaining components are built afterwards from ``self.config``.

        :param config: configuration object; it is handed to the parent
            constructor and read back through ``self.config``.
        """
        super(Patroni, self).__init__(config)

        self.version = __version__
        # Load the DCS implementation selected by the configuration and
        # initialise it.
        self.dcs = get_dcs(self.config)
        # Initialise the watchdog service (original author's note: not
        # needed when Patroni runs inside a container).
        self.watchdog = Watchdog(self.config)
        # Load the configuration; per the original author's note this merges
        # the DCS configuration, the configuration file and environment
        # variables.
        self.load_dynamic_configuration()

        # Initialise the PostgreSQL wrapper.
        self.postgresql = Postgresql(self.config['postgresql'])
        # Initialise the REST API server.
        self.api = RestApiServer(self, self.config['restapi'])
        self.request = PatroniRequest(self.config, True)
        # Initialise the HA logic.
        self.ha = Ha(self)

        self.tags = self.get_tags()
        self.next_run = time.time()
        self.scheduled_restart = {}

    def load_dynamic_configuration(self):
        """Block until the dynamic configuration has been loaded.

        The cluster is fetched from the DCS in a loop.  On ``DCSError`` a
        warning is logged and the fetch is retried after 5 seconds; any
        other outcome ends the loop.
        """
        from patroni.exceptions import DCSError

        while True:
            try:
                cluster = self.dcs.get_cluster()
                if cluster and cluster.config and cluster.config.data:
                    # The DCS already holds a dynamic configuration: apply
                    # it and, if it was accepted, reload the DCS client and
                    # the watchdog.
                    if self.config.set_dynamic_configuration(cluster.config):
                        self.dcs.reload_config(self.config)
                        self.watchdog.reload_config(self.config)
                # The DCS has no dynamic configuration and none is set
                # locally either: fall back to the local
                # self.config['bootstrap']['dcs'] section.
                elif not self.config.dynamic_configuration and 'bootstrap' in self.config:
                    if self.config.set_dynamic_configuration(self.config['bootstrap']['dcs']):
                        self.dcs.reload_config(self.config)
                break
            except DCSError:
                logger.warning('Can not get cluster from dcs')
                time.sleep(5)

    def run(self):
        """Start the REST API server and enter the parent daemon loop."""
        # Start the REST API server.
        self.api.start()
        # Refresh the timestamp of the next scheduled run.
        self.next_run = time.time()

        # Per the original author's note, AbstractPatroniDaemon.run() is a
        # while loop that checks for received signals to decide whether the
        # configuration must be reloaded, and then calls _run_cycle() below.
        # Excerpt of that parent method, kept for reference:
        #
        #     def run(self):
        #         self.logger.start()
        #         while not self.received_sigterm:
        #             # reload the configuration on SIGHUP
        #             if self._received_sighup:
        #                 self._received_sighup = False
        #                 self.reload_config(True, self.config.reload_local_configuration())
        #             self._run_cycle()
        super(Patroni, self).run()

    def _run_cycle(self):
        """Run one heartbeat cycle."""
        # ha.run_cycle() holds the main logic of each heartbeat; its return
        # value is logged at INFO level.
        logger.info(self.ha.run_cycle())

        # Apply the dynamic configuration currently held by the DCS cluster
        # object and reload if set_dynamic_configuration() returns a truthy
        # value.
        if self.dcs.cluster and self.dcs.cluster.config and self.dcs.cluster.config.data \
                and self.config.set_dynamic_configuration(self.dcs.cluster.config):
            self.reload_config()

        if self.postgresql.role != 'uninitialized':
            # Per the original author's note: saves patroni.dynamic.json
            # into the PostgreSQL data directory.
            self.config.save_cache()

        # Per the original author's note: schedule_next_run() calls
        # ha.watch() -> dcs.watch() to wait for the next heartbeat.
        self.schedule_next_run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment