Created
October 20, 2023 15:15
-
-
Save bribroder/5306d5245f2b66823568f7347b63eb7f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from botocore import session | |
from datetime import datetime, timedelta | |
import base64 | |
import boto3 | |
import kubernetes | |
import tempfile | |
import typing | |
URL_TIMEOUT = 60 | |
TOKEN_EXPIRATION_MINS = 14 | |
TOKEN_PREFIX = 'k8s-aws-v1.' | |
CLUSTER_NAME_HEADER = 'x-k8s-aws-id' | |
def _retrieve_cluster_name(params, context, **kwargs): | |
if 'ClusterName' in params: | |
context['eks_cluster'] = params.pop('ClusterName') | |
def _inject_cluster_name_header(request, **kwargs): | |
if 'eks_cluster' in request.context: | |
request.headers[CLUSTER_NAME_HEADER] = request.context['eks_cluster'] | |
def get_sts_client(sts_session=None, region_name=None, role_arn=None): | |
if sts_session is None: | |
sts_session = session.get_session() | |
client_kwargs = { | |
'region_name': region_name | |
} | |
if role_arn is not None: | |
sts = sts_session.create_client('sts', region_name) | |
creds = sts.assume_role( | |
RoleArn=role_arn, | |
RoleSessionName='EKSGetTokenAuth' | |
)['Credentials'] | |
client_kwargs['aws_access_key_id'] = creds['AccessKeyId'] | |
client_kwargs['aws_secret_access_key'] = creds['SecretAccessKey'] | |
client_kwargs['aws_session_token'] = creds['SessionToken'] | |
sts = sts_session.create_client('sts', **client_kwargs) | |
sts.meta.events.register( | |
'provide-client-params.sts.GetCallerIdentity', | |
_retrieve_cluster_name | |
) | |
sts.meta.events.register( | |
'before-sign.sts.GetCallerIdentity', | |
_inject_cluster_name_header | |
) | |
return sts | |
class TokenGenerator(object): | |
def __init__(self, sts_client): | |
self._sts_client = sts_client | |
def get_token(self, cluster_name): | |
""" Generate a presigned url token to pass to kubectl. """ | |
url = self._get_presigned_url(cluster_name) | |
token = TOKEN_PREFIX + base64.urlsafe_b64encode( | |
url.encode('utf-8')).decode('utf-8').rstrip('=') | |
return token | |
def _get_presigned_url(self, cluster_name): | |
return self._sts_client.generate_presigned_url( | |
'get_caller_identity', | |
Params={'ClusterName': cluster_name}, | |
ExpiresIn=URL_TIMEOUT, | |
HttpMethod='GET', | |
) | |
def get_token(sts_client, cluster_name: str, region_name: str = None, role_arn: str = None, full: bool = True) -> dict: | |
token = TokenGenerator(sts_client).get_token(cluster_name) | |
if full is True: | |
return { | |
"kind": "ExecCredential", | |
"apiVersion": "client.authentication.k8s.io/v1alpha1", | |
"spec": {}, | |
"status": { | |
"token": token, | |
"expirationTimestamp": ( | |
datetime.utcnow() + timedelta(minutes=TOKEN_EXPIRATION_MINS) | |
).strftime('%Y-%m-%dT%H:%M:%SZ') | |
} | |
} | |
else: | |
return token | |
def _write_cafile(data: str) -> tempfile.NamedTemporaryFile: | |
# protect yourself from automatic deletion | |
cafile = tempfile.NamedTemporaryFile(delete=False) | |
cadata_b64 = data | |
cadata = base64.b64decode(cadata_b64) | |
cafile.write(cadata) | |
cafile.flush() | |
return cafile | |
def k8s_api_client( | |
endpoint: str, | |
token: str, | |
cafile: str, | |
client_type: str = 'CoreV1Api' | |
) -> typing.Union[ | |
kubernetes.client.CoreV1Api, | |
kubernetes.client.AppsV1Api, | |
kubernetes.client.BatchV1Api, | |
kubernetes.client.StorageV1Api, | |
kubernetes.client.RbacAuthorizationV1Api | |
]: | |
kconfig = kubernetes.config.kube_config.Configuration( | |
host=endpoint, | |
api_key={'authorization': f'Bearer {token}'} | |
) | |
kconfig.ssl_ca_cert = cafile | |
kclient = kubernetes.client.ApiClient(configuration=kconfig) | |
return getattr(kubernetes.client, client_type)(api_client=kclient) | |
def get_client( | |
cluster_name: str, | |
cluster_data: dict = None, | |
client_type: str = 'CoreV1Api' | |
) -> typing.Union[ | |
kubernetes.client.CoreV1Api, | |
kubernetes.client.AppsV1Api, | |
kubernetes.client.BatchV1Api, | |
kubernetes.client.StorageV1Api, | |
kubernetes.client.RbacAuthorizationV1Api | |
]: | |
if cluster_data is None: | |
eks_client = boto3.client('eks') | |
cluster_data = eks_client.describe_cluster(name=cluster_name)['cluster'] | |
# the kubernetes client requires the CA cert as a file | |
cafile = _write_cafile(cluster_data['certificateAuthority']['data']) | |
sts_client = get_sts_client() | |
token = get_token(sts_client, cluster_name, full = False) | |
return k8s_api_client( | |
endpoint=cluster_data['endpoint'], | |
token=token, | |
cafile=cafile.name, | |
client_type=client_type | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment