Skip to content

Instantly share code, notes, and snippets.

@vskrachkov
Created April 5, 2021 13:22
Show Gist options
  • Save vskrachkov/2d0fdf7cd62d170af777a570026a1ab1 to your computer and use it in GitHub Desktop.
Save vskrachkov/2d0fdf7cd62d170af777a570026a1ab1 to your computer and use it in GitHub Desktop.
from typing import Iterable, Tuple, Type, Optional, List
from django.conf import settings
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.db.models import Model
from keycloak.admin.clientroles import ClientRoles
from keycloak.exceptions import KeycloakClientError
from keycloak.openid_connect import KeycloakOpenidConnect
from keycloak.realm import KeycloakRealm
from keycloak.uma import KeycloakUMA
def do_sync() -> None:
    """Run one sync pass against Keycloak.

    Fetches a token through the OIDC client's ``client_credentials()`` call,
    prints the raw token response, prints every UMA resource set returned for
    that token, and finally syncs client roles via ``_sync_roles``.
    """
    keycloak_realm = _get_realm()

    # get access token
    token_response = _get_oidc_client(keycloak_realm).client_credentials()
    access_token = token_response["access_token"]
    # NOTE(review): this prints the whole token response, access token
    # included -- confirm that is acceptable wherever this runs.
    print(token_response)

    uma = _get_uma_client(keycloak_realm)
    for resource_id in uma.resource_set_list(access_token):
        print(uma.resource_set_read(access_token, resource_id))

    # Resource sync is currently switched off:
    # _sync_resources(access_token, uma)
    _sync_roles(keycloak_realm)
def _sync_resources(access_token: str, uma_client: KeycloakUMA) -> None:
    """Create or update one UMA resource set per Django content type.

    For every ``(content_type, permissions)`` pair produced by
    ``_get_content_types_with_permissions`` a resource set named after
    ``content_type.name`` is created with the permissions as its scopes.
    If creation fails with HTTP 409 the resource set is updated instead.

    Any other ``KeycloakClientError`` from the create call is printed and the
    loop moves on to the next content type; previously such errors were
    dropped without any trace.

    Args:
        access_token: Token passed to every UMA call.
        uma_client: UMA client used to create/update resource sets.
    """
    for ct, permissions in _get_content_types_with_permissions():
        resource_name = ct.name
        try:
            uma_client.resource_set_create(
                access_token,
                resource_name,
                scopes=permissions,
                _id=resource_name,
                ownerManagedAccess=True,
            )
        except KeycloakClientError as exc:
            if exc.original_exc.response.status_code != 409:
                # Not a conflict: report it (same style as _sync_roles)
                # instead of swallowing it, then keep syncing the rest.
                print(exc.original_exc)
                continue
            # 409 on create: fall back to updating the resource set with
            # the same id.
            uma_client.resource_set_update(
                access_token,
                id=resource_name,
                name=resource_name,
                scopes=permissions,
            )
def _sync_roles(realm: KeycloakRealm) -> None:
    """Create a Keycloak client role for every Django permission.

    Each permission's ``codename`` becomes the role name and its ``name``
    the role description. A ``KeycloakClientError`` raised for one
    permission is printed and the remaining permissions are still
    processed.

    Args:
        realm: Realm whose client roles are populated.
    """
    roles = _get_client_roles(realm)
    for perm in _get_all_permissions():
        # The try lives inside the loop on purpose: one failing role (for
        # example one that already exists from an earlier run) must not
        # prevent the roles after it from being created.
        try:
            roles.create(perm.codename, description=perm.name)
        except KeycloakClientError as exc:
            print(exc.original_exc)
def _get_client_roles(realm: KeycloakRealm) -> ClientRoles:
    """Return the admin roles endpoint of the configured client.

    Walks ``realm.admin`` -> realm (looked up by ``realm.realm_name``) ->
    client (looked up with ``settings.KEYCLOAK_CLIENT_ID``) -> ``roles``.
    """
    # NOTE(review): by_id() is given settings.KEYCLOAK_CLIENT_ID, the same
    # value used as the OIDC client_id -- confirm this is the identifier the
    # admin API expects here.
    admin_realm = realm.admin.realms.by_name(realm.realm_name)
    client = admin_realm.clients.by_id(settings.KEYCLOAK_CLIENT_ID)
    return client.roles
def _get_uma_client(realm: KeycloakRealm) -> KeycloakUMA:
    """Return the UMA client obtained from ``realm.uma()``."""
    uma_client = realm.uma()
    return uma_client
def _get_oidc_client(realm: KeycloakRealm) -> KeycloakOpenidConnect:
    """Return an OpenID Connect client for the configured Keycloak client.

    The client id and secret are read from ``settings.KEYCLOAK_CLIENT_ID``
    and ``settings.KEYCLOAK_CLIENT_SECRET``.
    """
    client_id = settings.KEYCLOAK_CLIENT_ID
    client_secret = settings.KEYCLOAK_CLIENT_SECRET
    return realm.open_id_connect(client_id=client_id, client_secret=client_secret)
def _get_realm() -> KeycloakRealm:
    """Build a ``KeycloakRealm`` from the Django settings.

    Uses ``settings.KEYCLOAK_URL`` as the server URL and
    ``settings.KEYCLOAK_REALM`` as the realm name.
    """
    server_url = settings.KEYCLOAK_URL
    realm_name = settings.KEYCLOAK_REALM
    return KeycloakRealm(server_url=server_url, realm_name=realm_name)
def _get_all_permissions() -> Iterable[Permission]:
    """Return every ``Permission`` row via ``Permission.objects.all()``."""
    all_permissions = Permission.objects.all()
    return all_permissions
def _get_content_types_with_permissions() -> Iterable[Tuple[ContentType, List[str]]]:
    """Yield each content type together with its model's permission names.

    Content types for which ``model_class()`` returns ``None`` are skipped.
    For the others, the yielded list holds the codenames of the model's
    custom ``Meta.permissions`` followed by its ``Meta.default_permissions``.

    Yields:
        ``(content_type, permission_names)`` tuples, where
        ``permission_names`` is a list of strings.
    """
    for ct in ContentType.objects.all():
        model_cls: Optional[Type[Model]] = ct.model_class()
        if model_cls is None:
            continue
        opts = model_cls._meta
        # Meta.permissions holds (codename, human-readable name) pairs and
        # may be declared as a list or a tuple. Keep only the codename so the
        # result really is a list of strings, and build a fresh list so that
        # a tuple declaration no longer fails on `tuple + list`.
        custom_codenames = [codename for codename, _label in opts.permissions]
        yield ct, custom_codenames + list(opts.default_permissions)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment