Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python3
import base64
import json
from kubernetes import config, client, dynamic
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Script body: for every namespace carrying the olympus.eng.vmware.com/stack
# label (except tmc-stable), find the secrets annotated as issued by the "dev"
# issuer whose tls.crt issuer contains O=cert-manager.  For each namespace
# that has such secrets, delete all of its CertificateRequests and blank the
# ca.crt / tls.crt entries of "dev-ca-key" and of every matching secret.
# NOTE(review): blanking the data presumably triggers re-issuance by
# cert-manager -- confirm before running against a live cluster.

_ISSUER_ANNOTATION = "cert-manager.io/issuer-name"
_ISSUER_NAME = "dev"
_SKIPPED_NAMESPACE = "tmc-stable"
_CA_SECRET_NAME = "dev-ca-key"
# JSON Patch document that empties both certificate entries of a secret.
_BLANK_CERTS_PATCH = '[{"op": "replace", "path": "/data/ca.crt", "value": "" }, {"op": "replace", "path": "/data/tls.crt", "value": "" }]'


def _issued_by_dev(secret):
    """Return True when the secret's issuer-name annotation is "dev"."""
    annotations = secret.metadata.annotations
    return annotations is not None and \
        annotations.get(_ISSUER_ANNOTATION) == _ISSUER_NAME


def _has_cert_manager_issuer(secret):
    """Return True when the secret's tls.crt issuer contains O=cert-manager.

    Secrets without data, without a tls.crt entry, or with an empty tls.crt
    are reported as False instead of raising.  This script itself blanks
    tls.crt, so a second run (or a run after a partial failure) would
    otherwise crash while parsing the empty value.  Non-empty data that is
    not valid PEM still raises ValueError.
    """
    encoded = (secret.data or {}).get("tls.crt")
    if not encoded:
        return False
    cert = x509.load_pem_x509_certificate(base64.b64decode(encoded), default_backend())
    return any(attr.rfc4514_string() == "O=cert-manager" for attr in cert.issuer)


config.load_kube_config()
dc = dynamic.DynamicClient(client=client.ApiClient())
certificate_requests_resource = dc.resources.get(kind='CertificateRequest', api_version='v1alpha2')
v1 = client.CoreV1Api()
nses = v1.list_namespace(label_selector='olympus.eng.vmware.com/stack')

# Maps namespace name -> list of secrets that need to be blanked.
broken_namespaces = {}
for ns in nses.items:
    if ns.metadata.name == _SKIPPED_NAMESPACE:
        continue
    all_secrets = v1.list_namespaced_secret(ns.metadata.name)
    update_secrets = [
        s for s in all_secrets.items
        if _issued_by_dev(s) and _has_cert_manager_issuer(s)
    ]
    if update_secrets:
        broken_namespaces[ns.metadata.name] = update_secrets

for broken_namespace, update_secrets in broken_namespaces.items():
    print(f"├ {broken_namespace}")
    # Remove every CertificateRequest in the namespace, not only the ones
    # tied to the matching secrets.
    crs = dc.get(certificate_requests_resource, namespace=broken_namespace)
    for cr in crs.items:
        cr_name = cr["metadata"]["name"]
        print(f"├─ deleting certificate request \"{cr_name}\"")
        dc.delete(certificate_requests_resource, namespace=broken_namespace, name=cr_name)
    # The CA secret is patched unconditionally, before the individual secrets.
    print(f"├─ patching secret \"{_CA_SECRET_NAME}\"")
    v1.patch_namespaced_secret(_CA_SECRET_NAME, broken_namespace, body=json.loads(_BLANK_CERTS_PATCH))
    for secret in update_secrets:
        print(f"├─ patching secret \"{secret.metadata.name}\"")
        v1.patch_namespaced_secret(secret.metadata.name, broken_namespace, body=json.loads(_BLANK_CERTS_PATCH))
#!/usr/bin/env python3
import base64
import json
import datetime
from kubernetes import config, client, dynamic
from cryptography import x509
from cryptography.hazmat.backends import default_backend
# Script body: for every namespace carrying the olympus.eng.vmware.com/stack
# label (except tmc-stable), inspect the secrets annotated as issued by the
# "dev" issuer and record those whose ca.crt (-> ca_certs) or tls.crt
# (-> certs) expires before the cutoff below.
# NOTE(review): nothing in the visible script prints or otherwise consumes
# `certs` / `ca_certs`; they are presumably inspected interactively -- confirm.

_ISSUER_ANNOTATION = "cert-manager.io/issuer-name"
_ISSUER_NAME = "dev"
_SKIPPED_NAMESPACE = "tmc-stable"
# Naive datetime on purpose: it is compared against cert.not_valid_after,
# which the cryptography documentation describes as a naive (UTC) datetime.
_EXPIRY_CUTOFF = datetime.datetime(2021, 3, 1, 0, 0, 0)


def _issued_by_dev(secret):
    """Return True when the secret's issuer-name annotation is "dev"."""
    annotations = secret.metadata.annotations
    return annotations is not None and \
        annotations.get(_ISSUER_ANNOTATION) == _ISSUER_NAME


def _expiring_entry(secret, key):
    """Describe the certificate stored under `key` if it expires too early.

    Returns a dict with the secret's name, namespace and the certificate's
    expiry when that expiry is before the cutoff, otherwise None.  A secret
    with no data, no `key` entry, or an empty value also yields None rather
    than raising, so blanked or incomplete secrets do not abort the scan.
    Non-empty data that is not valid PEM still raises ValueError.
    """
    encoded = (secret.data or {}).get(key)
    if not encoded:
        return None
    cert = x509.load_pem_x509_certificate(base64.b64decode(encoded), default_backend())
    if cert.not_valid_after >= _EXPIRY_CUTOFF:
        return None
    return {
        'name': secret.metadata.name,
        'expiry': cert.not_valid_after,
        'namespace': secret.metadata.namespace,
    }


config.load_kube_config()
dc = dynamic.DynamicClient(client=client.ApiClient())
certificate_requests_resource = dc.resources.get(kind='CertificateRequest', api_version='v1alpha2')
v1 = client.CoreV1Api()
nses = v1.list_namespace(label_selector='olympus.eng.vmware.com/stack')
broken_namespaces = {}
certs = []      # entries for tls.crt certificates expiring before the cutoff
ca_certs = []   # entries for ca.crt certificates expiring before the cutoff

for ns in nses.items:
    if ns.metadata.name == _SKIPPED_NAMESPACE:
        continue
    all_secrets = v1.list_namespaced_secret(ns.metadata.name)
    secrets = [s for s in all_secrets.items if _issued_by_dev(s)]
    for secret in secrets:
        # Check the CA certificate first, then the leaf certificate.
        ca_entry = _expiring_entry(secret, "ca.crt")
        if ca_entry is not None:
            ca_certs.append(ca_entry)
        tls_entry = _expiring_entry(secret, "tls.crt")
        if tls_entry is not None:
            certs.append(tls_entry)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment