Skip to content

Instantly share code, notes, and snippets.

@andrewhowdencom
Created May 26, 2017 16:10
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save andrewhowdencom/ffda4c049e5a0ced94d1f01152939e6c to your computer and use it in GitHub Desktop.
from kubernetes import client, config
import sched
import time
import logging
import threading
# Debugging
from pprint import pprint
class Expiry():
# Set up the Kubernetes API for other methods
def __init__(self):
# Configs can be set in Configuration class directly or using helper utility
config.load_kube_config()
self.api = client.CoreV1Api()
self.delete_options = client.V1DeleteOptions()
# Set up logging
# Todo: Make the warnings stderr, and the info stdout
logging.basicConfig(level=logging.INFO)
def delete(self, namespace):
logging.info("Deleting namespace: %s", namespace.metadata.name)
self.api.delete_namespace(namespace.metadata.name, self.delete_options)
# Check the annotations of the namespace, and determine whether it has 'expired', or whether the life of the namespace
# is up.
#
# Stored in expiry.kubernetes.sitewards.com/timestamp as a unix timestamp
def is_namespace_expired(self, namespace):
# Todo: Fill this out
return True
def should_delete(self, namespace):
DELETEABLE_ENVIRONMENT = "disposable"
# If there are no labels, do not worry about this environment
if not isinstance(namespace.metadata.labels, dict):
return False
# Check if there is an environment indicator, and if so, whether it's disposable
try:
if namespace.metadata.labels['environment'] == DELETEABLE_ENVIRONMENT:
return self.is_namespace_expired(namespace)
except (AttributeError, KeyError) as e:
return False
def check(self):
# We don't want to watch the namespaces, but rather check them every few minutes to determine whether a namespace
# (and all it contains) should be deleted.
namespaces = self.api.list_namespace(watch=False)
for item in namespaces.items:
if self.should_delete(item):
self.delete(item)
## This should go in an invoker file I think.
expiry = Expiry()
def repeat():
# Execute the watcher
threading.Timer(30.0, repeat).start()
expiry.check()
repeat()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment