Skip to content

Instantly share code, notes, and snippets.

@salrashid123
Last active December 21, 2021 18:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save salrashid123/e44944de52ede6dff86dc40a484a06d2 to your computer and use it in GitHub Desktop.
Save salrashid123/e44944de52ede6dff86dc40a484a06d2 to your computer and use it in GitHub Desktop.
google-auth python. Impersonate and domain-delegate using impersonated_credentials
# snippet uses ADC credentials to impersonate generic-server@project.iam.gserviceaccount.com
# then use that server's credentials to create a token for user2 using domain delegation
# after that, the gcs and pubsub calls are done as if its user2
import google.auth
import time
from google.auth import credentials
from google.cloud import iam_credentials_v1
from google.auth import impersonated_credentials
import google.auth.iam
import base64
# see https://github.com/googleapis/google-auth-library-python/issues/931
from datetime import datetime, timedelta
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
import requests
class StaticCredentials(credentials.Credentials):
def __init__(
self,
token,
expires_in,
token_type,
):
super(credentials.Credentials, self).__init__()
self.token = token
if token == None:
raise exceptions.GoogleAuthError(
"Provided token cannot be null"
)
self.token_type = token_type
self.expiry = datetime.now() + timedelta(seconds=int(expires_in))
self._quota_project_id = None
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
return
@property
def expired(self):
return _helpers.utcnow() >= self.expiry
@property
def quota_project_id(self):
"""Project to use for quota and billing purposes."""
return self._quota_project_id
# https://google-auth.readthedocs.io/en/master/reference/google.oauth2.service_account.html#domain-wide-delegation
project='project'
sa_1 = 'generic-server@project.iam.gserviceaccount.com'
## get the source credentials first (this is user_1)
source_credentials, project_id = google.auth.default()
# use that to get sa_1 credentials
target_scopes = ['https://www.googleapis.com/auth/cloud-platform']
sa1_credentials = impersonated_credentials.Credentials(
source_credentials = source_credentials,
target_principal=sa_1,
target_scopes = target_scopes,
delegates=[],
lifetime=500)
### use sa1_credentials to get an token for user2
sub = "user2@domain.com"
target_scopes_for_dwd = "https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/admin.directory.user.readonly"
now = int(time.time())
exptime = now + 3600
claim =('{"iss":"%s",'
'"scope":"%s",'
'"aud":"https://accounts.google.com/o/oauth2/token",'
'"sub":"%s",'
'"exp":%s,'
'"iat":%s}') %(sa_1,target_scopes_for_dwd,sub,exptime,now)
# use the impersonated_credentials sign the JWT (since it has a signer)
# https://google-auth.readthedocs.io/en/master/reference/google.auth.impersonated_credentials.html#google.auth.impersonated_credentials.Credentials.signer
## is there anyway we can optionally return a tuple? i.,e the (signature, key_id)
## eg https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/signBlob#response-body
# {
# "keyId": string,
# "signedBlob": string
# }
#header = ('{"alg": "RS256", "kid": "%s", "typ": "JWT"}') %("24d9257dc9fe8959728b57655339b753d0f20f09")
header = ('{"alg": "RS256", "typ": "JWT"}')
header_claims = '{}.{}'.format(base64.urlsafe_b64encode(header.encode()).decode().rstrip('='),base64.urlsafe_b64encode(claim.encode()).decode().rstrip('='))
signed_bytes = sa1_credentials.sign_bytes(header_claims.encode())
assertion = header_claims + '.' + base64.urlsafe_b64encode(signed_bytes).decode().rstrip('=')
print('Signed JWT '+ assertion)
# now exchange the signed_jwt for an access_token representing the subject sub
url = 'https://accounts.google.com/o/oauth2/token'
data = {'grant_type' : 'assertion',
'assertion_type' : 'http://oauth.net/grant_type/jwt/1.0/bearer',
'assertion' : assertion }
headers = {"Content-type": "application/x-www-form-urlencoded"}
resp = requests.post(url, data)
## We finally have user2's token....
access_token=resp.json()['access_token']
expires_in=resp.json()['expires_in']
token_type=resp.json()['token_type']
## .use that as a static token here
## https://github.com/googleapis/google-auth-library-python/issues/931
sc = StaticCredentials(token=access_token,expires_in=expires_in,token_type=token_type)
from google.cloud import storage
client = storage.Client(project=project, credentials=sc)
for b in client.list_buckets():
print(b.name)
from google.cloud import pubsub_v1
from google.auth.transport.requests import AuthorizedSession
project_path = f"projects/{project}"
authed_session = AuthorizedSession(sc)
response = authed_session.request('GET', 'https://pubsub.googleapis.com/v1/{}/topics'.format(project_path))
print(response.json())
publisher = pubsub_v1.PublisherClient(credentials=sc)
for topic in publisher.list_topics(request={"project": project_path}):
print(topic.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment