Skip to content

Instantly share code, notes, and snippets.

@christippett
Created September 13, 2018 05:16
Show Gist options
  • Save christippett/50b73d9fa94d6d5e5500c258c22c757a to your computer and use it in GitHub Desktop.
Save christippett/50b73d9fa94d6d5e5500c258c22c757a to your computer and use it in GitHub Desktop.
Generate IAP token using Google service account
import google.auth
from google.auth.compute_engine.credentials import \
Credentials as ComputeEngineCredentials
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials as OAuth2Credentials
from google.oauth2.service_account import Credentials, IDTokenCredentials
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
class IAPTokenGenerator:
def __init__(self, client_id, service_account_file=None):
if service_account_file is not None:
creds = Credentials.from_service_account_file(service_account_file)
else:
creds, _ = google.auth.default(scopes=[IAM_SCOPE])
self.credentials = self._get_id_token_credentials(creds, client_id)
def _get_id_token_credentials(self, bootstrap_credentials, client_id):
if isinstance(bootstrap_credentials, OAuth2Credentials):
raise Exception('Only service accounts supported.')
# For service account's using the Compute Engine metadata service,
# service_account_email isn't available until refresh is called.
bootstrap_credentials.refresh(Request())
signer_email = bootstrap_credentials.service_account_email
if isinstance(bootstrap_credentials, ComputeEngineCredentials):
# Since the Compute Engine metadata service doesn't expose the
# service account key, we use the IAM signBlob API to sign
# instead. In order for this to work:
# 1. Your VM needs the https://www.googleapis.com/auth/iam
# scope.
# 2. The VM's default service account needs the "Service
# Account Actor" role.
signer = google.auth.iam.Signer(
Request(), bootstrap_credentials, signer_email)
else:
signer = bootstrap_credentials.signer
return IDTokenCredentials(
signer=signer,
service_account_email=signer_email,
token_uri=OAUTH_TOKEN_URI,
target_audience=client_id)
def get_token(self):
self.credentials.refresh(Request())
return self.credentials.token
def get_request_header(self):
token = self.get_token()
return {'Authorization': 'Bearer {}'.format(token)}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment