|
#!/usr/bin/env python3 |
|
|
|
# Data source |
|
class Database: |
|
from typing import TypeVar |
|
|
|
class DBException(Exception): |
|
pass |
|
|
|
__mem = {} |
|
__T = TypeVar('T') |
|
|
|
@classmethod |
|
def init(cls, db_filepath: str): |
|
import json |
|
cls.__mem = json.loads(open(db_filepath).read()) |
|
|
|
@classmethod |
|
def find_raw(cls, model: __T, key: str, value: str) -> __T: |
|
table_name = model.__name__ |
|
if cls.__mem.get(table_name) is None: |
|
raise cls.DBException(f"{table_name} table does not exist.") |
|
for row in cls.__mem.get(table_name): |
|
if row.get(key) == value: |
|
return row |
|
raise cls.DBException( |
|
f"Value `{value}` not found for key `{key}` in table `{table_name}`") |
|
|
|
@classmethod |
|
def create(cls, model: __T, row: __T): |
|
pass |
|
|
|
@classmethod |
|
def read(cls, model: __T, key: str, value: str) -> __T: |
|
return model(**cls.find_raw(model, key, value)) |
|
|
|
@classmethod |
|
def update(cls, model: __T, row: __T): |
|
pass |
|
|
|
@classmethod |
|
def delete(cls, model: __T, row: __T): |
|
pass |
|
|
|
|
|
# Request data container |
|
class Context: |
|
from typing import Any, Dict |
|
def __init__(self, user: 'User', db_client: Any): |
|
self.user = user |
|
self.db_client = db_client |
|
|
|
|
|
# Super class for any arbitrary resource |
|
class Resource: |
|
import enum |
|
class Action(enum.Enum): |
|
CREATE = "CREATE" |
|
READ = "READ" |
|
UPDATE = "UPDATE" |
|
DELETE = "DELETE" |
|
|
|
class AccessDenied(Exception): |
|
pass |
|
|
|
class NotFound(Exception): |
|
pass |
|
|
|
class CorruptData(Exception): |
|
pass |
|
|
|
@classmethod |
|
def has_permission(cls, ctx: 'Context', action: 'Action', resource: 'Resource') -> bool: |
|
for role in ctx.user.roles: |
|
if not role.permissions.get(resource): |
|
continue |
|
if action in role.permissions.get(resource): |
|
return True |
|
return False |
|
|
|
def create(self, ctx: 'Context'): |
|
if not Resource.has_permission(ctx, Resource.Action.CREATE, self.__class__): |
|
raise Resource.AccessDenied |
|
return ctx.db_client.create(self.__class__, self) |
|
|
|
def read(self, ctx: 'Context'): |
|
if not Resource.has_permission(ctx, Resource.Action.READ, self.__class__): |
|
raise Resource.AccessDenied |
|
return ctx.db_client.read(self.__class__, self) |
|
|
|
def update(self, ctx: 'Context'): |
|
if not Resource.has_permission(ctx, Resource.Action.UPDATE, self.__class__): |
|
raise Resource.AccessDenied |
|
return ctx.db_client.update(self.__class__, self) |
|
|
|
def delete(self, ctx: 'Context'): |
|
if not Resource.has_permission(ctx, Resource.Action.DELETE, self.__class__): |
|
raise Resource.AccessDenied |
|
return ctx.db_client.delete(self.__class__, self) |
|
|
|
|
|
# Role model - every role maps to a set of resources and their allowed action(s) |
|
class Role(Resource): |
|
from typing import Dict, List |
|
def __init__(self, name: str, permissions: Dict['Resource', List['Action']]): |
|
self.name = name |
|
self.permissions = permissions |
|
|
|
|
|
# User model - request owner, has access to resources based on roles |
|
class User(Resource): |
|
def __init__(self, id: int, name: str, roles: ['Role']): |
|
self.name = name |
|
self.roles = roles |
|
|
|
|
|
# A sample resource |
|
class Car(Resource): |
|
pass |
|
|
|
|
|
# Utils for JSON to model conversion |
|
def construct_role_from_json(obj) -> Role: |
|
from typing import Dict, List |
|
|
|
if "name" not in obj: |
|
raise Resource.CorruptData("name not found in role") |
|
if "permissions" not in obj: |
|
raise Resource.CorruptData("permissions not found in role") |
|
|
|
def __get_mapped_permissions(permissions: Dict[str, List[str]]) -> Dict['Resource', List['Action']]: |
|
mapped_permissions = {} |
|
for resource, actions in permissions.items(): |
|
if resource not in globals(): |
|
raise Resource.NotFound |
|
mapped_permissions[globals()[resource]] = __get_mapped_actions(actions) |
|
return mapped_permissions |
|
|
|
def __get_mapped_actions(actions: [str]) -> ['Action']: |
|
mapped_actions = [] |
|
for action in actions: |
|
if not hasattr(Resource.Action, action): |
|
raise Resource.NotFound(f"action `{action}` not defined") |
|
mapped_actions.append(Resource.Action[action]) |
|
return mapped_actions |
|
|
|
return Role(obj["name"], __get_mapped_permissions(obj["permissions"])) |
|
|
|
|
|
def construct_user_from_json(obj) -> User: |
|
if "id" not in obj: |
|
raise Resource.CorruptData("id not found in user") |
|
if "name" not in obj: |
|
raise Resource.CorruptData("name not found in user") |
|
if "roles" not in obj: |
|
raise Resource.CorruptData("roles not found in user") |
|
|
|
def __get_mapped_roles(roles: [str]) -> ['Role']: |
|
return [construct_role_from_json(Database.find_raw(Role, "name", role)) for role in roles] |
|
|
|
return User(obj['id'], obj['name'], __get_mapped_roles(obj['roles'])) |
|
|
|
|
|
def construct_request_context(user_id) -> Context: |
|
return Context( |
|
construct_user_from_json(Database.find_raw(User, "id", user_id)), |
|
Database |
|
) |
|
|
|
|
|
# Request initiator |
|
def main(context, action, resource): |
|
if resource not in globals(): |
|
raise Resource.NotFound(f"Resource `{resource}` not found") |
|
if not hasattr(Resource.Action, action): |
|
raise Resource.NotFound(f"action `{action}` not defined") |
|
|
|
if Resource.has_permission(context, Resource.Action[action], globals()[resource]): |
|
print(f"User `{context.user.name}` has access for this operation") |
|
else: |
|
print(f"User `{context.user.name}` does not has access for this operation") |
|
|
|
|
|
# Program initiator |
|
if __name__ == "__main__": |
|
import sys |
|
if "--help" in sys.argv: |
|
print("Use this format to run the script") |
|
print("script.py <user_id> <action> <resource>") |
|
print("The data for this script uses data.json file in the same directory.") |
|
sys.exit(0) |
|
|
|
if len(sys.argv) < 4: |
|
print("Script requires exactly 3 arguments") |
|
print("script.py <user_id> <action> <resource>") |
|
sys.exit(2) |
|
|
|
in_user_id = sys.argv[1] |
|
in_action = sys.argv[2] |
|
in_resource = sys.argv[3] |
|
|
|
Database.init("data.json") |
|
main( |
|
construct_request_context(in_user_id), |
|
action=in_action, |
|
resource=in_resource |
|
) |