Created
October 1, 2020 14:04
-
-
Save booya/e10554cb6302a0a4b43dcdd16a9e4147 to your computer and use it in GitHub Desktop.
Example usage of aiogoogle using a service account on a GCE instance
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from datetime import datetime | |
import json | |
from aiogoogle import Aiogoogle | |
from aiohttp import ClientSession | |
def current_timestamp():
    '''Return the current time as whole seconds since the Unix epoch.

    Uses ``datetime.timestamp()`` rather than ``strftime('%s')``: the ``%s``
    directive is a platform-specific C library extension and is not available
    everywhere, whereas ``timestamp()`` is portable and yields the same value
    for a naive local datetime.
    '''
    return int(datetime.now().timestamp())
async def get_backend_services(project):
    '''Return the backend services of ``project`` as one flat list.

    Sends ``compute.backendServices.aggregatedList`` through the module-level
    ``google`` client and concatenates the ``backendServices`` list found in
    each value of the response's ``items`` mapping. Entries without a
    ``backendServices`` key contribute nothing, and a response without
    ``items`` yields an empty list.

    Relies on the module globals ``google`` and ``compute`` being set by the
    caller before this coroutine runs.
    '''
    # NOTE(review): only the single response returned here is read; any
    # further result pages are not followed -- confirm that is acceptable.
    response = await google.as_user(
        compute.backendServices.aggregatedList(project=project, validate=False)
    )
    backend_services = []
    for entry in response.get('items', {}).values():
        backend_services.extend(entry.get('backendServices', []))
    return backend_services
async def get_creds():
    '''Fetch auth_token from the Google metadata server.

    Requests the default service account's token endpoint with the
    ``Metadata-Flavor: Google`` header and returns the JSON-decoded
    response body.
    '''
    url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
    headers = {'Metadata-Flavor': 'Google'}
    # Managing the session with ``async with`` closes it even when the
    # request or the body read raises, so no connection is leaked.
    async with ClientSession() as http:
        async with http.get(url, headers=headers) as client:
            resp = await client.read()
    return json.loads(resp)
async def main(project='my-project', poll_interval=20, refresh_margin=1200):
    '''Poll backend services forever, renewing credentials as they age.

    Each outer iteration fetches a token with ``get_creds()``, builds a new
    ``Aiogoogle`` client from it (stored in the module global ``google``) and,
    the first time only, discovers the compute v1 API (stored in the module
    global ``compute``). It then lists the backend services of ``project``
    every ``poll_interval`` seconds until the token is within
    ``refresh_margin`` seconds of its reported ``expires_in``, closes the
    client's session and starts over.

    Args:
        project: Project passed to ``get_backend_services``.
        poll_interval: Seconds to sleep between polls.
        refresh_margin: Seconds subtracted from the token's ``expires_in``
            to decide when to fetch a new token.
    '''
    global google
    global compute
    while True:
        auth = await get_creds()
        expires_at = current_timestamp() + auth['expires_in'] - refresh_margin
        google = Aiogoogle(user_creds=auth)
        try:
            if not compute:
                print('Discovering compute API')
                compute = await google.discover('compute', 'v1')
            # Poll at least once per fetched token. If the token's remaining
            # lifetime is already within the margin, skipping the poll would
            # make the outer loop re-fetch credentials in a tight loop with
            # no sleep in between.
            while True:
                expires_in = max(expires_at - current_timestamp(), 0)
                print(f'Access token refreshes in {expires_in} seconds.')
                backends = await get_backend_services(project)
                print(f'Got {len(backends)} backend services.')
                await asyncio.sleep(poll_interval)
                if current_timestamp() >= expires_at:
                    break
        finally:
            # Close the session even when a request fails, and tolerate a
            # client that has no open session to close.
            print('Closing google session')
            if google.active_session is not None:
                await google.active_session.close()
# Shared client state read and rebound by main() and get_backend_services().
# Bound unconditionally so the module also works when imported, not only
# when executed as a script.
google = None
compute = None

if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment