Skip to content

Instantly share code, notes, and snippets.

@rarescosma
Created April 17, 2024 09:29
Show Gist options
  • Save rarescosma/03a50a9695ef420f23faad12bbb990f8 to your computer and use it in GitHub Desktop.
Save rarescosma/03a50a9695ef420f23faad12bbb990f8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Use a GCP service account to assume an web-identity AWS IAM role.
The "happy" flow is:
1. create a signed JWT token using the GCP service account credentials
2. exchange the token from (1) for a Google-signed OAuth token
3. call boto's `assume_role_with_web_identity` to exchange the token from (2) for temporary AWS
credentials.
"""
import sys
import time
from dataclasses import dataclass
from typing import Callable, Optional, TypedDict
import boto3
import click
import requests
from botocore.exceptions import ClientError as BotoError
from google.auth import default, jwt
from google.auth.credentials import Credentials
from google.auth.exceptions import GoogleAuthError
from google.auth.iam import Signer
from google.auth.transport.requests import Request
JWT_BEARER_TOKEN_EXPIRY_TIME: int = 3600
JWT_BEARER_TOKEN_GRANT_TYPE: str = "urn:ietf:params:oauth:grant-type:jwt-bearer"
OAUTH_TOKEN_URI: str = "https://www.googleapis.com/oauth2/v4/token"
class JwtPayload(TypedDict):
iss: str # claim issuer
aud: str # claim audience
target_audience: str
iat: int # claim issued at timestamp
exp: int # claim expiration timestamp
@dataclass(frozen=True)
class BotoCredentials:
access_key_id: str
secret_access_key: str
session_token: str
def as_export_block(self) -> str:
return (
f"export AWS_ACCESS_KEY_ID={self.access_key_id}\n"
f"export AWS_SECRET_ACCESS_KEY={self.secret_access_key}\n"
f"export AWS_SESSION_TOKEN={self.session_token}\n"
)
def _get_default_credentials() -> Optional[Credentials]:
credentials, _ = default(
scopes=[
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/iam",
]
)
try:
credentials.refresh(Request())
except GoogleAuthError as e:
print(f"Failed to refresh credentials: {e}", file=sys.stderr)
return None
return credentials
def _get_signed_jwt(
service_account: str,
target_audience: str,
credential_provider: Callable[
..., Optional[Credentials]
] = _get_default_credentials,
) -> Optional[bytes]:
if (credentials := credential_provider()) is None:
return None
now = int(time.time())
payload: JwtPayload = {
"iss": service_account,
"aud": OAUTH_TOKEN_URI,
"target_audience": target_audience,
"iat": now,
"exp": now + JWT_BEARER_TOKEN_EXPIRY_TIME,
}
signer = Signer(Request(), credentials, service_account)
try:
return jwt.encode(signer, dict(payload))
except GoogleAuthError as e:
print(f"Failed to produce a signed JWT token: {e}", file=sys.stderr)
return None
def _get_oauth_token(
signed_jwt: bytes,
requester: Callable[..., requests.Response] = requests.post,
) -> Optional[str]:
data = {"grant_type": JWT_BEARER_TOKEN_GRANT_TYPE, "assertion": signed_jwt}
response = requester(OAUTH_TOKEN_URI, data=data)
try:
response.raise_for_status()
result_dict = response.json()
return result_dict.get("id_token")
except requests.exceptions.RequestException as e:
print(f"Failed to exchange JWT for oauth token: {e}", file=sys.stderr)
return None
def _get_boto_credentials(
role_arn: str, session_name: str, token: str
) -> Optional[BotoCredentials]:
client = boto3.client("sts")
try:
response = client.assume_role_with_web_identity(
RoleArn=role_arn,
RoleSessionName=session_name,
WebIdentityToken=token,
)
except BotoError as e:
print(f"Failed to assume AWS role: {e}", file=sys.stderr)
return None
return BotoCredentials(
access_key_id=response["Credentials"]["AccessKeyId"],
secret_access_key=response["Credentials"]["SecretAccessKey"],
session_token=response["Credentials"]["SessionToken"],
)
@click.command
@click.option(
"--gcp-service-account",
envvar="GCP_SERVICE_ACCOUNT",
required=True,
help="The GCP service account to use for federation.",
type=str,
)
@click.option(
"--target-audience-url",
envvar="TARGET_AUDIENCE_URL",
required=True,
help="The target audience URL, same as the one configured in the AWS IAM role trust policy.",
type=str,
)
@click.option(
"--aws-iam-role-arn",
envvar="AWS_IAM_ROLE_ARN",
required=True,
help="The AWS IAM role to assume.",
type=str,
)
@click.option(
"--aws-session-name",
envvar="AWS_SESSION_NAME",
required=True,
help="An unique name for the assumed role session.",
type=str,
)
def federate_gcp_to_aws(
gcp_service_account: str,
target_audience_url: str,
aws_iam_role_arn: str,
aws_session_name: str,
) -> None:
"""
Use a GCP service account to obtain ephemeral AWS credentials for the given IAM role.
"""
if (
_signed_jwt := _get_signed_jwt(
gcp_service_account,
target_audience=target_audience_url,
)
) is None:
sys.exit(1)
if (_token := _get_oauth_token(_signed_jwt)) is None:
sys.exit(1)
if (
_boto_credentials := _get_boto_credentials(
role_arn=aws_iam_role_arn,
session_name=aws_session_name,
token=_token,
)
) is None:
sys.exit(1)
print(_boto_credentials.as_export_block())
if __name__ == "__main__":
federate_gcp_to_aws()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment