Last active
December 21, 2021 08:02
-
-
Save QQGoblin/30169e845cd1d22afe14e8287e8157d9 to your computer and use it in GitHub Desktop.
【Patroni源码阅读】Kubernetes DCS初始化 Cluster
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _load_cluster(self):
    """Build a ``Cluster`` snapshot from the cached Kubernetes pods and kind objects.

    Pods become cluster members. The cached kind objects (Endpoints in this
    walkthrough) stored under ``config_path``, ``leader_path``,
    ``failover_path`` and ``sync_path`` are read, and their annotations are
    decoded into the individual parts of the ``Cluster``.

    Returns:
        ``Cluster(initialize, config, leader, last_lsn, members, failover,
        sync, history, slots)``.

    Raises:
        KubernetesError: if anything in the load fails. The underlying
        exception is logged (with traceback) under 'get_cluster' first.
    """
    stop_time = time.time() + self._retry.deadline
    # Refresh the cached list of kube-apiserver addresses.
    self._api.refresh_api_servers_cache()
    try:
        with self._condition:
            # Wait for the pod cache and the kind cache to be built; stop_time bounds the wait.
            self._wait_caches(stop_time)

            # One Member per cached pod. The cache is copied before iterating
            # (presumably so concurrent cache updates cannot disturb the loop).
            members = [self.member(pod) for pod in self._pods.copy().values()]

            # Snapshot of the kind-object cache, looked up by path below. This function
            # reads the config, leader, failover and sync objects from it.
            nodes = self._kinds.copy()

            # --- config object (e.g. <cluster-name>-config): used only to store data ---
            config = nodes.get(self.config_path)
            metadata = config and config.metadata
            annotations = metadata and metadata.annotations or {}
            # The initialize annotation is a numeric string, e.g. "7043029151653077050".
            initialize = annotations.get(self._INITIALIZE)
            # Global dynamic configuration. The third argument is the object's
            # resource_version only when the config annotation is present, else 0.
            config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                             annotations.get(self._CONFIG) or '{}',
                                             metadata.resource_version if self._CONFIG in annotations else 0)
            # Timeline history, stored as JSON in the same object's annotations.
            history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                                annotations.get(self._HISTORY) or '[]')

            # --- leader object (same name as the cluster; points at the master) ---
            leader = nodes.get(self.leader_path)
            metadata = leader and leader.metadata
            self._leader_resource_version = metadata.resource_version if metadata else None
            annotations = metadata and metadata.annotations or {}

            # Last known leader LSN from the optime annotation (e.g. optime: "100663296").
            # Falls back to 0 when the annotation is missing or not an integer.
            last_lsn = annotations.get(self._OPTIME)
            try:
                last_lsn = 0 if last_lsn is None else int(last_lsn)
            except Exception:
                last_lsn = 0

            # Permanent slots state (confirmed_flush_lsn), JSON in the 'slots' annotation.
            # The annotation is usually absent, so slots stays None. It is also None when
            # the JSON cannot be parsed; a falsy value (e.g. '') is passed through unchanged.
            slots = annotations.get('slots')
            try:
                slots = slots and json.loads(slots)
            except Exception:
                slots = None

            # Leader lease record, for example:
            # {
            #   "acquireTime": "2021-12-21T05:48:54.203818+00:00",
            #   "leader": "rccp-minimal-cluster-1",
            #   "renewTime": "2021-12-21T06:40:05.755571+00:00",
            #   "transitions": "1",
            #   "ttl": "30"
            # }
            leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                             'ttl', 'renewTime', 'transitions') if n in annotations}
            # Restart the observation clock only when the record actually changed.
            # An unchanged record therefore expires ttl seconds after its last change was seen.
            if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
                self._leader_observed_record = leader_record
                self._leader_observed_time = time.time()

            leader = leader_record.get(self._LEADER)
            # A missing, non-integer or zero ttl annotation falls back to self._ttl.
            try:
                ttl = int(leader_record.get('ttl')) or self._ttl
            except (TypeError, ValueError):
                ttl = self._ttl

            # The leader name is cleared when any of these hold:
            #   - there is no leader object;
            #   - nothing has been observed yet;
            #   - the last observed change is older than ttl seconds.
            if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
                leader = None

            if metadata:
                # Wrap the leader in a Leader object, whose member is chosen as follows:
                #   - prefer the pod-backed Member with the leader's name;
                #   - otherwise use a placeholder Member(-1, leader, None, {}).
                # Note that a Leader object is created even when the name was cleared above.
                leader_member = next((m for m in members if m.name == leader),
                                     Member(-1, leader, None, {}))
                leader = Leader(metadata.resource_version, None, leader_member)

            # --- failover object (e.g. <cluster-name>-failover) ---
            # Normally this object does not exist. When a user triggers a failover, Patroni
            # creates it with annotations roughly like:
            #   leader: rccp-minimal-cluster-0
            #   member: rccp-minimal-cluster-2
            #   scheduled_at: "2021-12-23T11:20:00+08:00"
            failover = nodes.get(self.failover_path)
            metadata = failover and failover.metadata  # None when the failover object does not exist
            # The annotations are copied (presumably so from_node cannot mutate the cached object).
            failover = Failover.from_node(metadata and metadata.resource_version,
                                          metadata and (metadata.annotations or {}).copy())

            # --- sync object (e.g. <cluster-name>-sync) ---
            # Last observed synchronous replication state; it is absent unless sync replication is in use.
            sync = nodes.get(self.sync_path)
            metadata = sync and sync.metadata
            sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)

            return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
    except Exception:
        logger.exception('get_cluster')
        raise KubernetesError('Kubernetes API is not responding properly')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment