Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
Created December 23, 2021 12:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save QQGoblin/7b2dadf0c28c58d31d5c46796947d2e4 to your computer and use it in GitHub Desktop.
Save QQGoblin/7b2dadf0c28c58d31d5c46796947d2e4 to your computer and use it in GitHub Desktop.
【Patroni源码阅读】将Postgres实例降级为replicas
class Ha(object):

    def demote(self, mode):
        """Stop PostgreSQL running as master and bring this node back as a standby.

        :param mode: selects how PostgreSQL is stopped and whether the leader key is released:

            * ``offline`` -- used when connection to DCS is not available. Fast stop, no
              checkpoint, the leader key is not released and DCS is not queried afterwards.
            * ``graceful`` -- used when failing over to another node due to user request.
              Fast stop with a checkpoint, the leader key is released.
              May only be called running async.
            * ``immediate`` -- used when we determine that we are not suitable for master and
              want to failover quickly without regard for data durability. Immediate stop,
              the leader key is released. May only be called synchronously.
            * ``immediate-nolock`` -- used when we find out that we have lost the lock to be
              master and need to bring PostgreSQL down as quickly as possible without regard
              for data durability. Immediate stop, the leader key is not released.
              May only be called synchronously.

        :returns: ``False`` on the synchronous path when a rewind or reinitialize is needed and
                  possible (PostgreSQL is left stopped in that case); ``None`` otherwise.
        :raises KeyError: if *mode* is not one of the four names above.
        """
        # Per-mode switches; an unknown mode fails right here, before anything is touched.
        plan = {
            'offline': {'stop': 'fast', 'checkpoint': False, 'release': False,
                        'offline': True, 'async_req': False},
            'graceful': {'stop': 'fast', 'checkpoint': True, 'release': True,
                         'offline': False, 'async_req': False},
            'immediate': {'stop': 'immediate', 'checkpoint': False, 'release': True,
                          'offline': False, 'async_req': True},
            'immediate-nolock': {'stop': 'immediate', 'checkpoint': False, 'release': False,
                                 'offline': False, 'async_req': True},
        }[mode]

        logger.info('Demoting self (%s)', mode)

        self._rewind.trigger_check_diverged_lsn()

        # A dict rather than a plain flag so the nested callback can update it.
        release_state = {'released': False}

        def release_on_clean_shutdown(checkpoint_location):
            # Postmaster is still running, but pg_control already reports clean "shut down".
            # It could happen if Postgres is still archiving the backlog of WAL files.
            # If we know that there are replicas that received the shutdown checkpoint
            # location, we can remove the leader key and allow them to start leader race.
            if not self.is_failover_possible(self.cluster.members, cluster_lsn=checkpoint_location):
                return
            self.state_handler.set_role('demoted')
            with self._async_executor:
                # Give up the leader key early (presumably this also publishes the
                # checkpoint location to DCS -- confirm in release_leader_key_voluntarily).
                self.release_leader_key_voluntarily(checkpoint_location)
                release_state['released'] = True

        # Stop PostgreSQL on this node:
        #   - plan['stop'] is the stop mode, 'fast' or 'immediate';
        #   - checkpoint presumably requests a CHECKPOINT before stopping -- confirm in stop();
        #   - on_safepoint disables the watchdog, and is passed only while the watchdog is running;
        #   - on_shutdown is the early-release callback above, passed only for releasing modes;
        #   - stop_timeout comes from master_stop_timeout().
        safepoint_callback = self.watchdog.disable if self.watchdog.is_running else None
        shutdown_callback = release_on_clean_shutdown if plan['release'] else None
        stop_timeout = self.master_stop_timeout()
        self.state_handler.stop(plan['stop'],
                                checkpoint=plan['checkpoint'],
                                on_safepoint=safepoint_callback,
                                on_shutdown=shutdown_callback,
                                stop_timeout=stop_timeout)

        # From here on the node is marked as demoted and no longer the leader.
        self.state_handler.set_role('demoted')
        self.set_is_leader(False)

        if plan['release']:
            if not release_state['released']:
                # The callback did not release the key, so do it now. Only a graceful
                # demotion passes the latest checkpoint location along with the release.
                if mode == 'graceful':
                    checkpoint_location = self.state_handler.latest_checkpoint_location()
                else:
                    checkpoint_location = None
                with self._async_executor:
                    self.release_leader_key_voluntarily(checkpoint_location)
            time.sleep(2)  # Give a time to somebody to take the leader lock

        node_to_follow, leader = None, None
        if not plan['offline']:
            try:
                cluster = self.dcs.get_cluster()
                # Presumably _get_node_to_follow normally yields the leader's member -- confirm.
                node_to_follow, leader = self._get_node_to_follow(cluster), cluster.leader
            except Exception:
                # Best effort: without a usable cluster view, follow nobody in particular.
                node_to_follow, leader = None, None

        # FIXME: with mode offline called from DCS exception handler and handle_long_action_in_progress
        # there could be an async action already running, calling follow from here will lead
        # to racy state handler state updates.
        if plan['async_req']:
            # 'immediate' and 'immediate-nolock': hand follow() over to the async executor
            # instead of running it inline.
            self._async_executor.try_run_async('starting after demotion',
                                               self.state_handler.follow, (node_to_follow,))
            return

        if self.is_synchronous_mode():
            self.state_handler.config.set_synchronous_standby([])

        # This node may hold changes the new leader does not have (presumably the reason for
        # the check), so see whether a rewind or reinitialize is required before starting.
        if self._rewind.rewind_or_reinitialize_needed_and_possible(leader):
            return False  # do not start postgres, but run pg_rewind on the next iteration

        # Synchronous path: call follow() inline.
        self.state_handler.follow(node_to_follow)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment