Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
Created December 30, 2021 05:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save QQGoblin/cd44cb8d377874231770b163d5dbb712 to your computer and use it in GitHub Desktop.
Save QQGoblin/cd44cb8d377874231770b163d5dbb712 to your computer and use it in GitHub Desktop.
【Patroni源码阅读】同步Standby自动调整
class Ha(object):

    def process_sync_replication(self):
        """Process synchronous standby behavior.

        Synchronous standbys are registered in two places: postgresql.conf and DCS. The order of updating them
        must be right. The invariant that should be kept is that if a node is master and sync_standby is set in
        DCS, then that node must have synchronous_standby set to that value. Or, more simply: first set in
        postgresql.conf and then in DCS. When removing, first remove in DCS, then in postgresql.conf. This is so
        we only consider promoting standbys that were guaranteed to be replicating synchronously.

        :returns: always `None`; the method works purely through side effects on DCS and the PostgreSQL config.
        """
        if not self.is_synchronous_mode():
            # Synchronous mode is off: delete the DCS record (when one exists) and clear the PostgreSQL setting.
            if self.cluster.sync.leader and self.dcs.delete_sync_state(index=self.cluster.sync.index):
                logger.info("Disabled synchronous replication")
            self.state_handler.config.set_synchronous_standby([])
            return

        sync_node_count = self.patroni.config['synchronous_node_count']
        # The member list stored in DCS only counts while a sync leader is recorded next to it.
        current = (self.cluster.sync.members if self.cluster.sync.leader else None) or []
        # maximum_lag_on_syncnode: how far a standby may lag and still be picked.
        sync_node_maxlag = self.patroni.config['maximum_lag_on_syncnode']

        # picked:        every candidate that should become a synchronous standby
        # allow_promote: the candidates PostgreSQL already reports as synchronous
        picked, allow_promote = self.state_handler.pick_synchronous_standby(self.cluster, sync_node_count,
                                                                            sync_node_maxlag)
        if set(picked) == set(current):
            # The DCS record already matches the candidates: nothing to change.
            return

        # Temporarily shrink the list in DCS to the nodes that are both currently registered and still
        # reported as synchronous, i.e. the part of `current` that does not have to change.
        sync_common = list(set(current).intersection(set(allow_promote)))
        if set(sync_common) != set(current):
            # Some registered node is no longer synchronous: remove it from DCS before touching PostgreSQL.
            logger.info("Updating synchronous privilege temporarily from %s to %s", current, sync_common)
            if not self.dcs.write_sync_state(self.state_handler.name,
                                             sync_common or None,
                                             index=self.cluster.sync.index):
                logger.info('Synchronous replication key updated by someone else.')
                return

        # Update db param and wait for x secs
        if self.is_synchronous_mode_strict() and not picked:
            picked = ['*']
            logger.warning("No standbys available!")

        logger.info("Assigning synchronous standby status to %s", picked)
        # Apply the picked candidates to the PostgreSQL configuration first ...
        self.state_handler.config.set_synchronous_standby(picked)

        # `picked` being non-empty while `allow_promote` is empty already implies the two differ as sets,
        # so no separate set comparison is needed here.
        if picked and picked[0] != '*' and not allow_promote:
            # Wait for PostgreSQL to enable synchronous mode and see if we can immediately set sync_standby
            time.sleep(2)
            # Ask again; ideally allow_promote now equals picked.
            _, allow_promote = self.state_handler.pick_synchronous_standby(self.cluster, sync_node_count,
                                                                           sync_node_maxlag)

        # ... and only afterwards publish the nodes that are really synchronous in DCS.
        if allow_promote and set(allow_promote) != set(sync_common):
            try:
                cluster = self.dcs.get_cluster()
            except DCSError:
                logger.warning("Could not get cluster state from DCS during process_sync_replication()")
                return
            if cluster.sync.leader and cluster.sync.leader != self.state_handler.name:
                logger.info("Synchronous replication key updated by someone else")
                return
            if not self.dcs.write_sync_state(self.state_handler.name, allow_promote, index=cluster.sync.index):
                logger.info("Synchronous replication key updated by someone else")
                return
            logger.info("Synchronous standby status assigned to %s", allow_promote)
def pick_synchronous_standby(self, cluster, sync_node_count=1, sync_node_maxlag=-1):
    """Find the best candidates to be synchronous standbys.

    Current synchronous standby is always preferred, unless it has disconnected or does not want to be a
    synchronous standby any longer.

    Parameter sync_node_maxlag (maximum_lag_on_syncnode) helps swapping an unhealthy sync replica in case
    it stops responding (or hangs). Please set the value high enough so it won't unnecessarily swap sync
    standbys during high loads. Any value less than or equal to 0 keeps the behavior backward compatible and
    will not swap. Please note that it will also not swap sync standbys in case all replicas are hung.

    NOTE(review): callers invoke this as `self.state_handler.pick_synchronous_standby(...)`, so it presumably
    belongs to the state-handler class rather than to module level - confirm where it should live.

    :param cluster: object whose `members` each expose `name`, `tags` and `nofailover`.
    :param sync_node_count: number of candidates to pick; forced to 1 when `_major_version` < 90600.
    :param sync_node_maxlag: maximum allowed distance from the reference position; <= 0 disables the check.
    :returns: tuple of candidates list and synchronous standby list (the candidates whose
              `sync_state` is reported as 'sync').
    """
    if self._major_version < 90600:
        sync_node_count = 1

    members = {m.name.lower(): m for m in cluster.members}
    candidates = []
    sync_nodes = []
    replica_list = []

    # Pick candidates based on who has higher replay/remote_write/flush lsn.
    # sync_commit_par holds the value of `SHOW synchronous_commit`
    # (the original reading notes observed 'on' for PostgreSQL 11).
    sync_commit_par = self._get_synchronous_commit_param()
    # Any value other than remote_apply/remote_write (including 'on') sorts by the flush position.
    sort_col = {'remote_apply': 'replay', 'remote_write': 'write'}.get(sync_commit_par, 'flush')

    # pg_stat_replication.sync_state has 4 possible states - async, potential, quorum, sync.
    # Sort clause "ORDER BY sync_state DESC" is to get the result in required order and to keep
    # the result consistent in case if a synchronous standby member is slowed down OR async node
    # receiving changes faster than the sync member (very rare but possible). Such cases would
    # trigger sync standby member swapping frequently and the sort on sync_state desc should
    # help in keeping the query result consistent.
    #
    # Example of the rendered statement (sort_col='flush', lsn_name='lsn', wal_name='wal'):
    #   SELECT
    #       pg_catalog.lower(application_name),
    #       sync_state,
    #       pg_wal_lsn_diff(flush_lsn, '0/0')::bigint
    #   FROM pg_catalog.pg_stat_replication
    #   WHERE
    #       state = 'streaming' AND
    #       flush_lsn IS NOT NULL
    #   ORDER BY
    #       sync_state DESC,
    #       flush_lsn DESC;
    # Example result:
    #        lower      | sync_state | pg_wal_lsn_diff
    #   ----------------+------------+-----------------
    #    patroni-rccp-1 | sync       |        83886400
    #    patroni-rccp-2 | async      |        83886400
    for app_name, sync_state, replica_lsn in self.query(
            "SELECT pg_catalog.lower(application_name), sync_state, pg_{2}_{1}_diff({0}_{1}, '0/0')::bigint"
            " FROM pg_catalog.pg_stat_replication"
            " WHERE state = 'streaming' AND {0}_{1} IS NOT NULL"
            " ORDER BY sync_state DESC, {0}_{1} DESC".format(sort_col, self.lsn_name, self.wal_name)):
        member = members.get(app_name)
        # Skip unknown application names and members tagged `nosync` (they must not become synchronous).
        if member and not member.tags.get('nosync', False):
            replica_list.append((member.name, sync_state, replica_lsn, bool(member.nofailover)))

    # Reference position for the lag check: the highest position among the replicas when there are at
    # least two of them, otherwise the value reported by self.last_operation().
    if len(replica_list) > 1:
        max_lsn = max(lsn for _, _, lsn, _ in replica_list)
    else:
        max_lsn = int(str(self.last_operation()))

    # Prefer members without nofailover tag. We are relying on the fact that sorts are guaranteed to be stable.
    for app_name, sync_state, replica_lsn, _ in sorted(replica_list, key=lambda x: x[3]):
        if sync_node_maxlag <= 0 or max_lsn - replica_lsn <= sync_node_maxlag:
            candidates.append(app_name)
            if sync_state == 'sync':
                sync_nodes.append(app_name)
        # Checked on every iteration, also after a replica was rejected for lagging too much.
        if len(candidates) >= sync_node_count:
            break

    return candidates, sync_nodes
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment