Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Google Auth Utils for Directory API
import subprocess
import warnings
from typing import List, Optional

import google.auth
import google.auth.compute_engine.credentials
import google.auth.iam
import google.oauth2.credentials
import structlog
from google.auth.credentials import Credentials
from google.auth.transport import requests
from google.oauth2 import service_account
from pydantic import EmailStr, HttpUrl
# OAuth2 token endpoint; passed as the token URI when this module constructs
# service account credentials and ID token credentials.
TOKEN_URI = "https://accounts.google.com/o/oauth2/token"
# Scope requested when loading application default credentials, so that they
# can be used for signing (key-file credentials are otherwise scopeless).
CLOUD_PLATFORM_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
# Module-level structured logger.
logger = structlog.get_logger(__name__)
class ConfigurationError(Exception):
    """Error signalling a problem with the authentication configuration."""
# These are a couple of quite magic functions that try to do the right thing.
# In the ideal GCP world we would just have one line
# creds = google.auth.default()[0].with_subject(subject).with_scopes(scopes)
# to obtain scoped creds for the Directory API, but that only works when using
# service account credentials from the JSON file listed in the GOOGLE_APPLICATION_CREDENTIALS
# env var.
#
# However we have different things to do between dev and prod:
#
# In dev we have Gcloud SDK app default credentials that we want to impersonate to our dev
# service account, because you can only access directory API through service account :|
#
# Impersonation can set the required scopes, but not a subject (at least not through a public Python API).
#
# In CI (e.g. GitHub Actions) we'll use svcacc key-file for our CI svc acc to obtain initial credentials and then
# impersonate to the target service account to run actual functional tests that access Directory API.
#
# Finally in prod, we already run under the target service account (both in GCE and Cloud Run), but
# scopes are fixed to "cloud-platform" in GCE, and while google.auth.default(scopes=[...]) works in Cloud Run
# (but, again, not in GCE/GKE: https://medium.com/@guillaume.blaquiere/yes-my-experience-isnt-the-same-63ac3c783864),
# we still need credentials with a different subject (Directory Admin email), but
# google.auth.compute_engine.credentials.Credentials, which is what is returned as ADC in GCE/GKE/Cloud Run,
# doesn't have an API to change the subject.
#
# Hence we can't access the Directory API with ADC-derived credentials unless they come from a JSON key file.
#
# Hence, to obtain target svcacc credentials with the right scopes and different subject, we require the target *service account*
# to have iam.serviceAccounts.getAccessToken permission to *itself*
# (e.g. through roles/iam.serviceAccountTokenCreator role)
#
# Since in the dev path google.auth.impersonated_credentials is not of much use (can't set subject),
# we skip it altogether. So all we need is:
#
# A). Determine the target service account email which either comes from ADC in prod or
# from local settings in dev/CI.
# B). Issue new service account credentials for this account with the right subject and scope. In prod, the svc acc
# needs permissions to create tokens for itself; in dev, an authenticated user needs to have permissions to create
# tokens for the target service account in question.
def build_delegated_credentials(
    target_principal: EmailStr,
    scopes: List[HttpUrl],
    impersonate_service_account: Optional[EmailStr] = None,
) -> Credentials:
    """Build service account credentials that act on behalf of ``target_principal``.

    Loads application default credentials (ADC), works out which service
    account should issue the delegated credentials, and then re-issues
    credentials for that account with the requested ``scopes`` and with
    ``target_principal`` as the subject (see ``delegate``).

    Args:
        target_principal: Email of the user to act as (the credentials' subject).
        scopes: OAuth scopes the resulting credentials should carry.
        impersonate_service_account: Service account to issue the credentials
            for. Required when ADC are end-user (Cloud SDK) credentials; when
            given, it always overrides the account derived from ADC.

    Returns:
        The credentials produced by ``delegate`` for the chosen service account.

    Raises:
        ConfigurationError: If ADC are end-user credentials and no
            ``impersonate_service_account`` was given, or if ADC are of a type
            this function does not know how to handle.
    """
    with warnings.catch_warnings():
        # google-auth warns when ADC are end-user credentials; that is the
        # expected situation in the dev workflow, so silence it here only.
        warnings.filterwarnings(
            "ignore", "Your application has authenticated using end user credentials from Google Cloud SDK."
        )
        # Ensuring we have the cloud-platform scope in order to sign blobs.
        # E.g. creds coming from a service account key file are scopeless.
        creds, _ = google.auth.default(scopes=CLOUD_PLATFORM_SCOPES)
    logger.info("Obtained default credentials")

    # The default creds can have one of 3 not-so-compatible faces:
    #
    # - User credentials obtained through "gcloud auth application-default login"
    #   (which is NOT the same as "gcloud auth login" - the latter is only used by
    #   gcloud itself). These are google.oauth2.credentials.Credentials, have no
    #   service_account_email, and are only useful together with a service account
    #   to impersonate, provided through settings. This is the development workflow.
    #
    # - Service account credentials loaded from the JSON key file named by
    #   GOOGLE_APPLICATION_CREDENTIALS. This flow is only meant for CI (e.g. GitHub
    #   Actions, i.e. not on GCP). DO NOT USE GOOGLE_APPLICATION_CREDENTIALS for
    #   local development; grant your user the Token Creator role on your dev
    #   service account instead. These are google.oauth2.service_account.Credentials
    #   and already carry service_account_email (it is part of the key file).
    #
    # - Service account credentials obtained when running on a GCE VM / GKE pod /
    #   Cloud Run under the target service account. These are
    #   google.auth.compute_engine.credentials.Credentials; service_account_email
    #   just says "default" until the credentials are refreshed.
    svcacc: Optional[EmailStr] = None
    if isinstance(creds, google.oauth2.credentials.Credentials):
        if not impersonate_service_account:
            raise ConfigurationError(
                "\n"
                "Using Cloud SDK Applicaton Default credentials, but couldn't find\n"
                "a service account to impersonate into. Since you are probably in dev\n"
                "environment, you MUST pass impersonate_service_account argument to this function\n"
            )
    elif isinstance(creds, google.oauth2.service_account.Credentials):
        svcacc = creds.service_account_email
    elif isinstance(creds, google.auth.compute_engine.credentials.Credentials):
        # Refresh first so that service_account_email holds the real address.
        creds.refresh(requests.Request())
        svcacc = creds.service_account_email
    else:
        raise ConfigurationError(f"Don't know how to handle {creds!r} credential type :(")

    # An explicitly requested service account wins over the one derived from ADC.
    if impersonate_service_account:
        svcacc = impersonate_service_account
        logger.info("Using explicitly provided service account", svcacc=svcacc)

    return delegate(creds, svcacc, target_principal, scopes)
# FROM: https://stackoverflow.com/questions/53202767/gae-attributeerror-credentials-object-has-no-attribute-with-subject/57092533#57092533 # noqa: E501
# Read the permissions required! (As well as the long note above) Thank you!
def delegate(
    creds: Credentials,
    svcacc: EmailStr,
    subject: EmailStr,
    scopes: List[HttpUrl],
) -> service_account.Credentials:
    """Re-issue credentials for ``svcacc`` with new scopes and a new subject.

    Builds an IAM-backed signer that uses ``creds`` to sign on behalf of
    ``svcacc``, and wraps it in service account credentials carrying the
    requested ``scopes`` and ``subject``.

    Args:
        creds: Credentials used to authorize the IAM signing requests.
        svcacc: Email of the service account the new credentials are issued for.
        subject: Email of the user the new credentials act on behalf of.
        scopes: OAuth scopes for the new credentials.

    Returns:
        Service account credentials for ``svcacc`` with ``scopes`` and ``subject`` set.
    """
    # Normalise once: google-auth joins scopes as plain strings, and pydantic
    # URL values are not guaranteed to be ``str`` instances.
    scope_strings = [str(scope) for scope in scopes]
    logger.info(
        "Reissuing credentials with new scopes and subject",
        svcacc=svcacc,
        subject=subject,
        scopes=scope_strings,
    )
    request = requests.Request()
    signer = google.auth.iam.Signer(request, creds, svcacc)
    return service_account.Credentials(signer, svcacc, TOKEN_URI, scopes=scope_strings, subject=subject)
# This is another complicated piece :(
def get_open_id_token(target_url: HttpUrl) -> str:
    """
    Obtain Open ID token for the target URL (Cloud Run URL in our use case).

    The strategy depends on the type of the application default credentials:

    - End-user (Cloud SDK) credentials: the token is obtained by running
      ``gcloud auth print-identity-token``. Note that ``target_url`` is not
      passed to gcloud in this branch.
    - Service account key-file credentials: an ID token is minted with
      ``target_url`` as its audience.

    Args:
        target_url: URL of the service the token is intended for.

    Returns:
        The identity token as a string.

    Raises:
        subprocess.CalledProcessError: If gcloud exits with a non-zero status.
        subprocess.TimeoutExpired: If gcloud does not finish within 10 seconds.
        OSError: If gcloud cannot be started (e.g. it is not installed).
        ConfigurationError: If the default credentials are of a type this
            function does not know how to get an identity token from.
    """
    logger.info("Obtaining identity token")
    with warnings.catch_warnings():
        # End-user ADC are expected in the dev workflow; silence the warning here only.
        warnings.filterwarnings(
            "ignore", "Your application has authenticated using end user credentials from Google Cloud SDK."
        )
        # Ensuring we have the cloud-platform scope in order to sign blobs.
        # E.g. creds coming from a service account key file are scopeless.
        creds, _ = google.auth.default(scopes=CLOUD_PLATFORM_SCOPES)
    logger.info("Obtained default credentials")

    if isinstance(creds, google.oauth2.credentials.Credentials):
        # We are in dev env. The simplest way to obtain an identity token is to call gcloud :(
        # The only other solution (except for copying parts of gcloud code) is to create another
        # service account to use as Cloud Run invoker and impersonate it, since
        # google.auth.default() credentials obtain identity tokens that are useless for
        # Cloud Run (and Cloud Functions, etc.)
        #
        # https://medium.com/google-cloud/the-2-limits-of-iam-service-on-google-cloud-7db213277d9c#f007
        #
        # At least the "spawn gcloud in the shell" approach, while unpretty code-wise, makes more
        # sense to developers because this is what they can do using shell/curl to debug issues.
        try:
            completed = subprocess.run(
                ["gcloud", "auth", "print-identity-token"],
                text=True,  # We want our token as str so text=True is paramount
                check=True,
                capture_output=True,
                timeout=10,
            )
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as err:
            logger.error("Failed obintaining identity token from gcloud", error=err.stderr)
            raise
        except OSError as err:
            # gcloud could not be started at all, so there is no stderr to report.
            logger.error("Failed obintaining identity token from gcloud", error=str(err))
            raise
        return completed.stdout.strip()  # Mind the new line!

    if isinstance(creds, google.oauth2.service_account.Credentials):
        id_creds = service_account.IDTokenCredentials(
            signer=creds.signer,
            service_account_email=creds.service_account_email,
            token_uri=TOKEN_URI,
            # The audience ends up in a JSON payload, so hand over a plain string;
            # pydantic URL values are not guaranteed to be ``str`` instances.
            target_audience=str(target_url),
        )
        id_creds.refresh(requests.Request())
        return id_creds.token

    raise ConfigurationError(f"Don't know how to get identity token from {repr(creds)} credential type :(")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment