Created
December 21, 2021 18:59
-
-
Save salrashid123/1f6e304a76ab5ddc9a800a5d85a0436d to your computer and use it in GitHub Desktop.
google-auth python. Impersonate and domain-delegate using iam_credentials_v1.IAMCredentialsClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This snippet uses ADC credentials to impersonate generic-server@project.iam.gserviceaccount.com,
# then uses that service account's credentials to create a token for user2 via domain-wide delegation.
# After that, the GCS and Pub/Sub calls are made as if they came from user2.
import google.auth | |
import time | |
from google.auth import credentials | |
from google.cloud import iam_credentials_v1 | |
from google.auth import impersonated_credentials | |
import google.auth.iam | |
import urllib.parse | |
## StaticCredentials should be in google.auth. | |
# sc = StaticCredentials(token=access_token,expires_in=expires_in,token_type=token_type) | |
# from google.cloud import storage | |
# client = storage.Client(project=project, credentials=sc) | |
# for b in client.list_buckets(): | |
# print(b.name) | |
from datetime import datetime, timedelta | |
from google.auth import _helpers | |
from google.auth import credentials | |
from google.auth import exceptions | |
import requests | |
class StaticCredentials(credentials.Credentials):
    """Credentials backed by a pre-issued access token that cannot be refreshed.

    The token is obtained out of band (in this file, by exchanging a signed
    JWT at the OAuth2 token endpoint) and handed to Google client libraries
    through the standard ``google.auth`` credentials interface.

    Args:
        token: The bearer access token. Must not be ``None``.
        expires_in: Token lifetime in seconds, counted from construction
            time. Anything accepted by ``int()`` is allowed.
        token_type: The token type reported by the token endpoint; stored
            on the instance as ``token_type``.

    Raises:
        google.auth.exceptions.GoogleAuthError: If ``token`` is ``None``.
    """

    def __init__(
        self,
        token,
        expires_in,
        token_type,
    ):
        if token is None:
            raise exceptions.GoogleAuthError(
                "Provided token cannot be null"
            )
        # Zero-argument super() runs credentials.Credentials.__init__ itself.
        # super(credentials.Credentials, self) would start the MRO lookup
        # *after* Credentials and leave the base-class state uninitialised.
        super().__init__()
        self.token = token
        self.token_type = token_type
        # `expired` compares against _helpers.utcnow(), so the expiry must be
        # computed from the same clock rather than from naive local time.
        self.expiry = _helpers.utcnow() + timedelta(seconds=int(expires_in))
        self._quota_project_id = None

    # No docstring here on purpose: copy_docstring supplies the base-class one.
    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # A static token has nothing to refresh with, so this is a no-op;
        # once the token expires a new instance has to be created.
        return

    @property
    def expired(self):
        """Whether the current time has reached or passed the token expiry."""
        return _helpers.utcnow() >= self.expiry

    @property
    def quota_project_id(self):
        """Project to use for quota and billing purposes."""
        return self._quota_project_id
## StaticCredentials should be in google.auth. | |
## get the source credentials first
import json

from google.auth.transport.requests import AuthorizedSession
from google.cloud import pubsub_v1
from google.cloud import storage

source_credentials, project_id = google.auth.default()
project = 'fabled-ray-104117'
target_sa = 'generic-server@pubsub-msg.iam.gserviceaccount.com'

# Build the JWT claim set for domain-wide delegation: the service account is
# the issuer and the Workspace user to act as is the subject.
sub = "user2@esodemoapp2.com"
iss = target_sa
target_scopes_for_dwd = "https://www.googleapis.com/auth/cloud-platform https://www.googleapis.com/auth/admin.directory.user.readonly"
# NOTE(review): this is the legacy token endpoint together with the legacy
# 'assertion' grant type used below; confirm whether it should move to the
# current endpoint/grant type from Google's server-to-server OAuth docs.
url = 'https://accounts.google.com/o/oauth2/token'
now = int(time.time())
exptime = now + 3600
# json.dumps escapes the values properly instead of splicing them into a
# hand-written JSON template.
claim = json.dumps({
    "iss": iss,
    "scope": target_scopes_for_dwd,
    "aud": url,
    "sub": sub,
    "exp": exptime,
    "iat": now,
})

# Use the IAM Credentials API with the caller's own credentials to have the
# target service account sign the JWT.
iam_client = iam_credentials_v1.IAMCredentialsClient(credentials=source_credentials)
name = iam_client.service_account_path('-', target_sa)
sign_resp = iam_client.sign_jwt(name=name, delegates=None, payload=claim)
signed_jwt = sign_resp.signed_jwt
print('Signed JWT '+ signed_jwt)

# Now exchange the signed_jwt for an access_token representing the subject.
# requests form-encodes the body itself, so the assertion is passed as-is;
# quoting it first would double-encode it.
data = {
    'grant_type': 'assertion',
    'assertion_type': 'http://oauth.net/grant_type/jwt/1.0/bearer',
    'assertion': signed_jwt,
}
headers = {"Content-type": "application/x-www-form-urlencoded"}
resp = requests.post(url, data=data, headers=headers, timeout=30)
# Fail on an HTTP error status instead of a KeyError on 'access_token' below.
resp.raise_for_status()
token_info = resp.json()
access_token = token_info['access_token']
expires_in = token_info['expires_in']
token_type = token_info['token_type']

sc = StaticCredentials(token=access_token, expires_in=expires_in, token_type=token_type)

# List GCS buckets with the delegated token.
client = storage.Client(project=project, credentials=sc)
for b in client.list_buckets():
    print(b.name)

# List Pub/Sub topics twice: once over raw REST, once with the client library.
project_path = f"projects/{project}"
authed_session = AuthorizedSession(sc)
response = authed_session.request('GET', f'https://pubsub.googleapis.com/v1/{project_path}/topics')
print(response.json())

publisher = pubsub_v1.PublisherClient(credentials=sc)
for topic in publisher.list_topics(request={"project": project_path}):
    print(topic.name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment