Skip to content

Instantly share code, notes, and snippets.

@mmerickel
Last active October 10, 2018 04:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mmerickel/7a4b288fc10860cce42c5eae46b20546 to your computer and use it in GitHub Desktop.
Save mmerickel/7a4b288fc10860cce42c5eae46b20546 to your computer and use it in GitHub Desktop.
from pyramid.decorator import reify
from .principals import Principals as P
class AccessToken:
    """Identity carried by a request: a token type, an optional user, and claims.

    ``principal`` and ``effective_principals`` are wrapped in ``reify``, so
    each is computed on first access and the result is then kept on the
    instance.
    """

    def __init__(self, type, user=None, claims=None):
        """Store the token ``type``, the ``user`` (if any) and its ``claims``.

        ``claims`` becomes a fresh empty list when omitted or falsy.
        """
        # ``type`` shadows the builtin, but it is part of the public signature
        # and is kept so keyword callers continue to work.
        self.type = type
        self.user = user
        self.claims = claims or []

    @reify
    def principal(self):
        """Return the principal for ``self.user``, or ``None`` without a user."""
        if not self.user:
            return None
        return P.user(user=self.user.id)

    @reify
    def effective_principals(self):
        """Return the set of principals granted by this token.

        Always contains ``P.everyone``. With a user it also contains
        ``P.authenticated``, the user's own principal, the bare ``P.user()``
        principal, one principal per role and one per claim.
        """
        principals = {P.everyone}

        principal = self.principal
        if principal:
            principals.add(P.authenticated)
            principals.add(principal)

        user = self.user
        if user:
            # ``P.user()`` with no id is the generic "some user" principal.
            principals.add(P.user())
            principals.update(P.role(role) for role in user.roles)
            # NOTE(review): nesting was reconstructed from a flattened listing;
            # confirm claims should only be expanded when a user is present.
            principals.update(P.user(claim=c) for c in self.claims)

        return principals
from myapp.principals import ACE
class Page:
    """Resource whose ACL lets everyone view it and its owner edit it."""

    def __acl__(self):
        """Return the ACL for this page, built from ``self.owner_id`` per call."""
        # ``owner_id`` is not assigned in this class; presumably it is set on
        # the instance elsewhere -- TODO confirm.
        return [
            (ACE.allow, ACE.everyone, 'view'),
            (ACE.allow, ACE.user(self.owner_id), 'edit'),
        ]
from pyramid.security import (
ALL_PERMISSIONS,
Allow,
Authenticated,
Deny,
Everyone,
)
# Sentinel meaning "this component was not supplied"; always compared by identity.
NoValue = object()


def make_principal(values):
    """Build a principal string from an iterable of ``(key, value)`` pairs.

    Each pair contributes one component, and components are joined with a
    single space in the order given:

    * ``True``      -> ``<key>``
    * ``False``     -> ``no_<key>``
    * ``NoValue``   -> nothing (the pair is skipped)
    * anything else -> ``<key>:<value>``

    ``True`` and ``False`` are matched by identity, so ``1`` and ``0`` are
    rendered as ``<key>:1`` and ``<key>:0``. An input with no usable pairs
    yields the empty string.
    """
    parts = []
    for key, value in values:
        if value is NoValue:
            continue
        if value is True:
            parts.append(key)
        elif value is False:
            parts.append(f'no_{key}')
        else:
            parts.append(f'{key}:{value}')
    return ' '.join(parts)
class _PrincipalFactory:
    """Namespace of helpers that build principal strings via ``make_principal``."""

    everyone = Everyone
    authenticated = Authenticated

    def role(self, role):
        """Return the principal string for ``role``."""
        return make_principal([
            ('role', role),
        ])

    def user(
        self,
        user=True,
        *,
        claim=NoValue,
    ):
        """Return a user principal, optionally qualified by ``claim``.

        ``user`` is either ``True`` (the generic "some user" principal) or a
        concrete user id. ``claim`` is left out of the principal unless it is
        supplied.

        Raises:
            AssertionError: if ``user`` is ``False`` or ``NoValue``.
        """
        # Compare by identity: a membership test against ``{False, NoValue}``
        # would also reject ``0`` (because ``0 == False``) and would raise
        # TypeError for unhashable ids. Raising explicitly keeps the check
        # active under ``python -O`` while preserving the exception type.
        if user is False or user is NoValue:
            raise AssertionError(
                'user must be True or a concrete user id'
            )
        return make_principal([
            ('user', user),
            ('claim', claim),
        ])


# Module-level instance through which the factory methods are called.
Principals = _PrincipalFactory()
class _ACEFactory(_PrincipalFactory):
    """Principal factory that also exposes the ACE action/permission constants.

    Inherits ``everyone``, ``authenticated``, ``role()`` and ``user()`` and
    adds ``allow``, ``deny`` and ``all_permissions``.
    """

    allow = Allow
    deny = Deny
    all_permissions = ALL_PERMISSIONS


# Module-level instance through which the constants and factory methods are used.
ACE = _ACEFactory()
from pyramid.authorization import ACLAuthorizationPolicy
from myapp import services as S
from .resources import root_factory
log = __import__('logging').getLogger(__name__)
class AuthenticationPolicy:
    """Authentication policy that delegates to ``request.access_token``.

    NOTE(review): only the two methods below are defined here; Pyramid's
    ``IAuthenticationPolicy`` also lists ``unauthenticated_userid``,
    ``remember`` and ``forget`` -- confirm they are not needed by this app.
    """

    def authenticated_userid(self, request):
        """Return the principal of the request's access token."""
        return request.access_token.principal

    def effective_principals(self, request):
        """Return the effective principals of the request's access token."""
        return request.access_token.effective_principals
def get_access_token(request):
    """Return what the ``LoginService`` identifies for ``request``.

    Looks the service up with ``request.find_service`` and returns the result
    of its ``identify_request`` call unchanged.
    """
    svc = request.find_service(S.LoginService)
    return svc.identify_request(request)
def get_user(request):
    """Return the user on the request's access token, logging its id if truthy.

    The value of ``request.access_token.user`` is returned as-is, including
    when it is falsy.
    """
    user = request.access_token.user
    if user:
        log.info('request from user=%s', user.id)
    return user
def add_principal(request, principal):
    """Add ``principal`` to the request's effective principals in place.

    Mutates ``request.access_token.effective_principals`` and returns
    ``None``, following the convention for in-place operations.
    """
    # ``effective_principals`` is built as a set in ``AccessToken``, whose
    # ``add`` returns None; returning its result would suggest otherwise.
    request.access_token.effective_principals.add(principal)
def includeme(config):
    """Register the auth policies, request helpers and root factory on ``config``.

    Adds ``request.add_principal(...)`` as a request method and
    ``request.access_token`` / ``request.user`` as reified request
    attributes.
    """
    auth_policy = AuthenticationPolicy()
    authz_policy = ACLAuthorizationPolicy()
    config.set_authentication_policy(auth_policy)
    config.set_authorization_policy(authz_policy)

    config.add_request_method(add_principal, 'add_principal')
    # ``reify=True`` is passed so these are exposed as cached attributes
    # rather than methods.
    config.add_request_method(get_access_token, 'access_token', reify=True)
    config.add_request_method(get_user, 'user', reify=True)

    config.set_root_factory(root_factory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment