Skip to content

Instantly share code, notes, and snippets.

@jarulsamy
Created February 19, 2024 20:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jarulsamy/d6fc0ed7cfdd3143428f67be6b851ef9 to your computer and use it in GitHub Desktop.
Save jarulsamy/d6fc0ed7cfdd3143428f67be6b851ef9 to your computer and use it in GitHub Desktop.
Syncing groups and users from IdM to Keycloak.
#!/usr/bin/env python3
"""
kc-sync - Sync existing Keycloak users' group memberships with IdM.
This script:
1. Creates all groups from IdM in Keycloak.
2. Queries Keycloak for all users within the realm.
3. Finds the corresponding user within IdM.
4. Calculates which groups a given user should be added to/removed from.
5. Either shows the changes to be made or performs the previously calculated transactions.
"""
import argparse
import logging
import sys
from pprint import pprint
import keycloak
import urllib3
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from python_freeipa import ClientMeta
import os
from pathlib import Path
import configparser
# Root logging format: timestamp, level, then the message.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT)

# Script-wide logger; every function in this file logs through it.
logger = logging.getLogger("kc-sync")
def idm_get_user_info(ipa, username):
    """Get information about a user from IdM.

    Args:
        ipa: IdM client; only its ``user_find`` method is used.
        username: The uid to search for.

    Returns:
        The single matching entry from the ``result`` list returned by
        ``ipa.user_find(o_uid=username)``.

    Raises:
        ValueError: If more than one user matches ``username``.
        IndexError: If no user matches ``username``.
    """
    users = ipa.user_find(o_uid=username)

    if users["count"] > 1:
        msg = f"Found multiple users with same username: {username}"
        logger.error(msg)
        raise ValueError(msg)

    matches = users["result"]
    if not matches:
        # An empty result used to surface as a bare "list index out of range".
        # The exception type is kept as IndexError so existing handlers still
        # work; only the message is made actionable. Severity is left to the
        # caller, so nothing is logged here.
        raise IndexError(f"Found no user in IdM with username: {username}")

    logger.debug("Found valid user %s within IdM", username)
    return matches[0]
def keycloak_create_group(kcadm, group_name):
    """Ask Keycloak to create ``group_name``, tolerating an existing group.

    The request is sent with ``skip_exists=True``. Returns whatever
    ``kcadm.create_group`` returns, or ``None`` when the call raises
    ``KeycloakPostError`` (the failure is logged rather than propagated).
    """
    payload = {"name": group_name}
    try:
        created = kcadm.create_group(payload, skip_exists=True)
    except keycloak.exceptions.KeycloakPostError:
        logger.error("Failed to create group %s in Keycloak", group_name)
        return None
    else:
        logger.info("Created group %s in Keycloak", group_name)
        return created
def keycloak_add_user_to_groups(kcadm, username, user_id, groups):
    """Add the Keycloak user ``user_id`` to each group named in ``groups``.

    ``username`` is only used for log messages. Each group is resolved by
    path and the membership is added with one API call per group.
    """
    for name in groups:
        # HACK: Every group is assumed to be a top-level group, so its path is
        # simply "/<name>". This will need to change if nested group
        # hierarchies are ever required.
        target = kcadm.get_group_by_path(f"/{name}")
        target_id = target["id"]
        kcadm.group_user_add(user_id, target_id)
        logger.info("Added user %s to group %s", username, target["name"])
def keycloak_remove_user_from_groups(kcadm, username, user_id, groups):
    """Remove the Keycloak user ``user_id`` from each group named in ``groups``.

    ``username`` is only used for log messages. Each group is resolved by
    path and the membership is removed with one API call per group.
    """
    for name in groups:
        # HACK: Every group is assumed to be a top-level group, so its path is
        # simply "/<name>". This will need to change if nested group
        # hierarchies are ever required.
        target = kcadm.get_group_by_path(f"/{name}")
        target_id = target["id"]
        kcadm.group_user_remove(user_id, target_id)
        logger.info("Removed user %s from group %s", username, target["name"])
def _idm_group_names(ipa):
    """Return the first CN of every IdM group, de-duplicated, in first-seen order."""
    # Three filtered queries are issued; their results may overlap, so names
    # are collected in a dict to drop duplicates while preserving order.
    filters = ({"o_posix": True}, {"o_posix": False}, {"o_external": True})
    names = {}
    for extra in filters:
        found = ipa.group_find(o_timelimit=0, o_sizelimit=0, **extra)
        for group in found["result"]:
            name = group["cn"][0]
            if name in names:
                continue
            if len(group["cn"]) > 1:
                logger.warning("Found IdM group with multiple CNs!")
            names[name] = None
    return list(names)


def _print_dry_run_report(user, idm_user, to_remove, to_add):
    """Print the membership changes that would be applied to one user."""
    print("=" * 80)
    print(f"Keycloak User: {user['username']}")
    print(f"IdM User: {idm_user['uid'][0]}")
    print("Keycloak group memberships to remove user from:")
    # Sorted so that repeated dry runs produce identical, diffable output.
    for name in sorted(to_remove):
        print(f" {name}")
    print("Keycloak group memberships to add user to:")
    for name in sorted(to_add):
        print(f" {name}")
    print()


def sync_group_from_idm_to_keycloak(kcadm, ipa, dry_run=False):
    """Mirror IdM groups and group memberships into Keycloak.

    Steps:
      1. Collect all IdM group names and (unless ``dry_run``) create each one
         in Keycloak.
      2. For every Keycloak user except ``admin``, look the user up in IdM and
         compute which Keycloak group memberships must be added or removed so
         that they match the IdM ``memberof_group`` list. Keycloak
         memberships that are not present in IdM are removed.
      3. Apply the changes, or just print them when ``dry_run`` is true.

    Keycloak users with no matching IdM user are skipped with a warning.

    Args:
        kcadm: Keycloak admin client.
        ipa: Logged-in IdM client.
        dry_run: When true, nothing in Keycloak is modified; the planned
            changes are printed instead.

    Raises:
        ValueError: Propagated from ``idm_get_user_info`` when IdM returns
            more than one user for a username.
    """
    logger.debug("Retrieving IdM groups")
    group_names = _idm_group_names(ipa)

    # Create any groups that are in IdM but not in Keycloak within Keycloak.
    # This makes manual creation of RBAC rules a lot easier.
    if not dry_run:
        for group_name in group_names:
            keycloak_create_group(kcadm, group_name)

    for user in kcadm.get_users({}):
        username = user["username"]
        if username == "admin":
            # Skip admin
            continue

        try:
            idm_user = idm_get_user_info(ipa, username)
        except IndexError:
            # A Keycloak-only account must not abort the sync for everyone
            # who comes after it in the list.
            logger.warning(
                "Skipping Keycloak user %s: no matching user in IdM", username
            )
            continue

        # The membership key may be absent when the user belongs to no
        # groups; treat that as an empty membership instead of a KeyError.
        idm_groups = set(idm_user.get("memberof_group", ()))

        keycloak_user_id = user["id"]
        keycloak_groups = {
            g["name"] for g in kcadm.get_user_groups(user_id=keycloak_user_id)
        }

        # Set difference in both directions gives the two change lists.
        groups_to_remove_membership = keycloak_groups - idm_groups
        groups_to_add_membership = idm_groups - keycloak_groups

        if dry_run:
            logger.info("DRY-RUN, just printing details")
            _print_dry_run_report(
                user,
                idm_user,
                groups_to_remove_membership,
                groups_to_add_membership,
            )
            continue

        # Side note: kcadm.update_user() is not used here because of an
        # upstream Keycloak API bug that ignores the groups field of the
        # payload. The differences are therefore computed on our end, with a
        # separate API call for each group membership change.
        logger.info("Commiting transactions")
        keycloak_remove_user_from_groups(
            kcadm,
            username,
            keycloak_user_id,
            groups_to_remove_membership,
        )
        keycloak_add_user_to_groups(
            kcadm,
            username,
            keycloak_user_id,
            groups_to_add_membership,
        )
def build_parser():
    """Construct the command-line parser for kc-sync.

    Options: ``--version``, a repeatable ``-v/--verbose`` counter, and the
    ``-n/--dry-run`` switch.
    """
    cli = argparse.ArgumentParser(description="IdM to Keycloak Group Sync Tool")

    # (flags, add_argument keyword settings), registered in display order.
    option_specs = (
        (
            ("--version",),
            {"action": "version", "version": "%(prog)s 1.0"},
        ),
        (
            ("-v", "--verbose"),
            {
                "action": "count",
                "default": 0,
                "help": "increase logging verbosity, can be used 3 times",
            },
        ),
        (
            ("-n", "--dry-run"),
            {
                "action": "store_true",
                "help": "print which actions would occur, but don't actually modify Keycloak",
            },
        ),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    return cli
def load_config(path=Path("./kc-sync.conf")):
    """Load program config from a file, with environment-variable overrides.

    Each option is resolved in this order:
        (highest priority)
        ENV VAR       ``KCS_<SECTION>_<OPTION>``, e.g. ``KCS_IPA_PASSWORD``
        CONFIG FILE   ``[<SECTION>] <option>`` in ``path``
        (lowest priority)

    If ``path`` does not exist, a template config file is written there
    (owner-only permissions, since it holds credentials), a notice is
    printed, and the process exits with status 0.

    Args:
        path: Location of the INI-style config file.

    Returns:
        A dict with ``"KEYCLOAK"`` (``server_url``, ``username``,
        ``password``, ``realm_name``) and ``"IPA"`` (``server_url``,
        ``username``, ``password``) sub-dicts of strings.

    Raises:
        KeyError: If an option is set neither in the environment nor in the
            config file.
    """
    config = configparser.ConfigParser()

    if not path.is_file():
        config["KEYCLOAK"] = {
            "server_url": "https://arcccloak.example.com",
            "username": "username",
            "password": "password",
            "realm_name": "test-realm",
        }
        config["IPA"] = {
            "server_url": "idm.server.example.com",
            "username": "username",
            "password": "password",
        }
        # The file will contain passwords, so create it readable and writable
        # by the owner only rather than with the process default mode.
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            config.write(f)
        print(f"Created default config file at {path.absolute()}")
        print("Please modify it and rerun")
        sys.exit(0)

    config.read(path)

    layout = {
        "KEYCLOAK": ("server_url", "username", "password", "realm_name"),
        "IPA": ("server_url", "username", "password"),
    }
    result = {}
    for section, options in layout.items():
        resolved = {}
        for option in options:
            env_name = f"KCS_{section}_{option.upper()}"
            value = os.getenv(env_name)
            if value is None:
                # Only consult the file when the environment has no value, so
                # an option missing from the file cannot block an override.
                value = config.get(section, option, fallback=None)
            if value is None:
                raise KeyError(
                    f"Missing config option [{section}] {option}: "
                    f"set it in {path} or via ${env_name}"
                )
            resolved[option] = value
        result[section] = resolved
    return result
def main():
    """Run the sync: parse CLI args, load config, connect, and synchronise.

    Returns 0 once the sync has finished; failures surface as exceptions.
    """
    parser = build_parser()
    args = parser.parse_args()
    config = load_config()

    # Map the -v count onto a log level for the kc-sync logger.
    verbosity = args.verbose
    if verbosity >= 3:
        level = logging.DEBUG
    elif verbosity == 2:
        level = logging.INFO
    elif verbosity == 1:
        level = logging.WARNING
    else:
        # Quiet mode: also silence urllib3's insecure-certificate warning.
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        level = logging.ERROR
    logger.setLevel(level)

    kc_conf = config["KEYCLOAK"]
    ipa_conf = config["IPA"]

    # Log into Keycloak. Authentication happens against the "master" realm,
    # while operations target the configured realm.
    keycloak_connection = KeycloakOpenIDConnection(
        server_url=kc_conf["server_url"],
        username=kc_conf["username"],
        password=kc_conf["password"],
        user_realm_name="master",
        realm_name=kc_conf["realm_name"],
        verify=True,
    )
    kcadm = KeycloakAdmin(connection=keycloak_connection)

    # Log into IPA.
    # NOTE(review): verify_ssl=False means the IdM server certificate is not
    # validated while credentials are sent - confirm this is intentional.
    ipa = ClientMeta(ipa_conf["server_url"], verify_ssl=False)
    ipa.login(ipa_conf["username"], ipa_conf["password"])

    sync_group_from_idm_to_keycloak(kcadm, ipa, args.dry_run)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment