Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
Created December 24, 2021 07:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save QQGoblin/a16c5bd6122f7a387977d23cec5e2fec to your computer and use it in GitHub Desktop.
Save QQGoblin/a16c5bd6122f7a387977d23cec5e2fec to your computer and use it in GitHub Desktop.
【Patroni源码阅读】Leader 更新Endpoint
class Kubernetes(AbstractDCS):
    """Excerpt of the Kubernetes DCS class: the leader-record update path.

    Only the two methods the current leader uses to refresh the leader
    object (referred to as the Endpoint in the original notes) are shown.
    """

    def update_leader(self, last_lsn, slots=None):
        """Refresh the leader object on behalf of the current leader.

        Per the original notes, this is the interface the leader uses to
        update the Endpoint; a node that does not hold the leader record
        cannot update it.

        :param last_lsn: last known LSN; when truthy it is stored as a string
            under the ``self._OPTIME`` annotation.
        :param slots: optional slots information, serialized with
            ``json.dumps`` when truthy.
        :returns: ``False`` when the cached leader object names a different
            leader, otherwise whatever ``_update_leader_with_retry`` returns.
        """
        # Look up the cached object that corresponds to the leader path.
        kind = self._kinds.get(self.leader_path)
        kind_annotations = kind and kind.metadata.annotations or {}

        # The leader annotation on the object does not match this instance's
        # name: we are not the leader, so bail out.
        if kind and kind_annotations.get(self._LEADER) != self._name:
            return False

        now = datetime.datetime.now(tzutc).isoformat()
        # Carry over acquireTime/transitions from the object's annotations,
        # falling back to the locally remembered record when there are none.
        leader_observed_record = kind_annotations or self._leader_observed_record
        annotations = {
            self._LEADER: self._name,
            'ttl': str(self._ttl),
            'renewTime': now,
            'acquireTime': leader_observed_record.get('acquireTime') or now,
            'transitions': leader_observed_record.get('transitions') or '0',
        }
        if last_lsn:
            annotations[self._OPTIME] = str(last_lsn)
            # NOTE(review): indentation was lost in the source this was
            # recovered from; the slots annotation is placed under
            # ``if last_lsn`` to match upstream Patroni -- confirm.
            annotations['slots'] = json.dumps(slots) if slots else None

        # Evaluates to the falsy ``kind`` itself when there is no cached object.
        resource_version = kind and kind.metadata.resource_version

        # Per the original notes: self.__ips holds the Pod IP, and
        # _update_leader_with_retry also syncs the pod IP that the endpoint
        # currently points to.
        return self._update_leader_with_retry(annotations, resource_version, self.__ips)

    def _update_leader_with_retry(self, annotations, resource_version, ips):
        """Patch or create the leader object, retrying once after a conflict.

        :param annotations: annotations to write to the leader object.
        :param resource_version: resource version the caller observed.
        :param ips: passed through to the patch/create call as ``ips``.
        :returns: the result of the patch/create call, or ``False`` when the
            update failed, the remaining retry time is too short, the object
            could not be re-read, or another leader is recorded.
        """
        retry = self._retry.copy()

        def _retry(*args, **kwargs):
            # Hand the same retry object to the callee as ``_retry``.
            kwargs['_retry'] = retry
            return retry(*args, **kwargs)

        try:
            # Update the leader object.
            return self._patch_or_create(self.leader_path, annotations, resource_version,
                                         ips=ips, retry=_retry)
        except k8s_client.rest.ApiException as e:
            if e.status == 409:
                # Update conflict (lock contention): log it and fall through
                # to the re-read-and-retry logic below.
                logger.warning('Concurrent update of %s', self.leader_path)
            else:
                logger.exception('Permission denied' if e.status == 403 else 'Unexpected error from Kubernetes API')
                return False
        except (RetryFailedError, K8sException):
            return False

        # Shrink the deadline to the time remaining; give up when under 1s.
        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 1:
            return False

        # Try to get the latest version directly from K8s API instead of relying on async cache
        try:
            kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
        except Exception as e:
            logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
            return False

        # Store the freshly read object in the local cache.
        self._kinds.set(self.leader_path, kind)

        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 0.5:
            return False

        kind_annotations = kind and kind.metadata.annotations or {}
        kind_resource_version = kind and kind.metadata.resource_version

        # There is different leader or resource_version in cache didn't change
        # (i.e. the leader record has already been taken by someone else).
        if kind and (kind_annotations.get(self._LEADER) != self._name or kind_resource_version == resource_version):
            return False

        # Not taken by anyone else: retry the update with the fresh resource
        # version.
        # NOTE(review): this calls ``patch_or_create`` (no leading
        # underscore), unlike the first attempt; presumably a wrapper that
        # handles API errors itself -- confirm against the full class.
        return self.patch_or_create(self.leader_path, annotations, kind_resource_version, ips=ips, retry=_retry)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment