Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
from kubernetes import client, config
import sched
import time
import logging
import threading
# Debugging
from pprint import pprint
class Expiry():
# Set up the Kubernetes API for other methods
def __init__(self):
# Configs can be set in Configuration class directly or using helper utility
self.api = client.CoreV1Api()
self.delete_options = client.V1DeleteOptions()
# Set up logging
# Todo: Make the warnings stderr, and the info stdout
def delete(self, namespace):"Deleting namespace: %s",
self.api.delete_namespace(, self.delete_options)
# Check the annotations of the namespace, and determine whether it has 'expired', or whether the life of the namespace
# is up.
# Stored in as a unix timestamp
def is_namespace_expired(self, namespace):
# Todo: Fill this out
return True
def should_delete(self, namespace):
# If there are no labels, do not worry about this environment
if not isinstance(namespace.metadata.labels, dict):
return False
# Check if there is an environment indicator, and if so, whether it's disposable
if namespace.metadata.labels['environment'] == DELETEABLE_ENVIRONMENT:
return self.is_namespace_expired(namespace)
except (AttributeError, KeyError) as e:
return False
def check(self):
# We don't want to watch the namespaces, but rather check them every few minutes to determine whether a namespace
# (and all it contains) should be deleted.
namespaces = self.api.list_namespace(watch=False)
for item in namespaces.items:
if self.should_delete(item):
## This should go in an invoker file I think.
expiry = Expiry()
def repeat():
# Execute the watcher
threading.Timer(30.0, repeat).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.