Skip to content

Instantly share code, notes, and snippets.

@rkhullar
Last active April 4, 2023 14:30
Show Gist options
  • Save rkhullar/9d56f3711e8026ca22a336c92a22ad4d to your computer and use it in GitHub Desktop.
Save rkhullar/9d56f3711e8026ca22a336c92a22ad4d to your computer and use it in GitHub Desktop.
custom python kubernetes client
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from httpx_util import BearerAuth, async_httpx
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/CoreV1Api.md
# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/BatchV1Api.md
@dataclass(frozen=True)
class KubernetesClient:
host: str
namespace: str
api_token: str = field(repr=False)
async def list_pods(self) -> AsyncIterator[str]:
url = f'https://{self.host}/api/v1/namespaces/{self.namespace}/pods'
response = await async_httpx(method='get', url=url, auth=BearerAuth(self.api_token), verify=False)
response.raise_for_status()
response_data = response.json()
for item in response_data['items']:
yield item['metadata']['name']
if __name__ == '__main__':
import asyncio
import os
kubernetes_host: str = os.environ['KUBERNETES_HOST']
kubernetes_namespace: str = os.environ['KUBERNETES_NAMESPACE']
kubernetes_token: str = os.environ['KUBERNETES_TOKEN']
k8s_client = KubernetesClient(host=kubernetes_host, namespace=kubernetes_namespace, api_token=kubernetes_token)
async def test_list_pods():
async for name in k8s_client.list_pods():
print(name)
asyncio.run(test_list_pods())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment