Skip to content

Instantly share code, notes, and snippets.

@jewzaam
Created March 17, 2020 21:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jewzaam/418bd93cfdcda30ed4358e5d7157e973 to your computer and use it in GitHub Desktop.
Save jewzaam/418bd93cfdcda30ed4358e5d7157e973 to your computer and use it in GitHub Desktop.
Script to grant RBAC to dedicated-admins for CRDs
import re
from kubernetes import client, config
from kubernetes.client import ApiClient
# CRD-name patterns that must NOT be granted to dedicated-admins.
#
# Derived from a fresh 4.3.0 installation by reducing every CRD name to its
# (up to three component) group suffix:
# oc get crds -o json | jq -r '.items[].metadata.name' | sed 's/.*\(\.[^.]*\.[^.]*\.[^.]*\)$/\1/g' | sed 's/^[^.]*\(\.[^.]*\.[^.]*\)/\1/g' | sort -u
#
# Each entry is applied with re.match() against the full CRD name, so the
# leading ".*" absorbs the resource part in front of the group.
RE_DENYLIST = [
    # --- groups found on the base installation ---
    r".*\.authorization\.openshift\.io",
    r".*\.autoscaling\.openshift\.io",
    r".*\.cloudcredential\.openshift\.io",
    r".*\.cni\.cncf\.io",
    r".*\.config\.openshift\.io",
    r".*\.console\.openshift\.io",
    r".*\.machineconfiguration\.openshift\.io",
    r".*\.machine\.openshift\.io",
    r".*\.managed\.openshift\.io",
    r".*\.monitoring\.coreos\.com",
    r".*\.network\.openshift\.io",
    r".*\.operator\.openshift\.io",
    r".*\.operators\.coreos\.com",
    r".*\.quota\.openshift\.io",
    r".*\.security\.openshift\.io",
    r".*\.tuned\.openshift\.io",

    # --- metal3.io ships with ocp but is not used on osd: deny to customer ---
    r".*\.metal3\.io",

    # --- may need to be opened up if customers can use velero for migrations etc ---
    r".*\.velero\.io",
]
# Label key/value stamped on every ClusterRole this script creates; the same
# pair is used to recognise roles from earlier runs when cleaning up.
CLUSTERROLE_LABEL_KEY = "owner"
CLUSTERROLE_LABEL_VALUE = "nmalik"

# Load client configuration. The local kubeconfig is tried first so behaviour
# on a workstation is unchanged; if none can be loaded, fall back to the
# in-cluster service-account configuration so the script can also run in a pod.
try:
    config.load_kube_config()
except config.ConfigException:
    config.load_incluster_config()

# https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/RbacAuthorizationV1Api.md
api_authz = client.RbacAuthorizationV1Api()
# https://github.com/kubernetes-client/python/blob/master/examples/custom_object.py
api_custom = client.CustomObjectsApi()
# Fetch every CustomResourceDefinition through the generic custom-objects
# client; the response is consumed below as a dict with an 'items' list.
crds = api_custom.list_cluster_custom_object(
    group="apiextensions.k8s.io", version="v1", plural="customresourcedefinitions",
)

# Collect all clusterroles that might need to be cleaned up by this script.
# TODO: switch to api_authz.list_cluster_role() with
# label_selector="<CLUSTERROLE_LABEL_KEY>=<CLUSTERROLE_LABEL_VALUE>" once
# iterating over its results is figured out, then drop the listing and the
# client-side label filter below.
existing_clusterroles = api_custom.list_cluster_custom_object(
    group="authorization.openshift.io", version="v1", plural="clusterroles",
)

# Keep only the names of roles carrying this script's ownership label.
existing_clusterrole_names = [
    role['metadata']['name']
    for role in existing_clusterroles['items']
    if role['metadata'].get('labels', {}).get(CLUSTERROLE_LABEL_KEY) == CLUSTERROLE_LABEL_VALUE
]
print(existing_clusterrole_names)
# Reconcile one ClusterRole per CRD that is not covered by RE_DENYLIST:
# create it when missing, leave it alone when present, and in both cases
# strike its name from existing_clusterrole_names so it is not treated as a
# leftover afterwards.
for crd in crds['items']:
    crd_name = crd['metadata']['name']
    crd_scope = crd['spec']['scope'].lower()
    crd_group = crd['spec']['group']
    # name includes group, split group out: the first dot-separated component
    # is what goes into the rule's "resources"
    crd_name_short = crd_name.split(".")[0]
    # rename scope from "namespaced" to "project" to align w/ rbac configuration for dedicated-admin
    if crd_scope == "namespaced":
        crd_scope = "project"

    # any() stops at the first matching pattern; denied CRDs get no role.
    if any(re.match(pattern, crd_name) for pattern in RE_DENYLIST):
        #print("DENY: {}".format(crd_name))
        continue
    #print("ALLOW: {}".format(crd_name))

    clusterrole_name = "dedicated-admins-{}".format(crd_name)

    # does it already exist?
    try:
        # get group is not the same as apiVersion for create..
        api_custom.get_cluster_custom_object(
            group="authorization.openshift.io",
            version="v1",
            plural="clusterroles",
            name=clusterrole_name,
        )
        already_exists = True
    except client.rest.ApiException as err:
        # Only a 404 means "role is missing, create it". Any other API
        # failure (auth, connectivity, server error) is surfaced instead of
        # being mistaken for a missing role.
        if err.status != 404:
            raise
        already_exists = False

    if already_exists:
        print("SKIPPED: {}".format(clusterrole_name))
    else:
        clusterrole = {
            "apiVersion": "rbac.authorization.k8s.io/v1",
            "kind": "ClusterRole",
            "metadata": {
                "name": clusterrole_name,
                "labels": {
                    CLUSTERROLE_LABEL_KEY: CLUSTERROLE_LABEL_VALUE,
                },
            },
            "rules": [
                {
                    "apiGroups": [crd_group],
                    "resources": [crd_name_short],
                    "verbs": ["*"],
                }
            ],
            # NOTE(review): clusterRoleSelectors entries are label selectors,
            # normally written as {"matchLabels": {...}}. This bare key/value
            # mapping is kept exactly as before -- confirm the API server
            # interprets it as intended, or whether this was meant to be a
            # metadata label opting the role into an aggregated
            # dedicated-admins role.
            "aggregationRule": {
                "clusterRoleSelectors": [
                    {
                        "dedicated-admins/aggregate-to-{}".format(crd_scope): "true"
                    }
                ]
            }
        }
        api_authz.create_cluster_role(
            body=clusterrole,
        )
        print("CREATED: {}".format(clusterrole_name))

    # This role is wanted: take it off the clean-up list.
    if clusterrole_name in existing_clusterrole_names:
        existing_clusterrole_names.remove(clusterrole_name)
# Whatever is still listed in existing_clusterrole_names carries this script's
# label but was not claimed while walking the CRDs above: delete those roles.
for stale_name in existing_clusterrole_names:
    api_authz.delete_cluster_role(name=stale_name)
    print("DELETED: {}".format(stale_name))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment