Created
December 24, 2021 07:42
-
-
Save QQGoblin/a16c5bd6122f7a387977d23cec5e2fec to your computer and use it in GitHub Desktop.
【Patroni源码阅读】Leader 更新Endpoint
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Kubernetes(AbstractDCS):
    """Excerpt of the Kubernetes-backed DCS showing how the leader renews its record.

    Only the leader-update path is shown here; every other attribute and helper
    referenced below (``self._kinds``, ``self._retry``, ``self._api``,
    ``self._patch_or_create`` ...) is defined outside this excerpt.
    """

    def update_leader(self, last_lsn, slots=None):
        """Renew the leader annotations on the leader object.

        Per the original author's note, this is the call the leader uses to update
        the Endpoint; a member that has not acquired the leader key cannot update it.

        :param last_lsn: when truthy, stored (as a string) under the ``self._OPTIME`` annotation.
        :param slots: when truthy, JSON-serialized into the ``slots`` annotation;
                      otherwise that annotation is set to ``None``.
        :returns: ``False`` when the cached leader object names a different member,
                  otherwise whatever :meth:`_update_leader_with_retry` returns.
        """
        # Look up the cached object (the Endpoint, per the original note) for the leader path.
        kind = self._kinds.get(self.leader_path)
        kind_annotations = (kind.metadata.annotations if kind else None) or {}

        # The ``leader`` annotation on the object does not match this instance's name: bail out.
        if kind and kind_annotations.get(self._LEADER) != self._name:
            return False

        now = datetime.datetime.now(tzutc).isoformat()
        # Prefer the annotations currently on the object; fall back to the last observed record.
        leader_observed_record = kind_annotations or self._leader_observed_record
        annotations = {
            self._LEADER: self._name,
            'ttl': str(self._ttl),
            'renewTime': now,
            # Keep the existing acquire time / transition counter when they are present.
            'acquireTime': leader_observed_record.get('acquireTime') or now,
            'transitions': leader_observed_record.get('transitions') or '0',
        }
        if last_lsn:
            annotations[self._OPTIME] = str(last_lsn)
        annotations['slots'] = json.dumps(slots) if slots else None

        # Falsy when there is no cached object, otherwise the object's resource version.
        resource_version = kind and kind.metadata.resource_version

        # Per the original note: ``self.__ips`` is the Pod IP, and the update below also
        # refreshes the Pod IP the Endpoint currently points to.
        return self._update_leader_with_retry(annotations, resource_version, self.__ips)

    def _update_leader_with_retry(self, annotations, resource_version, ips):
        """Write the leader annotations, retrying once after a 409 conflict.

        A private copy of ``self._retry`` is shared by every API call made here, so
        all attempts draw from the same time budget.

        :param annotations: annotations to write to the leader object.
        :param resource_version: resource version taken from the cached leader object.
        :param ips: forwarded as the ``ips`` keyword argument of the patch/create call.
        :returns: the result of the patch/create call, or ``False`` when the update
                  failed, the time budget ran out, or another member holds the leader key.
        """
        retry = self._retry.copy()

        def _retry(*args, **kwargs):
            # Hand the shared retry object to the callee as well as using it to drive the call.
            kwargs['_retry'] = retry
            return retry(*args, **kwargs)

        try:
            # First attempt: update the leader object.
            return self._patch_or_create(self.leader_path, annotations, resource_version, ips=ips, retry=_retry)
        except k8s_client.rest.ApiException as e:
            if e.status == 409:
                # Update conflict (lock contention): do not return, fall through to the retry below.
                logger.warning('Concurrent update of %s', self.leader_path)
            else:
                logger.exception('Permission denied' if e.status == 403 else 'Unexpected error from Kubernetes API')
                return False
        except (RetryFailedError, K8sException):
            return False

        # Shrink the deadline to the time left before ``retry.stoptime``; give up below 1 second.
        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 1:
            return False

        # Try to get the latest version directly from K8s API instead of relying on async cache
        try:
            kind = _retry(self._api.read_namespaced_kind, self.leader_path, self._namespace)
        except Exception as e:
            logger.error('Failed to get the leader object "%s": %r', self.leader_path, e)
            return False

        # Store the freshly read object in the local cache.
        self._kinds.set(self.leader_path, kind)

        # Re-check the remaining budget after the read; give up below half a second.
        retry.deadline = retry.stoptime - time.time()
        if retry.deadline < 0.5:
            return False

        kind_annotations = (kind.metadata.annotations if kind else None) or {}
        kind_resource_version = kind and kind.metadata.resource_version

        # There is different leader or resource_version in cache didn't change
        # (i.e. the leader key is already held by someone else, or our cached version was not stale).
        if kind and (kind_annotations.get(self._LEADER) != self._name or kind_resource_version == resource_version):
            return False

        # The key has not been taken by another member: retry the write with the fresh resource version.
        # NOTE(review): this calls ``patch_or_create`` (no leading underscore), unlike the first
        # attempt above; it is defined outside this excerpt -- confirm it is the intended wrapper.
        return self.patch_or_create(self.leader_path, annotations, kind_resource_version, ips=ips, retry=_retry)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment