Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
Last active December 21, 2021 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save QQGoblin/30169e845cd1d22afe14e8287e8157d9 to your computer and use it in GitHub Desktop.
Save QQGoblin/30169e845cd1d22afe14e8287e8157d9 to your computer and use it in GitHub Desktop.
【Patroni源码阅读】Kubernetes DCS初始化 Cluster
def _load_cluster(self):
    """Assemble a ``Cluster`` object from the cached Kubernetes objects.

    The method refreshes the API-server address cache, waits for the local
    caches while holding ``self._condition``, takes a snapshot of
    ``self._pods`` and ``self._kinds``, and then derives every ``Cluster``
    field from the objects found under ``config_path``, ``leader_path``,
    ``failover_path`` and ``sync_path`` in that snapshot.

    Side effects: ``self._leader_resource_version`` is always updated;
    ``self._leader_observed_record`` and ``self._leader_observed_time`` are
    updated whenever the leader record differs from the last observed one.

    :returns: ``Cluster(initialize, config, leader, last_lsn, members,
              failover, sync, history, slots)``
    :raises KubernetesError: if any exception is raised while building the
             cluster view; the original exception is logged first.
    """
    stop_time = time.time() + self._retry.deadline
    # Refresh the kube-apiserver address information.
    self._api.refresh_api_servers_cache()
    try:
        # NOTE(review): the lock scope (cache wait + snapshots only) is
        # reconstructed from a copy that lost its indentation -- confirm.
        with self._condition:
            # Wait until the pod cache and the kind cache have been built.
            self._wait_caches(stop_time)
            # Collect the member information (from Pods) and the Endpoint information.
            # A Member object represents one member of the PostgreSQL cluster;
            # it is implemented as a namedtuple.
            members = [self.member(pod) for pod in self._pods.copy().values()]
            # nodes is the cache of endpoint objects; it usually contains these three endpoints:
            #   - <cluster-name>-config: only used to store data, it does not point at any instance
            #   - <cluster-name>:        points to the master node
            #   - <cluster-name>-repl:   points to the standby nodes
            nodes = self._kinds.copy()
        # Fetch the config endpoint object, i.e. <cluster-name>-config.
        config = nodes.get(self.config_path)
        metadata = config and config.metadata
        annotations = metadata and metadata.annotations or {}
        # The initialize value stored in the annotations is a string of random digits,
        # for example: 7043029151653077050
        initialize = annotations.get(self._INITIALIZE)
        # get global dynamic configuration
        # ClusterConfig, like Member, is a namedtuple.
        # The third argument is the resource version only when the config annotation exists, else 0.
        config = ClusterConfig.from_node(metadata and metadata.resource_version,
                                         annotations.get(self._CONFIG) or '{}',
                                         metadata.resource_version if self._CONFIG in annotations else 0)
        # get timeline history
        history = TimelineHistory.from_node(metadata and metadata.resource_version,
                                            annotations.get(self._HISTORY) or '[]')
        # The leader object is the endpoint named after the cluster; it effectively
        # represents the connection address of the master instance.
        leader = nodes.get(self.leader_path)
        metadata = leader and leader.metadata
        self._leader_resource_version = metadata.resource_version if metadata else None
        annotations = metadata and metadata.annotations or {}
        # get last known leader lsn
        # This is the endpoint annotation whose key is optime, for example: optime: "100663296"
        last_lsn = annotations.get(self._OPTIME)
        try:
            last_lsn = 0 if last_lsn is None else int(last_lsn)
        except Exception:
            # Best effort: an unparsable value is treated as 0.
            last_lsn = 0
        # get permanent slots state (confirmed_flush_lsn)
        # By default there may be no annotation with the key "slots", i.e. slots=None.
        slots = annotations.get('slots')
        try:
            slots = slots and json.loads(slots)
        except Exception:
            # Best effort: invalid JSON is treated as "no slots information".
            slots = None
        # leader_record looks like this:
        # {
        #     "acquireTime": "2021-12-21T05:48:54.203818+00:00",
        #     "leader": "rccp-minimal-cluster-1",
        #     "renewTime": "2021-12-21T06:40:05.755571+00:00",
        #     "transitions": "1",
        #     "ttl": "30"
        # }
        #
        leader_record = {n: annotations.get(n) for n in (self._LEADER, 'acquireTime',
                         'ttl', 'renewTime', 'transitions') if n in annotations}
        # Remember when the leader record was last seen to change; the expiry check below relies on it.
        if (leader_record or self._leader_observed_record) and leader_record != self._leader_observed_record:
            self._leader_observed_record = leader_record
            self._leader_observed_time = time.time()
        leader = leader_record.get(self._LEADER)
        try:
            # Falls back to self._ttl when the annotation is missing, unparsable, or parses to 0.
            ttl = int(leader_record.get('ttl')) or self._ttl
        except (TypeError, ValueError):
            ttl = self._ttl
        # Set leader to None when the leader endpoint has no metadata, when no leader record
        # has been observed yet, or when the last observed change is older than ttl seconds.
        if not metadata or not self._leader_observed_time or self._leader_observed_time + ttl < time.time():
            leader = None
        if metadata:
            # Wrap the result in a Leader object: use the real member whose name matches
            # the leader name, otherwise fall back to a placeholder Member.
            member = Member(-1, leader, None, {})
            member = ([m for m in members if m.name == leader] or [member])[0]
            leader = Leader(metadata.resource_version, None, member)
        # Normally the endpoint used to store failover information does not exist in Kubernetes.
        # When a user triggers a failover, patroni creates an endpoint named
        # <cluster-name>-failover whose content looks roughly like this:
        #   apiVersion: v1
        #   kind: Endpoints
        #   metadata:
        #     annotations:
        #       leader: rccp-minimal-cluster-0
        #       member: rccp-minimal-cluster-2
        #       scheduled_at: "2021-12-23T11:20:00+08:00"
        #     creationTimestamp: "2021-12-21T06:55:15Z"
        #     labels:
        #       application: spilo
        #       cluster-name: rccp-minimal-cluster
        #     name: rccp-minimal-cluster-failover
        #     namespace: default
        #     resourceVersion: "3000706"
        #     uid: c5433dd8-d1ab-467c-8caf-9252ff9802df
        #
        failover = nodes.get(self.failover_path)
        metadata = failover and failover.metadata  # None when the lookup above returned None
        failover = Failover.from_node(metadata and metadata.resource_version,
                                      metadata and (metadata.annotations or {}).copy())
        # Normally the endpoint used to store the sync state does not exist in Kubernetes either.
        # When a sync operation exists, patroni creates an endpoint named <cluster-name>-sync.
        # reference to `SyncState` object, last observed synchronous replication state.
        sync = nodes.get(self.sync_path)
        metadata = sync and sync.metadata
        sync = SyncState.from_node(metadata and metadata.resource_version, metadata and metadata.annotations)
        return Cluster(initialize, config, leader, last_lsn, members, failover, sync, history, slots)
    except Exception:
        # Log the original failure, then surface a single DCS-level error type to callers.
        logger.exception('get_cluster')
        raise KubernetesError('Kubernetes API is not responding properly')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment