Skip to content

Instantly share code, notes, and snippets.

@bribroder
Created October 20, 2023 15:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bribroder/5306d5245f2b66823568f7347b63eb7f to your computer and use it in GitHub Desktop.
Save bribroder/5306d5245f2b66823568f7347b63eb7f to your computer and use it in GitHub Desktop.
from botocore import session
from datetime import datetime, timedelta
import base64
import boto3
import kubernetes
import tempfile
import typing
# Passed as `ExpiresIn` when presigning the STS GetCallerIdentity URL
# (botocore documents `ExpiresIn` as a number of seconds).
URL_TIMEOUT = 60
# Minutes added to the current UTC time to build the `expirationTimestamp`
# reported in the ExecCredential document returned by get_token().
TOKEN_EXPIRATION_MINS = 14
# Prefix prepended to the base64url-encoded presigned URL to form the token.
TOKEN_PREFIX = 'k8s-aws-v1.'
# Name of the HTTP header that carries the cluster name on the signed request.
CLUSTER_NAME_HEADER = 'x-k8s-aws-id'
def _retrieve_cluster_name(params, context, **kwargs):
if 'ClusterName' in params:
context['eks_cluster'] = params.pop('ClusterName')
def _inject_cluster_name_header(request, **kwargs):
if 'eks_cluster' in request.context:
request.headers[CLUSTER_NAME_HEADER] = request.context['eks_cluster']
def get_sts_client(sts_session=None, region_name=None, role_arn=None):
    """Create an STS client wired up to sign cluster-scoped requests.

    Args:
        sts_session: Session object used to create clients. When ``None``, a
            new one is obtained from ``session.get_session()``.
        region_name: Region passed to every client created here.
        role_arn: Optional role to assume. When given, a first STS client
            calls ``assume_role`` (session name ``'EKSGetTokenAuth'``) and the
            returned temporary credentials are used for the final client.

    Returns:
        The STS client, with ``_retrieve_cluster_name`` and
        ``_inject_cluster_name_header`` registered on its GetCallerIdentity
        ``provide-client-params`` and ``before-sign`` events respectively.
    """
    active_session = sts_session if sts_session is not None else session.get_session()
    client_kwargs = {'region_name': region_name}

    if role_arn is not None:
        # Use a throwaway client to obtain temporary credentials for the role.
        bootstrap = active_session.create_client('sts', region_name)
        assumed = bootstrap.assume_role(
            RoleArn=role_arn,
            RoleSessionName='EKSGetTokenAuth',
        )
        creds = assumed['Credentials']
        client_kwargs.update(
            aws_access_key_id=creds['AccessKeyId'],
            aws_secret_access_key=creds['SecretAccessKey'],
            aws_session_token=creds['SessionToken'],
        )

    sts = active_session.create_client('sts', **client_kwargs)

    # Registration order matches the original: params hook, then signing hook.
    handlers = (
        ('provide-client-params.sts.GetCallerIdentity', _retrieve_cluster_name),
        ('before-sign.sts.GetCallerIdentity', _inject_cluster_name_header),
    )
    for event_name, handler in handlers:
        sts.meta.events.register(event_name, handler)
    return sts
class TokenGenerator:
    """Builds bearer tokens from a presigned STS GetCallerIdentity URL."""

    def __init__(self, sts_client):
        # Client used for presigning; get_sts_client() in this module builds
        # one with the cluster-name event handlers attached.
        self._sts_client = sts_client

    def get_token(self, cluster_name):
        """Generate a presigned url token to pass to kubectl.

        The presigned URL is UTF-8 encoded, base64url-encoded with trailing
        ``=`` padding stripped, and prefixed with ``TOKEN_PREFIX``.
        """
        presigned_url = self._get_presigned_url(cluster_name)
        encoded = base64.urlsafe_b64encode(presigned_url.encode('utf-8'))
        unpadded = encoded.decode('utf-8').rstrip('=')
        return TOKEN_PREFIX + unpadded

    def _get_presigned_url(self, cluster_name):
        """Presign a GET ``get_caller_identity`` call carrying the cluster name."""
        presign_kwargs = {
            'Params': {'ClusterName': cluster_name},
            'ExpiresIn': URL_TIMEOUT,
            'HttpMethod': 'GET',
        }
        return self._sts_client.generate_presigned_url(
            'get_caller_identity', **presign_kwargs
        )
def get_token(
    sts_client,
    cluster_name: str,
    region_name: typing.Optional[str] = None,
    role_arn: typing.Optional[str] = None,
    full: bool = True,
) -> typing.Union[dict, str]:
    """Generate a token for ``cluster_name`` using ``sts_client``.

    Args:
        sts_client: STS client used to presign the request (see
            ``get_sts_client``).
        cluster_name: Cluster name embedded in the presigned request.
        region_name: Accepted for interface compatibility; not used here.
        role_arn: Accepted for interface compatibility; not used here.
        full: When exactly ``True``, wrap the token in an ``ExecCredential``
            dict; any other value returns the bare token string.

    Returns:
        Either the ``ExecCredential`` dict (token plus an
        ``expirationTimestamp`` ``TOKEN_EXPIRATION_MINS`` minutes from now, in
        UTC) or the token string itself.
    """
    # Function-scope import: the module only imports `datetime` and
    # `timedelta` from the datetime package.
    from datetime import timezone

    token = TokenGenerator(sts_client).get_token(cluster_name)
    if full is not True:
        return token

    # datetime.utcnow() is deprecated since Python 3.12; an aware "now" in UTC
    # formats to the same string.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=TOKEN_EXPIRATION_MINS)
    return {
        "kind": "ExecCredential",
        # NOTE(review): the v1alpha1 ExecCredential API was removed in
        # Kubernetes 1.24 -- confirm what consumers expect before bumping.
        "apiVersion": "client.authentication.k8s.io/v1alpha1",
        "spec": {},
        "status": {
            "token": token,
            "expirationTimestamp": expires_at.strftime('%Y-%m-%dT%H:%M:%SZ'),
        },
    }
def _write_cafile(data: str) -> tempfile.NamedTemporaryFile:
# protect yourself from automatic deletion
cafile = tempfile.NamedTemporaryFile(delete=False)
cadata_b64 = data
cadata = base64.b64decode(cadata_b64)
cafile.write(cadata)
cafile.flush()
return cafile
def k8s_api_client(
    endpoint: str,
    token: str,
    cafile: str,
    client_type: str = 'CoreV1Api'
) -> typing.Union[
    kubernetes.client.CoreV1Api,
    kubernetes.client.AppsV1Api,
    kubernetes.client.BatchV1Api,
    kubernetes.client.StorageV1Api,
    kubernetes.client.RbacAuthorizationV1Api
]:
    """Instantiate a Kubernetes API object configured for bearer-token auth.

    Args:
        endpoint: Value used as the configuration ``host``.
        token: Token sent as ``Bearer <token>`` under the ``authorization``
            API key.
        cafile: Path assigned to the configuration's ``ssl_ca_cert``.
        client_type: Name of the attribute of ``kubernetes.client`` to
            instantiate (for example ``'CoreV1Api'``).

    Returns:
        An instance of ``kubernetes.client.<client_type>`` bound to a new
        ``ApiClient`` built from the configuration above.
    """
    auth_header = {'authorization': f'Bearer {token}'}
    configuration = kubernetes.config.kube_config.Configuration(
        host=endpoint,
        api_key=auth_header,
    )
    configuration.ssl_ca_cert = cafile

    api_client = kubernetes.client.ApiClient(configuration=configuration)
    # Resolve the requested API class by name only after the ApiClient exists,
    # preserving the original order of operations.
    api_class = getattr(kubernetes.client, client_type)
    return api_class(api_client=api_client)
def get_client(
    cluster_name: str,
    cluster_data: dict = None,
    client_type: str = 'CoreV1Api'
) -> typing.Union[
    kubernetes.client.CoreV1Api,
    kubernetes.client.AppsV1Api,
    kubernetes.client.BatchV1Api,
    kubernetes.client.StorageV1Api,
    kubernetes.client.RbacAuthorizationV1Api
]:
    """Build a ready-to-use Kubernetes API object for the named cluster.

    Args:
        cluster_name: Name of the cluster to connect to.
        cluster_data: Cluster description providing ``endpoint`` and
            ``certificateAuthority.data``. When ``None`` it is fetched with
            the boto3 EKS client's ``describe_cluster``.
        client_type: Name of the ``kubernetes.client`` class to instantiate.

    Returns:
        The API object produced by ``k8s_api_client``.

    Note:
        The CA certificate is written to a temporary file by
        ``_write_cafile``; this function does not remove that file.
    """
    info = cluster_data
    if info is None:
        described = boto3.client('eks').describe_cluster(name=cluster_name)
        info = described['cluster']

    # the kubernetes client requires the CA cert as a file
    ca_b64 = info['certificateAuthority']['data']
    ca_handle = _write_cafile(ca_b64)

    bearer_token = get_token(get_sts_client(), cluster_name, full=False)

    return k8s_api_client(
        endpoint=info['endpoint'],
        token=bearer_token,
        cafile=ca_handle.name,
        client_type=client_type,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment