Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Madrox/2828d30db0d8315a9ea128e5f6ad924f to your computer and use it in GitHub Desktop.
Save Madrox/2828d30db0d8315a9ea128e5f6ad924f to your computer and use it in GitHub Desktop.
A cloud task
"""
# cloud_task_delete_old_appengine_versions.py
This is a Python file designed to run as a Cloud Function.
When triggered, it deletes all non-active versions except
`RETAIN_LAST_N_VERSIONS` of them (see the constant below; default 5).
Note: a failed delete does not raise an exception. If versions are
not being deleted, uncomment the `r.raise_for_status()` line in
`version_delete` below to surface errors from the delete call.
- entrypoint: cloud_build_complete
- recommended trigger: Cloud Pub/Sub cloud builds topic
- runtime: Python 3.7
## requirements.txt
requests>=2.23.0
"""
import base64
import requests
import json
# Number of non-active versions to keep. cloud_build_complete retains the
# first RETAIN_LAST_N_VERSIONS + 1 entries of the versions list (the extra
# one is presumably the active version) and deletes every entry after them.
RETAIN_LAST_N_VERSIONS = 5
def list_versions(access_token, timeout=30):
    """Return every version of the `default` service of the `trophy-gg` app.

    The App Engine Admin API list endpoint is paginated, so this follows
    `nextPageToken` until the last page instead of returning only the
    first page of results.

    Args:
        access_token: OAuth2 bearer token sent in the Authorization header.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        list: The version resources (dicts) in the order the API returned
        them. Empty when the response carries no `versions` key.

    Raises:
        requests.HTTPError: If a page request returns an error status.
        requests.RequestException: On connection failure or timeout.
    """
    url = 'https://appengine.googleapis.com/v1/apps/trophy-gg/services/default/versions'
    headers = {
        'Authorization': 'Bearer {}'.format(access_token)
    }
    versions = []
    params = {}
    while True:
        r = requests.get(url, headers=headers, params=params, timeout=timeout)
        r.raise_for_status()
        payload = r.json()
        # Tolerate a body without `versions` rather than raising KeyError.
        versions.extend(payload.get('versions', []))
        next_page_token = payload.get('nextPageToken')
        if not next_page_token:
            return versions
        # Ask for the page that follows the one just consumed.
        params = {'pageToken': next_page_token}
def version_delete(name, access_token, timeout=30):
    """Ask the App Engine Admin API to delete one resource (best effort).

    An error status from the API is reported with `print` but not raised,
    so one version that cannot be deleted does not abort the caller.

    Args:
        name: Resource path appended to the `v1/` base URL; callers pass
            the `name` field of a version resource.
        access_token: OAuth2 bearer token sent in the Authorization header.
        timeout: Seconds to wait on the HTTP request before giving up.

    Returns:
        The decoded JSON response body, or an empty dict when the body is
        not valid JSON (e.g. an HTML error page from a proxy).

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    url = 'https://appengine.googleapis.com/v1/{}'
    headers = {
        'Authorization': 'Bearer {}'.format(access_token)
    }
    r = requests.delete(url.format(name), headers=headers, timeout=timeout)
    # Uncomment below if debugging
    # r.raise_for_status()
    if not r.ok:
        # Leave a trace in the function logs instead of failing silently.
        print('Failed to delete {}: HTTP {}'.format(name, r.status_code))
    try:
        return r.json()
    except ValueError:
        # Non-JSON body: keep the "does not raise on failure" contract.
        return {}
def _decode_build_event(event):
    """Return the JSON object carried in a Pub/Sub event, or None if unusable."""
    encoded = event.get('data')
    if not encoded:
        return None
    try:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are all
        # ValueError subclasses; TypeError covers a non-string payload.
        decoded = json.loads(base64.b64decode(encoded).decode('utf-8'))
    except (TypeError, ValueError):
        return None
    # Only a JSON object can carry the `status` / `buildTriggerId` keys.
    return decoded if isinstance(decoded, dict) else None


def cloud_build_complete(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.

    Acts only on messages whose JSON payload has `status` equal to
    'SUCCESS' and the expected `buildTriggerId`; any other message,
    including one with a missing or malformed payload, is ignored.
    For a matching message it fetches an access token from the metadata
    server, lists the versions, and deletes every entry after the first
    RETAIN_LAST_N_VERSIONS + 1.

    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.

    Raises:
        requests.HTTPError: If the token or list request returns an error.
    """
    data = _decode_build_event(event)
    if data is None:
        return
    if data.get('status') != 'SUCCESS' or data.get('buildTriggerId') != '9683f557-71da-4daf-a00a-85924bb34afb':
        return
    print('Deleting old backend versions...')
    scope = 'https://www.googleapis.com/auth/appengine.admin,https://www.googleapis.com/auth/cloud-platform'
    r = requests.get(
        "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token",
        params={
            'scopes': scope
        },
        headers={
            'Metadata-Flavor': 'Google'
        },
        # Fail fast instead of hanging until the function itself times out.
        timeout=10,
    )
    r.raise_for_status()
    access_token = r.json()['access_token']
    versions = list_versions(access_token)
    # NOTE(review): this keeps the first N+1 list entries, so it assumes the
    # API returns versions newest-first with the active one among them -
    # TODO confirm, or sort explicitly before slicing.
    # The slice is empty when there are N+1 or fewer versions.
    for version in versions[RETAIN_LAST_N_VERSIONS + 1:]:
        version_delete(version['name'], access_token)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment