Skip to content

Instantly share code, notes, and snippets.

@tsaarni
Last active June 6, 2024 11:17
Show Gist options
  • Save tsaarni/79b8b51610c948562cfe5d2e1002cae4 to your computer and use it in GitHub Desktop.
Save tsaarni/79b8b51610c948562cfe5d2e1002cae4 to your computer and use it in GitHub Desktop.
Testing local validation of Kubernetes tokens (Kubernetes as OIDC provider)
Fetching OIDC discovery endpoint from https://kubernetes.default.svc/.well-known/openid-configuration...
Fetching public keys from https://172.20.0.3:6443/openid/v1/jwks...
Validating the default service account token in the pod...
Token is valid!
Claims: {
"aud": [
"https://kubernetes.default.svc.cluster.local"
],
"exp": 1749207355,
"iat": 1717671355,
"iss": "https://kubernetes.default.svc.cluster.local",
"jti": "e51f1fbc-4733-4b85-a59f-ba80c0523874",
"kubernetes.io": {
"namespace": "default",
"node": {
"name": "contour-worker",
"uid": "517eb86e-b863-451b-bf70-86cb61439a3a"
},
"pod": {
"name": "shell",
"uid": "0d8b260e-ef0e-45fb-be8e-ff44f97722e8"
},
"serviceaccount": {
"name": "default",
"uid": "e6a8680e-d103-4cce-80ea-03acbbeacf7d"
},
"warnafter": 1717674962
},
"nbf": 1717671355,
"sub": "system:serviceaccount:default:default"
}
import requests
import json
from authlib.jose import jwt
# File holding the pod's service account token; read by get_service_account_token().
TOKEN_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/token"
# CA bundle passed as `verify=` to requests so TLS connections to the API server are checked against it.
CA_CERT_PATH = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
# OIDC discovery document URL; its "jwks_uri" field tells us where the signing keys are published.
OIDC_DISCOVERY_URL = "https://kubernetes.default.svc/.well-known/openid-configuration"
def get_service_account_token():
    """Return the contents of the file at TOKEN_PATH with surrounding whitespace stripped."""
    with open(TOKEN_PATH, 'r') as handle:
        raw_token = handle.read()
    return raw_token.strip()
def get_kubernetes_public_keys(timeout=10):
    """Fetch the JSON Web Key Set published by the Kubernetes API server.

    The OIDC discovery document at OIDC_DISCOVERY_URL is fetched first, and
    the key set is then downloaded from the ``jwks_uri`` it advertises. Both
    requests are authenticated with the service account token and verified
    against the CA bundle at CA_CERT_PATH.

    Args:
        timeout: Seconds to wait for each HTTP request (passed straight to
            ``requests.get``). Without a timeout, requests would wait
            indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body of the JWKS endpoint.

    Raises:
        requests.HTTPError: If either endpoint answers with an error status.
        requests.Timeout: If either request exceeds ``timeout``.
        KeyError: If the discovery document has no ``jwks_uri`` entry.
    """
    # Use the pod's default service account token to authenticate with the Kubernetes API.
    token = get_service_account_token()
    headers = {
        "Authorization": f"Bearer {token}",
    }

    print(f"Fetching OIDC discovery endpoint from {OIDC_DISCOVERY_URL}...")
    response = requests.get(
        OIDC_DISCOVERY_URL,
        verify=CA_CERT_PATH,
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()
    openid_discovery = response.json()

    jwks_uri = openid_discovery['jwks_uri']
    print(f"Fetching public keys from {jwks_uri}...")
    response = requests.get(
        jwks_uri,
        verify=CA_CERT_PATH,
        headers=headers,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
def validate_token(token, public_keys):
    """Decode ``token`` using ``public_keys`` and validate its claims.

    Returns the claims object produced by ``jwt.decode`` after its
    ``validate()`` method has been called on it; any exception raised by
    decoding or validation propagates to the caller.
    """
    decoded_claims = jwt.decode(token, public_keys)
    # validate() is called for its side effect only; the claims are returned afterwards.
    decoded_claims.validate()
    return decoded_claims
def main():
    """Validate the pod's service account token against the cluster's public keys.

    Prints "Token is valid!" and the claims as indented JSON on success.
    When decoding or claim validation fails, prints "Token is invalid!"
    together with the reason and exits with status 1 instead of dumping a
    raw traceback.
    """
    # Imported here so the error handling is self-contained; authlib is
    # already a dependency of this script (see the `jwt` import).
    from authlib.jose.errors import JoseError

    public_keys = get_kubernetes_public_keys()

    print("Validating the default service account token in the pod...")
    token = get_service_account_token()
    try:
        claims = validate_token(token, public_keys)
    except JoseError as error:
        # validate_token() raises rather than returning a falsy value, so
        # this is the path an invalid token actually takes.
        print("Token is invalid!")
        print("Reason:", error)
        raise SystemExit(1) from error

    if claims:
        print("Token is valid!")
        print("Claims:", json.dumps(claims, indent=4))
    else:
        # Kept from the original: an empty claims set is reported as invalid.
        print("Token is invalid!")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment