-
-
Save rarescosma/03a50a9695ef420f23faad12bbb990f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Use a GCP service account to assume a web-identity AWS IAM role.
The "happy" flow is: | |
1. create a signed JWT token using the GCP service account credentials | |
2. exchange the token from (1) for a Google-signed OAuth token | |
3. call boto's `assume_role_with_web_identity` to exchange the token from (2) for temporary AWS | |
credentials. | |
""" | |
import sys | |
import time | |
from dataclasses import dataclass | |
from typing import Callable, Optional, TypedDict | |
import boto3 | |
import click | |
import requests | |
from botocore.exceptions import ClientError as BotoError | |
from google.auth import default, jwt | |
from google.auth.credentials import Credentials | |
from google.auth.exceptions import GoogleAuthError | |
from google.auth.iam import Signer | |
from google.auth.transport.requests import Request | |
# Lifetime, in seconds, added to "iat" to obtain the "exp" claim (one hour).
JWT_BEARER_TOKEN_EXPIRY_TIME: int = 60 * 60
# Grant type sent when presenting the signed JWT as the authorization grant.
JWT_BEARER_TOKEN_GRANT_TYPE: str = "urn:ietf:params:oauth:grant-type:jwt-bearer"
# Token endpoint; used both as the JWT "aud" claim and as the exchange URL.
OAUTH_TOKEN_URI: str = "https://www.googleapis.com/oauth2/v4/token"
class JwtPayload(TypedDict):
    """Claim set of the self-signed JWT that is exchanged for an OAuth token."""

    # Issuer of the claim.
    iss: str
    # Audience of the claim.
    aud: str
    target_audience: str
    # Timestamp at which the claim was issued.
    iat: int
    # Timestamp at which the claim expires.
    exp: int
@dataclass(frozen=True)
class BotoCredentials:
    """Immutable holder for a set of temporary AWS credentials."""

    access_key_id: str
    secret_access_key: str
    session_token: str

    def as_export_block(self) -> str:
        """Render the credentials as newline-terminated shell ``export`` lines."""
        export_lines = [
            f"export AWS_ACCESS_KEY_ID={self.access_key_id}\n",
            f"export AWS_SECRET_ACCESS_KEY={self.secret_access_key}\n",
            f"export AWS_SESSION_TOKEN={self.session_token}\n",
        ]
        return "".join(export_lines)
def _get_default_credentials() -> Optional[Credentials]:
    """Load the application-default GCP credentials and refresh them.

    Returns:
        The refreshed credentials, or ``None`` when they could not be loaded
        or refreshed. In that case the error is printed to stderr.
    """
    try:
        # default() raises a GoogleAuthError subclass when no credentials can
        # be discovered, so the lookup has to be guarded as well; otherwise
        # the caller gets a traceback instead of the None it checks for.
        credentials, _ = default(
            scopes=[
                "https://www.googleapis.com/auth/userinfo.email",
                "https://www.googleapis.com/auth/iam",
            ]
        )
    except GoogleAuthError as e:
        print(f"Failed to load default credentials: {e}", file=sys.stderr)
        return None

    try:
        credentials.refresh(Request())
    except GoogleAuthError as e:
        print(f"Failed to refresh credentials: {e}", file=sys.stderr)
        return None
    return credentials
def _get_signed_jwt(
    service_account: str,
    target_audience: str,
    credential_provider: Callable[
        ..., Optional[Credentials]
    ] = _get_default_credentials,
) -> Optional[bytes]:
    """Build a JWT for ``service_account`` and sign it with an IAM ``Signer``.

    Args:
        service_account: Used as the ``iss`` claim and as the signer's
            service account.
        target_audience: Value of the ``target_audience`` claim.
        credential_provider: Zero-argument callable returning the credentials
            handed to the signer, or ``None`` when none are available.

    Returns:
        The encoded JWT, or ``None`` when no credentials were provided or
        signing failed (the signing error is printed to stderr).
    """
    credentials = credential_provider()
    if credentials is None:
        return None

    issued_at = int(time.time())
    claims = JwtPayload(
        iss=service_account,
        aud=OAUTH_TOKEN_URI,
        target_audience=target_audience,
        iat=issued_at,
        exp=issued_at + JWT_BEARER_TOKEN_EXPIRY_TIME,
    )
    iam_signer = Signer(Request(), credentials, service_account)

    try:
        encoded = jwt.encode(iam_signer, dict(claims))
    except GoogleAuthError as e:
        print(f"Failed to produce a signed JWT token: {e}", file=sys.stderr)
        return None
    return encoded
def _get_oauth_token(
    signed_jwt: bytes,
    requester: Callable[..., requests.Response] = requests.post,
) -> Optional[str]:
    """Exchange a signed JWT for the ``id_token`` issued by the token endpoint.

    Args:
        signed_jwt: The JWT assertion to present.
        requester: Callable invoked as ``requester(url, data=...)`` that
            returns the HTTP response; defaults to ``requests.post``.

    Returns:
        The ``id_token`` field of the JSON response, or ``None`` when the
        request failed, the body was not valid JSON, or the field is absent.
        Every failure is reported on stderr.
    """
    data = {"grant_type": JWT_BEARER_TOKEN_GRANT_TYPE, "assertion": signed_jwt}
    try:
        # The call itself is guarded: connection errors and timeouts are
        # raised by the request, not by raise_for_status().
        response = requester(OAUTH_TOKEN_URI, data=data)
        response.raise_for_status()
        token = response.json().get("id_token")
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException.
        print(f"Failed to exchange JWT for oauth token: {e}", file=sys.stderr)
        return None

    if token is None:
        # Without this message the caller would exit with no explanation.
        print(
            "Failed to exchange JWT for oauth token: no id_token in response",
            file=sys.stderr,
        )
    return token
def _get_boto_credentials(
    role_arn: str, session_name: str, token: str
) -> Optional[BotoCredentials]:
    """Assume an AWS IAM role through STS using a web identity token.

    Args:
        role_arn: ARN of the role to assume.
        session_name: Name given to the assumed-role session.
        token: Web identity token presented to STS.

    Returns:
        The temporary credentials from the STS response, or ``None`` when the
        client could not be created or the call failed (the error is printed
        to stderr).
    """
    # Local import keeps this function self-contained; botocore is already a
    # dependency of the file.
    from botocore.exceptions import BotoCoreError

    try:
        # Client creation is guarded too: configuration problems surface here
        # as BotoCoreError subclasses.
        client = boto3.client("sts")
        response = client.assume_role_with_web_identity(
            RoleArn=role_arn,
            RoleSessionName=session_name,
            WebIdentityToken=token,
        )
    except (BotoError, BotoCoreError) as e:
        # ClientError covers error responses from the service; BotoCoreError
        # covers client-side failures such as connection errors.
        print(f"Failed to assume AWS role: {e}", file=sys.stderr)
        return None

    sts_credentials = response["Credentials"]
    return BotoCredentials(
        access_key_id=sts_credentials["AccessKeyId"],
        secret_access_key=sts_credentials["SecretAccessKey"],
        session_token=sts_credentials["SessionToken"],
    )
# Parenthesised form: the bare `@click.command` decorator is only accepted by
# newer click releases, while `@click.command()` works on all of them.
@click.command()
@click.option(
    "--gcp-service-account",
    envvar="GCP_SERVICE_ACCOUNT",
    required=True,
    help="The GCP service account to use for federation.",
    type=str,
)
@click.option(
    "--target-audience-url",
    envvar="TARGET_AUDIENCE_URL",
    required=True,
    help="The target audience URL, same as the one configured in the AWS IAM role trust policy.",
    type=str,
)
@click.option(
    "--aws-iam-role-arn",
    envvar="AWS_IAM_ROLE_ARN",
    required=True,
    help="The AWS IAM role to assume.",
    type=str,
)
@click.option(
    "--aws-session-name",
    envvar="AWS_SESSION_NAME",
    required=True,
    help="An unique name for the assumed role session.",
    type=str,
)
def federate_gcp_to_aws(
    gcp_service_account: str,
    target_audience_url: str,
    aws_iam_role_arn: str,
    aws_session_name: str,
) -> None:
    """
    Use a GCP service account to obtain ephemeral AWS credentials for the given IAM role.
    """
    # NOTE: the docstring above doubles as the command's --help text, so it is
    # kept verbatim. A None result from any step aborts with exit status 1.

    # Step 1: JWT signed on behalf of the GCP service account.
    signed_jwt = _get_signed_jwt(
        gcp_service_account,
        target_audience=target_audience_url,
    )
    if signed_jwt is None:
        sys.exit(1)

    # Step 2: exchange the signed JWT for a token from the token endpoint.
    token = _get_oauth_token(signed_jwt)
    if token is None:
        sys.exit(1)

    # Step 3: trade that token for temporary AWS credentials.
    boto_credentials = _get_boto_credentials(
        role_arn=aws_iam_role_arn,
        session_name=aws_session_name,
        token=token,
    )
    if boto_credentials is None:
        sys.exit(1)

    # Shell `export` lines go to stdout; diagnostics elsewhere go to stderr.
    print(boto_credentials.as_export_block())
# Invoke the click command only when the file is executed as a script.
if __name__ == "__main__":
    federate_gcp_to_aws()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment