Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
QQGoblin / configure_spilo.py
Last active December 21, 2021 08:04
【Patroni源码阅读】spilo项目源码阅读
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import logging
import re
import os
from json.decoder import JSONDecodeError
@QQGoblin
QQGoblin / ApiClient.py
Last active December 21, 2021 08:03
【Patroni源码阅读】Kubernetes Client
class ApiClient(object):
_API_URL_PREFIX = '/api/v1/namespaces/'
def __init__(self, bypass_api_service=False):
self._bypass_api_service = bypass_api_service
self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config)
self._base_uri = k8s_config.server
self._api_servers_cache = [k8s_config.server]
self._api_servers_cache_updated = 0
@QQGoblin
QQGoblin / get_dcs.py
Last active December 21, 2021 08:03
【Patroni源码阅读】dcs初始化和导入
def get_dcs(config):
# 获取所有DCS实现的名称,dcs_modules()通过pkgutil查询dcs目录下所有python文件的名称
# 最终返回一个List,内容为:patroni.dcs.kubernetes 等等。
modules = dcs_modules()
for module_name in modules:
name = module_name.split('.')[-1] # dcs名称:如etcd、kubernetes等等
if name in config: # 判断配置文件中是否有对应的名称的Section配置段
try:
module = importlib.import_module(module_name)
@QQGoblin
QQGoblin / load_cluster.py
Last active December 21, 2021 08:02
【Patroni源码阅读】Kubernetes DCS初始化 Cluster
def _load_cluster(self):
stop_time = time.time() + self._retry.deadline
# 刷新kube-apiserver的地址信息
self._api.refresh_api_servers_cache()
try:
with self._condition:
# 等待podsCache和kindCache的缓存建立完毕
self._wait_caches(stop_time)
# 填充集群中的member信息(Pod信息)以及Endpoint信息
@QQGoblin
QQGoblin / patroni.py
Created December 22, 2021 09:33
【Patroni】patroni启动流程
class Patroni(AbstractPatroniDaemon):
def __init__(self, config):
super(Patroni, self).__init__(config)
self.version = __version__
# 通过配置文件中的配置,动态加载dcs的lib,并初始化
self.dcs = get_dcs(self.config)
# 初始化watchdog服务,PS:作为容器运行时不需要
self.watchdog = Watchdog(self.config)
@QQGoblin
QQGoblin / touch_member.py
Created December 23, 2021 08:17
【Patroni源码阅读】Ha touch_member
class Ha(object):
"""
在DCS上更新Postgresql节点的状态
"""
def touch_member(self):
with self._member_state_lock:
data = {
'conn_url': self.state_handler.connection_string,
'api_url': self.patroni.api.connection_string,
'state': self.state_handler.state,
@QQGoblin
QQGoblin / stop.py
Last active December 23, 2021 09:34
【Patroni源码阅读】Postgresql 停止PG
class Postgresql(object):
def stop(self, mode='fast', block_callbacks=False, checkpoint=None,
on_safepoint=None, on_shutdown=None, stop_timeout=None):
"""Stop PostgreSQL
Supports a callback when a safepoint is reached. A safepoint is when no user backend can return a successful
commit to users. Currently this means we wait for user backends to close. But in the future alternate mechanisms
could be added.
:param on_safepoint: This callback is called when no user backends are running.
@QQGoblin
QQGoblin / start.py
Created December 23, 2021 12:36
【Patroni源码阅读】Postgresql 启动PG
class Postgresql(object):
def start(self, timeout=None, task=None, block_callbacks=False, role=None):
"""Start PostgreSQL
Waits for postmaster to open ports or terminate so pg_isready can be used to check startup completion
or failure.
:returns: True if start was initiated and postmaster ports are open, False if start failed"""
# make sure we close all connections established against
# the former node, otherwise, we might get a stalled one
@QQGoblin
QQGoblin / demote.py
Created December 23, 2021 12:45
【Patroni源码阅读】将Postgres实例降级为replicas
class Ha(object):
def demote(self, mode):
"""Demote PostgreSQL running as master.
:param mode: One of offline, graceful or immediate.
offline is used when connection to DCS is not available.
graceful is used when failing over to another node due to user request. May only be called running async.
immediate is used when we determine that we are not suitable for master and want to failover quickly
without regard for data durability. May only be called synchronously.
immediate-nolock is used when we find out that we have lost the lock to be master. Need to bring down
@QQGoblin
QQGoblin / update_leader.py
Created December 24, 2021 07:42
【Patroni源码阅读】Leader 更新Endpoint
class Kubernetes(AbstractDCS):
def update_leader(self, last_lsn, slots=None):
# PS:这个是Leader更新Endpoint的接口,没有获取Leader之前,无法更新
# 获取当前leader对应的ep
kind = self._kinds.get(self.leader_path)
kind_annotations = kind and kind.metadata.annotations or {}
# ep上annotations.leader和当前实例名称不一致,退出
if kind and kind_annotations.get(self._LEADER) != self._name:
return False