Skip to content

Instantly share code, notes, and snippets.

@QQGoblin
Last active December 21, 2021 08:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save QQGoblin/ecb5975bf0fbd38cb1d4ece662e8b696 to your computer and use it in GitHub Desktop.
Save QQGoblin/ecb5975bf0fbd38cb1d4ece662e8b696 to your computer and use it in GitHub Desktop.
【Patroni源码阅读】Kubernetes Client
class ApiClient(object):
    """HTTP client for the Kubernetes API built on top of a ``urllib3.PoolManager``.

    The client keeps a cached list of API server base URLs.  When ``bypass_api_service`` is enabled the list
    is read from the ``kubernetes`` endpoints object in the ``default`` namespace; otherwise it only contains
    ``k8s_config.server``.  Requests are tried against the cached servers in order, preferred server first.
    """

    _API_URL_PREFIX = '/api/v1/namespaces/'

    def __init__(self, bypass_api_service=False):
        """Create the connection pool and make a first attempt to load the list of API servers.

        :param bypass_api_service: when true, talk to the API server nodes listed in the ``kubernetes``
                                   endpoint instead of only ``k8s_config.server``.
        """
        self._bypass_api_service = bypass_api_service
        self.pool_manager = urllib3.PoolManager(**k8s_config.pool_config)
        self._base_uri = k8s_config.server
        self._api_servers_cache = [k8s_config.server]
        self._api_servers_cache_updated = 0
        self.set_api_servers_cache_ttl(10)
        self.set_read_timeout(10)
        try:
            self._load_api_servers_cache()
        except K8sException:
            # Best effort: `_update_api_servers_cache` is still True after a failed load,
            # therefore `request()` will try to load the list again.
            pass

    def set_read_timeout(self, timeout):
        """Set the default time budget used by `_calculate_timeouts` when no explicit timeout is given."""
        self._read_timeout = timeout

    def set_api_servers_cache_ttl(self, ttl):
        """Set the maximum age of the API servers list; the stored value is half a second less than `ttl`."""
        self._api_servers_cache_ttl = ttl - 0.5

    def set_base_uri(self, value):
        """Make `value` the preferred API server and disable hostname verification for new connections."""
        logger.info('Selected new K8s API server endpoint %s', value)
        # We will connect by IP of the master node which is not listed as alternative name
        self.pool_manager.connection_pool_kw['assert_hostname'] = False
        self._base_uri = value

    @staticmethod
    def _handle_server_response(response, _preload_content):
        """Validate the HTTP status and optionally decode the JSON payload.

        :param response: the response object returned by the pool manager.
        :param _preload_content: when true the body is decoded into a `K8sObject`, otherwise `response` is
                                 returned untouched.
        :raises k8s_client.rest.ApiException: if the status code is outside of the 200..205 range.
        """
        if response.status not in range(200, 206):
            raise k8s_client.rest.ApiException(http_resp=response)
        if not _preload_content:
            return response
        payload = response.data.decode('utf-8')
        return K8sObject(json.loads(payload))

    @staticmethod
    def _make_headers(headers):
        """Return `k8s_config.headers` updated with the caller supplied `headers` (may be `None`)."""
        merged = k8s_config.headers
        if headers:
            merged.update(headers)
        return merged

    @property
    def api_servers_cache(self):
        """List of known API servers with the currently preferred one (if it is still listed) first."""
        preferred = self._base_uri
        known = self._api_servers_cache
        ordered = [preferred] if preferred in known else []
        ordered.extend(server for server in known if server != preferred)
        return ordered

    def _get_api_servers(self, api_servers_cache):
        """Ask the given servers, one by one, for the addresses of the ``kubernetes`` endpoint.

        :param api_servers_cache: list of base URLs to query.
        :returns: shuffled list of ``https`` URLs from the first subset that has a TCP port named ``https``
                  and at least one address.
        :raises k8s_client.rest.ApiException: if a server answered with 403.
        :raises K8sConnectionFailed: if none of the servers produced a list of addresses.
        """
        _, node_timeout, node_retries = self._calculate_timeouts(len(api_servers_cache))
        request_kwargs = {
            'headers': self._make_headers({}),
            'preload_content': True,
            'retries': node_retries,
            'timeout': urllib3.Timeout(connect=max(1, node_timeout/2.0), total=node_timeout),
        }
        url_path = self._API_URL_PREFIX + 'default/endpoints/kubernetes'
        for server in api_servers_cache:
            try:
                raw = self.pool_manager.request('GET', server + url_path, **request_kwargs)
                endpoint = self._handle_server_response(raw, True)
                for subset in endpoint.subsets:
                    for port in subset.ports:
                        if port.name != 'https' or port.protocol != 'TCP':
                            continue
                        addresses = [uri('https', (address.ip, port.port)) for address in subset.addresses]
                        if addresses:
                            random.shuffle(addresses)
                            return addresses
            except Exception as exc:
                forbidden = isinstance(exc, k8s_client.rest.ApiException) and exc.status == 403
                if forbidden:
                    # the caller reacts to the missing permission, trying other nodes is pointless
                    raise
                self.pool_manager.clear()
                logger.error('Failed to get "kubernetes" endpoint from %s: %r', server, exc)
        raise K8sConnectionFailed('No more K8s API server nodes in the cluster')

    def _refresh_api_servers_cache(self, updating_cache=False):
        """Rebuild the list of API servers and make sure the preferred server is one of them.

        :param updating_cache: when true only ``k8s_config.server`` is queried and a connection failure is
                               reported as `K8sException`; when false the cached servers are queried and a
                               connection failure leaves the current state untouched.
        :raises K8sException: if `updating_cache` is true and the list could not be fetched.
        """
        if not self._bypass_api_service:
            self._api_servers_cache = [k8s_config.server]
        else:
            try:
                if updating_cache:
                    candidates = [k8s_config.server]
                else:
                    candidates = self.api_servers_cache
                self._api_servers_cache = self._get_api_servers(candidates)
                if updating_cache:
                    self.pool_manager.clear()
            except k8s_client.rest.ApiException:  # 403 Permission denied
                logger.warning("Kubernetes RBAC doesn't allow GET access to the 'kubernetes' "
                               "endpoint in the 'default' namespace. Disabling 'bypass_api_service'.")
                self._bypass_api_service = False
                self._api_servers_cache = [k8s_config.server]
                if not updating_cache:
                    self.pool_manager.clear()
            except K8sConnectionFailed:
                if not updating_cache:
                    return
                raise K8sException("Could not get the list of K8s API server nodes")

        if self._base_uri not in self._api_servers_cache:
            self.set_base_uri(self._api_servers_cache[0])
        self._api_servers_cache_updated = time.time()

    def refresh_api_servers_cache(self):
        """Refresh the list of API servers if bypassing is enabled and the list is older than its TTL."""
        if not self._bypass_api_service:
            return
        cache_age = time.time() - self._api_servers_cache_updated
        if cache_age > self._api_servers_cache_ttl:
            self._refresh_api_servers_cache()

    def _load_api_servers_cache(self):
        """Load the list of API servers starting from ``k8s_config.server``.

        The `_update_api_servers_cache` flag is deliberately left set when the refresh raises, so that
        `request()` repeats the attempt later.
        """
        self._update_api_servers_cache = True
        self._refresh_api_servers_cache(True)
        self._update_api_servers_cache = False

    def _calculate_timeouts(self, api_servers, timeout=None):
        """Split the time budget between API server nodes.

        If the share of a single node would be below one second, the number of nodes is reduced until
        the share is big enough (possibly down to zero nodes).  A single-node cluster may get one retry,
        with two or more nodes there are no retries: switching to a different node is preferred.

        :param api_servers: number of API server nodes available.
        :param timeout: total time budget in seconds, `self._read_timeout` is used when it is falsy.
        :returns: tuple of (number of nodes to use, timeout per attempt, retries per node).
        """
        budget = float(timeout or self._read_timeout)
        node_timeout = budget
        attempts_cap = 3 - min(api_servers, 2)
        attempts = 1
        floor = 1.0
        while api_servers > 0:
            node_timeout = budget / api_servers
            if node_timeout >= floor:
                # for small clusters we will try to do more than one try on every node
                while attempts < attempts_cap and node_timeout / (attempts + 1) >= floor:
                    attempts += 1
                node_timeout /= attempts
                break
            # the share of one node is too small, try with fewer nodes and without retries
            api_servers -= 1
            attempts_cap = 1
        return api_servers, node_timeout, attempts - 1

    def _do_http_request(self, retry, api_servers_cache, method, path, **kwargs):
        """Send the request to the first server from `api_servers_cache` that responds.

        :param retry: when falsy, the first connection failure is reported immediately (after switching the
                      preferred server to the next one); otherwise the remaining servers are tried.
        :param api_servers_cache: list of base URLs to try, in order.
        :param method: HTTP method.
        :param path: URL path appended to the base URL of a server.
        :param kwargs: passed through to ``pool_manager.request``.
        :returns: the response object.
        :raises K8sException: if the request failed and `retry` is falsy.
        :raises K8sConnectionFailed: if all servers failed.
        """
        had_failure = False
        for index, server in enumerate(api_servers_cache):
            if index > 0:
                logger.info('Retrying on %s', server)
            try:
                response = self.pool_manager.request(method, server + path, **kwargs)
                if had_failure:
                    # some nodes before this one are broken: stick to the working node, refresh the list
                    self.set_base_uri(server)
                    self._refresh_api_servers_cache()
                return response
            except (HTTPError, HTTPException, socket.error, socket.timeout) as exc:
                self.pool_manager.clear()
                if not retry:
                    # switch to the next node if request failed and retry is not allowed
                    next_index = index + 1
                    if next_index < len(api_servers_cache):
                        self.set_base_uri(api_servers_cache[next_index])
                    raise K8sException('{0} {1} request failed'.format(method, path))
                logger.error('Request to server %s failed: %r', server, exc)
                had_failure = True
        raise K8sConnectionFailed('No more API server nodes in the cluster')

    def request(self, retry, method, path, timeout=None, **kwargs):
        """Execute the HTTP request, failing over between API servers while `retry` has time left.

        :param retry: object providing `sleeptime`, `stoptime`, `sleep_func()` and `update_delay()`; these
                      are only used after all servers failed.
        :param method: HTTP method.
        :param path: URL path.
        :param timeout: a number (total timeout), a 2-tuple (connect, read), or any other truthy value that
                        is passed to urllib3 as is.  When falsy the timeout and the number of retries are
                        derived from the read timeout and the number of known servers.
        :param kwargs: passed through to ``pool_manager.request``.
        :returns: the response object.
        :raises K8sConnectionFailed: when there is not enough time left for another round.
        """
        if self._update_api_servers_cache:
            self._load_api_servers_cache()

        servers = self.api_servers_cache
        server_count = len(servers)

        if timeout:
            if isinstance(timeout, six.integer_types + (float,)):
                timeout = urllib3.Timeout(total=timeout)
            elif isinstance(timeout, tuple) and len(timeout) == 2:
                timeout = urllib3.Timeout(connect=timeout[0], read=timeout[1])
            retries = 0
        else:
            _, node_timeout, retries = self._calculate_timeouts(server_count)
            timeout = urllib3.Timeout(connect=max(1, node_timeout/2.0), total=node_timeout)
        kwargs.update(retries=retries, timeout=timeout)

        while True:
            try:
                return self._do_http_request(retry, servers, method, path, **kwargs)
            except K8sConnectionFailed as ex:
                try:
                    self._load_api_servers_cache()
                    servers = self.api_servers_cache
                    server_count = len(servers)
                except Exception as e:
                    logger.debug('Failed to update list of K8s master nodes: %r', e)

                sleeptime = retry.sleeptime
                remaining_time = (retry.stoptime or time.time()) - sleeptime - time.time()
                nodes, node_timeout, retries = self._calculate_timeouts(server_count, remaining_time)
                if nodes == 0:
                    # out of time: give up and force reloading of the servers list on the next request
                    self._update_api_servers_cache = True
                    raise ex
                retry.sleep_func(sleeptime)
                retry.update_delay()
                # We still have some time left. Partially reduce the list of servers and retry request
                kwargs.update(timeout=urllib3.Timeout(connect=max(1, node_timeout/2.0), total=node_timeout),
                              retries=retries)
                servers = servers[:nodes]

    def call_api(self, method, path, headers=None, body=None, _retry=None,
                 _preload_content=True, _request_timeout=None, **kwargs):
        """Call the namespaced REST API and return the processed response.

        :param method: HTTP method.
        :param path: path relative to `_API_URL_PREFIX`.
        :param headers: optional extra headers, merged into the default ones.
        :param body: optional object serialized to JSON; values that `json` can't serialize must provide
                     a `to_dict()` method.
        :param _retry: passed to `request()` as `retry`.
        :param _preload_content: see `_handle_server_response`.
        :param _request_timeout: passed to `request()` as `timeout`.
        :param kwargs: extra request parameters, names are converted to camelCase.
        :raises k8s_client.rest.ApiException: if the server answered with an unexpected status code.
        """
        merged_headers = self._make_headers(headers)
        # resource_version => resourceVersion
        query = {to_camel_case(name): value for name, value in kwargs.items()}
        if body is None:
            payload = None
        else:
            payload = json.dumps(body, default=lambda o: o.to_dict())
        # Additional keyword arguments given by the caller are handed over to urllib3 as the `fields`
        # dictionary, i.e. as parameters of the HTTP request.  Commonly used ones are `watch`,
        # `resource_version` and so on.
        # See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.19/
        response = self.request(_retry, method, self._API_URL_PREFIX + path, headers=merged_headers,
                                fields=query, body=payload, preload_content=_preload_content,
                                timeout=_request_timeout)
        return self._handle_server_response(response, _preload_content)
class CoreV1Api(object):
    """Dynamic facade for the namespaced REST calls.

    Attributes named ``(action)_namespaced_(kind)`` are resolved on the fly into callables which work out
    the HTTP method, the URL path, the headers and the body, and delegate to ``api_client.call_api``.
    """

    def __init__(self, api_client=None):
        """:param api_client: object providing `call_api()`; `k8s_client.ApiClient()` is created if omitted."""
        self._api_client = api_client or k8s_client.ApiClient()

    def __getattr__(self, func):  # `func` name pattern: (action)_namespaced_(kind)
        """Build a callable for the API call described by the attribute name.

        :param func: attribute name, where (action) is one of
                     (read|list|create|patch|replace|delete|delete_collection).
        :returns: function accepting ``(namespace)`` for list/delete_collection, ``(namespace, body)`` for
                  create and ``(name, namespace[, body])`` otherwise; ``delete`` additionally accepts the
                  ``body`` keyword argument.  Remaining keyword arguments are passed to `call_api()`.
        :raises AttributeError: if the name doesn't follow the pattern, so that `hasattr()` and
                                `getattr()` with a default value work as expected for unknown attributes.
        """
        parts = func.split('_namespaced_')
        if len(parts) != 2 or not all(parts):
            raise AttributeError("'{0}' object has no attribute '{1}'".format(type(self).__name__, func))
        action, kind = parts
        kind = kind.replace('_', '') + ('' if kind.endswith('s') else 's')  # plural, single word

        def wrapper(*args, **kwargs):
            # unknown actions (patch, delete, delete_collection) use their first word as the HTTP method
            method = {'read': 'GET', 'list': 'GET', 'create': 'POST',
                      'replace': 'PUT'}.get(action, action.split('_')[0]).upper()

            if action == 'create' or len(args) == 1:  # namespace is a first argument and name in not in arguments
                path = '/'.join([args[0], kind])
            else:  # name, namespace followed by optional body
                path = '/'.join([args[1], kind, args[0]])

            headers = {'Content-Type': 'application/strategic-merge-patch+json'} if action == 'patch' else {}

            if len(args) == 3:  # name, namespace, body
                body = args[2]
            elif action == 'create':  # namespace, body
                body = args[1]
            elif action == 'delete':  # name, namespace
                body = kwargs.pop('body', None)
            else:
                body = None

            return self._api_client.call_api(method, path, headers, body, **kwargs)
        return wrapper
class CoreV1ApiProxy(object):
    """Proxy in front of `CoreV1Api` which resolves the ``*_kind`` method names and classifies API errors."""

    def __init__(self, use_endpoints=False, bypass_api_service=False):
        """
        :param use_endpoints: when true the ``*_kind`` calls are mapped to ``*_endpoints``,
                              otherwise to ``*_config_map``.
        :param bypass_api_service: passed to `k8s_client.ApiClient`.
        """
        # ApiClient wraps the address information of the kube-apiserver and the urllib3 connection pool.
        # Its call_api() method implements the logic of sending a request to the kube-apiserver.
        self._api_client = k8s_client.ApiClient(bypass_api_service)
        # CoreV1Api is an API client implemented with closures: __getattr__ is overridden, and accessing
        # core_v1_api.(action)_namespaced_(kind) returns a function performing the REST call for objects
        # of the given kind.
        # Example:
        #   pods_func = functools.partial(self._api.list_namespaced_pod, "default",
        #                                 label_selector={"application": "postgresql"})
        # Here pods_func is a function returning all Pods from the "default" namespace
        # that carry the application=postgresql label.
        self._core_v1_api = k8s_client.CoreV1Api(self._api_client)
        self._use_endpoints = bool(use_endpoints)

    def configure_timeouts(self, loop_wait, retry_timeout, ttl):
        """Derive TCP keepalive options, the read timeout and the servers-list TTL from the given settings."""
        # Normally every loop_wait seconds we should have receive something from the socket.
        # If we didn't received anything after the loop_wait + retry_timeout it is a time
        # to start worrying (send keepalive messages). Finally, the connection should be
        # considered as dead if we received nothing from the socket after the ttl seconds.
        socket_options = list(keepalive_socket_options(ttl, int(loop_wait + retry_timeout)))
        client = self._api_client
        client.pool_manager.connection_pool_kw['socket_options'] = socket_options
        client.set_read_timeout(retry_timeout)
        client.set_api_servers_cache_ttl(loop_wait)

    def refresh_api_servers_cache(self):
        """Delegate to `ApiClient.refresh_api_servers_cache()`."""
        self._api_client.refresh_api_servers_cache()

    def __getattr__(self, func):
        """Return a function calling the method `func` of the wrapped `CoreV1Api`.

        The proxy overrides __getattr__ as well, mainly to resolve whether ``_kind`` in the name stands
        for ``endpoints`` or ``config_map``.

        :raises KubernetesRetriableException: (from the returned function) if the API call failed with
                status 500, 503 or 504, or the error response carries the ``retry-after`` header.
        """
        if func.endswith('_kind'):
            kind_name = 'endpoints' if self._use_endpoints else 'config_map'
            func = func[:-len('kind')] + kind_name

        def wrapper(*args, **kwargs):
            try:
                api_call = getattr(self._core_v1_api, func)
                return api_call(*args, **kwargs)
            except k8s_client.rest.ApiException as e:
                retriable = e.status in (500, 503, 504) or (e.headers and 'retry-after' in e.headers)  # XXX
                if retriable:
                    raise KubernetesRetriableException(e)
                raise
        return wrapper

    @property
    def use_endpoints(self):
        """Whether the ``*_kind`` calls operate on endpoints (true) or on config maps (false)."""
        return self._use_endpoints
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment