Skip to content

Instantly share code, notes, and snippets.

@franznemeth
Created September 5, 2022 12:36
Show Gist options
  • Save franznemeth/db3fb9aa317f1cdaa6601f6204a3a2ec to your computer and use it in GitHub Desktop.
Save franznemeth/db3fb9aa317f1cdaa6601f6204a3a2ec to your computer and use it in GitHub Desktop.
from kubernetes import client, config
import logging
import sys
import re
logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
if __name__ == '__main__':
config.load_kube_config()
api = client.CustomObjectsApi()
userattributes = api.list_cluster_custom_object(group="management.cattle.io",
version="v3",
plural="userattributes",
watch=False)
# this is stupid: https://github.com/kubernetes-client/python/issues/818#issuecomment-716560405
# explanation as to why the iterator looks insane
for userattribute in list(userattributes.items())[1][1]:
if not userattribute["ExtraByProvider"]:
user = api.get_cluster_custom_object(group="management.cattle.io",
version="v3",
plural="users",
name=userattribute["metadata"]["name"])
principalid = user["principalIds"][0]
try:
username = re.search('CN=([^,]*)', principalid).group(1).lower().replace(" ", ".")
except AttributeError:
logger.warning(f"Could not find principalid for user with displayName: {user.get('displayName')} and username: {user.get('username')}")
continue
logger.info(f"Fixing .extraByProvider for user {userattribute['metadata']['name']} with name: {username}")
extra_by_provider_patch = {
"ExtraByProvider": {
"activedirectory": {
"principalid": [principalid],
"username": [username]
}
}
}
patched_userattribute = api.patch_cluster_custom_object(group="management.cattle.io",
version="v3",
name=userattribute["metadata"]["name"],
plural="userattributes",
body=extra_by_provider_patch)
logger.info(f"Patched {userattribute['metadata']['name']}")
else:
logger.debug(f"No action needed for user {userattribute['metadata']['name']}")
@eumel8
Copy link

eumel8 commented Sep 29, 2022

oneliner for keycloak:

for i in `kubectl get userattributes.management.cattle.io -ocustom-columns='NAME:.metadata.name,GENERATION:.metadata.generation,LASTREFRESH:.LastRefresh,EXTRABYPROVIDER:.ExtraByProvider' | grep -v map| awk '{print $1}'`; do user=$(kubectl get user $i -o json | jq -r .displayName); kubectl patch userattributes $i --type='json' -p='{"ExtraByProvider":{"keycloak":{"principalid":["keycloak_user://'$user'"],"username":["'$user'"]}}}' --type=merge; kubectl get userattributes $i -o json | jq -r .ExtraByProvider;done

@gregsidelinger
Copy link

Here my quick version of this script for Azure AD. Instead of using the ldap CN for the username it looks up the users userPrincipalName using the az cli. The az cli was the quick way instead of doing it 100% in python as I do not have a working az python setup but I did have the cli. I also built a list of Rancher users which no longer exist in Azure AD and removed them outside of this script with a kubectl delete user USER once verified it was ok to remove them.

from kubernetes import client, config
import logging
import sys
import re
import subprocess

logger = logging.getLogger()
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

usersNotFound = []

if __name__ == '__main__':
    config.load_kube_config()

    api = client.CustomObjectsApi()

    userattributes = api.list_cluster_custom_object(group="management.cattle.io",
                                                    version="v3",
                                                    plural="userattributes",
                                                    watch=False)
    # this is stupid: https://github.com/kubernetes-client/python/issues/818#issuecomment-716560405
    # explanation as to why the iterator looks insane
    for userattribute in list(userattributes.items())[1][1]:
        if not userattribute["ExtraByProvider"]:
            user = api.get_cluster_custom_object(group="management.cattle.io",
                                                 version="v3",
                                                 plural="users",
                                                 name=userattribute["metadata"]["name"])
            principalid = user["principalIds"][0]
            azId = principalid.split('/')[-1]
            result = subprocess.run(['az', 'ad', 'user', 'show', '--id', azId, '--query', 'userPrincipalName'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if userattribute.get("username", "") in ["admin"]:
                continue
            if not result:
                usersNotFound.append(userattribute['metadata']['name'])
                logger.warning(f"Could not lookup azure id for principalid for user with displayName: {user.get('displayName')} and azid : {azId}")
                continue
            try:
                output = result.stdout.decode('utf-8')
                username = output.split('"')[1]
            except:
                usersNotFound.append(userattribute['metadata']['name'])
                logger.warning(f"Could not find principalid for user with displayName: {user.get('displayName')} and azid : {azId}")
                continue

            logger.info(f"Fixing .extraByProvider for user {userattribute['metadata']['name']} with name: {username}")
            extra_by_provider_patch = {
                "ExtraByProvider": {
                    "azuread": {
                        "principalid": [principalid],
                        "username": [username]
                    }
                }
            }
            patched_userattribute = api.patch_cluster_custom_object(group="management.cattle.io",
                                                                    version="v3",
                                                                    name=userattribute["metadata"]["name"],
                                                                    plural="userattributes",
                                                                    body=extra_by_provider_patch)
            logger.info(f"Patched {userattribute['metadata']['name']}")
        else:
            logger.debug(f"No action needed for user {userattribute['metadata']['name']}")

# print out users which could not be found
print("Users which no longer exist in Azure AD")
for user in usersNotFound:
    print(user)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment