This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import argparse | |
import json | |
import logging | |
import re | |
import os | |
from json.decoder import JSONDecodeError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ApiClient(object): | |
_API_URL_PREFIX = '/api/v1/namespaces/' | |
def __init__(self, bypass_api_service=False): | |
self._bypass_api_service = bypass_api_service | |
self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config) | |
self._base_uri = k8s_config.server | |
self._api_servers_cache = [k8s_config.server] | |
self._api_servers_cache_updated = 0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def get_dcs(config): | |
# 获取所有DCS实现的名称,dcs_modules()通过pkgutil查询dcs目录下所有python文件的名称 | |
# 最终返回一个List,内容为:patroni.dcs.kubernetes 等等。 | |
modules = dcs_modules() | |
for module_name in modules: | |
name = module_name.split('.')[-1] # dcs名称:如etcd、kubernetes等等 | |
if name in config: # 判断配置文件中是否有对应的名称的Section配置段 | |
try: | |
module = importlib.import_module(module_name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_cluster(self): | |
stop_time = time.time() + self._retry.deadline | |
# 刷新kube-apiserver的地址信息 | |
self._api.refresh_api_servers_cache() | |
try: | |
with self._condition: | |
# 等待podsCache和kindCache的缓存建立完毕 | |
self._wait_caches(stop_time) | |
# 填充集群中的member信息(Pod信息)以及Endpoint信息 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Patroni(AbstractPatroniDaemon): | |
def __init__(self, config): | |
super(Patroni, self).__init__(config) | |
self.version = __version__ | |
# 通过配置文件中的配置,动态加载dcs的lib,并初始化 | |
self.dcs = get_dcs(self.config) | |
# 初始化watchdog服务,PS:作为容器运行时不需要 | |
self.watchdog = Watchdog(self.config) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Ha(object): | |
""" | |
在DCS上更新Postgresql节点的状态 | |
""" | |
def touch_member(self): | |
with self._member_state_lock: | |
data = { | |
'conn_url': self.state_handler.connection_string, | |
'api_url': self.patroni.api.connection_string, | |
'state': self.state_handler.state, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Postgresql(object): | |
def stop(self, mode='fast', block_callbacks=False, checkpoint=None, | |
on_safepoint=None, on_shutdown=None, stop_timeout=None): | |
"""Stop PostgreSQL | |
Supports a callback when a safepoint is reached. A safepoint is when no user backend can return a successful | |
commit to users. Currently this means we wait for user backends to close. But in the future alternate mechanisms | |
could be added. | |
:param on_safepoint: This callback is called when no user backends are running. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Postgresql(object): | |
def start(self, timeout=None, task=None, block_callbacks=False, role=None): | |
"""Start PostgreSQL | |
Waits for postmaster to open ports or terminate so pg_isready can be used to check startup completion | |
or failure. | |
:returns: True if start was initiated and postmaster ports are open, False if start failed""" | |
# make sure we close all connections established against | |
# the former node, otherwise, we might get a stalled one |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Ha(object): | |
def demote(self, mode): | |
"""Demote PostgreSQL running as master. | |
:param mode: One of offline, graceful or immediate. | |
offline is used when connection to DCS is not available. | |
graceful is used when failing over to another node due to user request. May only be called running async. | |
immediate is used when we determine that we are not suitable for master and want to failover quickly | |
without regard for data durability. May only be called synchronously. | |
immediate-nolock is used when we find out that we have lost the lock to be master. Need to bring down |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Kubernetes(AbstractDCS): | |
def update_leader(self, last_lsn, slots=None): | |
# PS:这个是Leader更新Endpoint的接口,没有获取Leader之前,无法更新 | |
# 获取当前leader对应的ep | |
kind = self._kinds.get(self.leader_path) | |
kind_annotations = kind and kind.metadata.annotations or {} | |
# ep上annotations.leader和当前实例名称不一致,退出 | |
if kind and kind_annotations.get(self._LEADER) != self._name: | |
return False |
OlderNewer