Skip to content

Instantly share code, notes, and snippets.

@booya
Created October 1, 2020 14:04
Show Gist options
  • Save booya/e10554cb6302a0a4b43dcdd16a9e4147 to your computer and use it in GitHub Desktop.
Save booya/e10554cb6302a0a4b43dcdd16a9e4147 to your computer and use it in GitHub Desktop.
Example usage of aiogoogle using a service account on a GCE instance
import asyncio
from datetime import datetime
import json
from aiogoogle import Aiogoogle
from aiohttp import ClientSession
def current_timestamp():
return int(datetime.now().strftime('%s'))
async def get_backend_services(project):
backends_list = await google.as_user(
compute.backendServices.aggregatedList(project=project, validate=False)
)
backend_services = []
for _, item in backends_list.get('items', {}).items():
[ backend_services.append(svc) for svc in item.get('backendServices', []) ]
return backend_services
async def get_creds():
'''Fetch auth_token from the Google metadata server'''
http = ClientSession()
url = 'http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token'
headers = {'Metadata-Flavor': 'Google'}
async with http.get(url, headers=headers) as client:
resp = await client.read()
await http.close()
return json.loads(resp)
async def main():
global google
global compute
while True:
auth = await get_creds()
expires_at = current_timestamp() + auth['expires_in'] - 1200
google = Aiogoogle(user_creds=auth)
if not compute:
print('Discovering compute API')
compute = await google.discover('compute', 'v1')
while current_timestamp() < expires_at:
expires_in = expires_at - current_timestamp()
print(f'Access token refreshes in {expires_in} seconds.')
backends = await get_backend_services('my-project')
print(f'Got {len(backends)} backend services.')
await asyncio.sleep(20)
print('Closing google session')
await google.active_session.close()
if __name__ == '__main__':
google = None
compute = None
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment