Skip to content

Instantly share code, notes, and snippets.

@Natim
Last active January 10, 2022 03:53
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Natim/77c8f61c1d42e476cef8 to your computer and use it in GitHub Desktop.
Save Natim/77c8f61c1d42e476cef8 to your computer and use it in GitHub Desktop.
How to handle our permission schema with Redis
# -*- coding: utf-8 -*-
from __future__ import print_function
import json
import redis
r = redis.StrictRedis(host='localhost', port=6379, db=1)
# List of permissions that gives us this permission
PERMISSIONS_INHERITANCE = {
'bucket:write': {
'bucket': ['write']
},
'bucket:read': {
'bucket': ['write', 'read']
},
'groups:create': {
'bucket': ['write', 'groups:create']
},
'collections:create': {
'bucket': ['write', 'collections:create']
},
'group:write': {
'bucket': ['write'],
'group': ['write']
},
'group:read': {
'bucket': ['write', 'read'],
'group': ['write', 'read']
},
'collection:write': {
'bucket': ['write'],
'collection': ['write'],
},
'collection:read': {
'bucket': ['write', 'read'],
'collection': ['write', 'read'],
},
'records:create': {
'bucket': ['write'],
'collection': ['write', 'records:create']
},
'record:write': {
'bucket': ['write'],
'collection': ['write'],
'record': ['write']
},
'record:read': {
'bucket': ['write', 'read'],
'collection': ['write', 'read'],
'record': ['write', 'read']
}
}
def add_to_group(principal, group):
"""Add the principal to the given group."""
r.sadd('principal:%s' % group, principal)
r.sadd('principal:%s' % principal, group)
def remove_from_group(principal, group):
"""Remove the principal from the given group."""
r.srem('principal:%s' % group, principal)
r.srem('principal:%s' % principal, group)
def get_user_principals(user_id):
"""Return the list of principals for the given user."""
members = r.smembers('principal:%s' % user_id)
return members | set([user_id])
def build_perm_set_id(obj_type, perm, obj_parts):
PARTS_LENGTH = {
'bucket': 3,
'collection': 5,
'record': 7
}
return 'permission:%s:%s' % (
'/'.join(obj_parts[:PARTS_LENGTH[obj_type]]),
perm
)
def get_obj_type_from_key(obj_key):
if 'records' in obj_key:
obj_type = 'record'
elif 'collections' in obj_key:
obj_type = 'collection'
elif 'buckets' in obj_key:
obj_type = 'bucket'
else:
raise ValueError('%s key is invalid' % obj_key)
return obj_type
def get_perm_keys(permission):
obj_key, perm = permission.split(':', 1)
obj_parts = obj_key.split('/')
obj_type = get_obj_type_from_key(obj_key)
perm_type = '%s:%s' % (obj_type, perm)
keys = set([])
for perm_obj_type, permissions in \
PERMISSIONS_INHERITANCE[perm_type].items():
for other_perm in permissions:
keys.add(build_perm_set_id(perm_obj_type, other_perm, obj_parts))
return keys
def get_user_permission(user_id, permission):
"""Give the full list of principals having this permission."""
keys = get_perm_keys(permission)
multi = r.pipeline()
multi.sadd('principal:%s' % user_id, user_id)
multi.sunionstore('temp_perm:%s' % permission, *list(keys))
multi.sinter('temp_perm:%s' % permission, 'principal:%s' % user_id)
multi.delete('temp_perm:%s' % permission)
results = multi.execute()
return results[2]
def has_permission(user_id, permission):
"""Return a boolean telling if the user have got the permission."""
members = get_user_permission(user_id, permission)
return len(members) > 0
def add_permission(principal, permission):
"""Add the permission to the principal.
Print the list of redis commands to run in order to achieve the
action."""
r.sadd('permission:%s' % permission, principal)
def remove_permission(principal, permission):
"""Remove the permission to this principal on this bucket."""
r.srem('permission:%s' % permission, principal)
def get_object_permission(obj_key):
"""Display the permission added to this given object."""
results = r.keys('permission:%s:*' % obj_key)
multi = r.pipeline()
for result in results:
multi.smembers(result)
members = multi.execute()
permissions = {}
for i, key in enumerate(results):
perm = key.split(':', 2)[-1]
permissions[perm] = members[i]
return permissions
def get_object_principals(obj_key):
"""Give the full list of principals permission."""
permissions = {}
permissions['read'] = r.sunion(*list(get_perm_keys('%s:read' % obj_key)))
permissions['write'] = r.sunion(*list(get_perm_keys('%s:write' % obj_key)))
return permissions
def get_permission_principals(permission):
return r.sunion(*list(get_perm_keys(permission)))
def set_object(obj_key, data=None):
"""Simulate the add of a object or a subobject"""
# Add the object
# Build the permission set for this object and add principal for upper sets
obj_type = get_obj_type_from_key(obj_key)
obj_data = {
"id": obj_key,
"type": obj_type,
"data": data or {},
}
r.set('object:%s' % obj_key, json.dumps(obj_data))
def del_object(obj_key):
"""Simulate the removal of an object and its subobject"""
# Remove the object
multi = r.pipeline()
multi.delete('object:%s' % obj_key)
# Remove the sub-objects permissions sets
cursor, results = r.scan(0, '*:%s/*' % obj_key)
while True:
for result in results:
multi.delete(result)
if cursor == 0:
break
else:
cursor, results = r.scan(cursor, '*:%s/*' % obj_key)
# Remove permissions sets
cursor, results = r.scan(0, '*:%s:*' % obj_key)
while True:
for result in results:
multi.delete(result)
if cursor == 0:
break
else:
cursor, results = r.scan(cursor, '*:%s:*' % obj_key)
multi.execute()
if __name__ == '__main__':
user_id = "/users/natim"
user2_id = "/users/alexis"
group_id = "/buckets/blog/groups/moderators"
bucket_id = "/buckets/blog"
collection_id = "/buckets/blog/collections/articles"
collection2_id = "/buckets/blog/collections/comments"
record_id = "/buckets/blog/collections/articles/records/yo"
record2_id = "/buckets/blog/collections/comments/records/foobar"
# See what's happen with groups
add_to_group(user_id, group_id)
print(group_id, get_user_principals(user_id))
# Create some feature
set_object(bucket_id)
set_object(collection_id)
set_object(collection2_id)
set_object(record_id)
set_object(record2_id)
# Add an admin to the bucket
add_permission(user_id, '%s:write' % bucket_id)
print("add admin to the bucket", record2_id,
get_object_principals(record2_id))
# Add moderator to the articles and comments
add_permission(group_id, '%s:write' % collection_id)
add_permission(group_id, '%s:write' % collection2_id)
print("add group moderator", record2_id, get_object_principals(record2_id))
# Add alexis as author of a comment
add_permission(user2_id, '%s:write' % record2_id)
print("add alexis", record2_id, get_object_principals(record2_id))
# Remove the user from the group
remove_from_group(user_id, group_id)
print(get_user_principals(user_id))
# Can alexis edit the article?
print("Can alexis edit the article?", get_user_principals(user2_id) & get_permission_principals('%s:write' % record_id), has_permission(user2_id, '%s:write' % record_id))
print("Add alexis to the moderator group")
add_to_group(user2_id, group_id)
print("Can alexis edit the article?", get_user_principals(user2_id) & get_permission_principals('%s:write' % record_id), has_permission(user2_id, '%s:write' % record_id))
# Remove the bucket
del_object(bucket_id)
print(r.keys())
r.flushdb()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment