Skip to content

Instantly share code, notes, and snippets.

@merwok
Forked from hadrien/doc.rst
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save merwok/ac94667b0cd208f1cf64 to your computer and use it in GitHub Desktop.
Save merwok/ac94667b0cd208f1cf64 to your computer and use it in GitHub Desktop.

ACL in pyramid

Dynamic ACLs

Here is a good example: http://stackoverflow.com/questions/5761617/pyramid-authorization-for-stored-items/5761901#5761901

Dynamic group finding

This is the way I deal with ACLs; it reflects solely my opinion.

It is an answer to a conversation on Twitter.

Rather than building dynamic ACLs, which can be quite hard to read and maintain IMHO, I define roles that are granted permissions on a resource.

For example, in a sticky-notes-as-a-service API, everyone can read Bob's notes, but only the owner must be able to change them.

In order to identify a user as an owner, the resource defines a group_finder method, which the authentication policy calls to extend the list of principals.

The example below can be run directly; then:

  • curl -XPUT -H 'X-DUMMY-AUTH-USERID: bob' http://localhost:8080/users/bob/notes/456 will succeed
  • curl -XPUT -H 'X-DUMMY-AUTH-USERID: not_bob' http://localhost:8080/users/bob/notes/456 will return 403
from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.decorator import reify
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.authentication import CallbackAuthenticationPolicy
from pyramid.view import view_config, view_defaults
from pyramid.security import (
ALL_PERMISSIONS,
Allow,
unauthenticated_userid,
Everyone,
)
from pyramid.traversal import find_interface
def main(global_config, **settings):
    """Build and return the WSGI application.

    ``global_config`` is accepted but not used; ``settings`` are handed to
    the ``Configurator`` unchanged.
    """
    configurator = Configurator(settings=settings)
    # Include this very module by name so that its ``includeme`` hook runs.
    configurator.include(__name__)
    wsgi_app = configurator.make_wsgi_app()
    return wsgi_app
def includeme(config):
    """Register the root factory, both security policies and the views."""
    authn_policy = AuthenticationPolicy()
    authz_policy = ACLAuthorizationPolicy()

    config.set_root_factory(Root)
    config.set_authentication_policy(authn_policy)
    config.set_authorization_policy(authz_policy)
    # Pick up the ``@view_config``-decorated views.
    config.scan()
class AuthenticationPolicy(CallbackAuthenticationPolicy):
    """Dummy policy that trusts the ``X-DUMMY-AUTH-USERID`` request header.

    For demonstration only: the header value is taken at face value, so
    any client can claim any user id.
    """

    def unauthenticated_userid(self, request):
        """Return the user id claimed by the request header, or ``None``."""
        # ``get`` already returns None when the header is absent, so a
        # separate membership test is not needed.
        return request.headers.get('X-DUMMY-AUTH-USERID')

    def callback(self, userid, request):
        """Return the extra principals the current context grants.

        The context resource may expose ``group_finder(request)``; whatever
        it returns is added to the principals.  An empty list (never
        ``None``) is returned when there is nothing to add, so the user id
        itself is still considered valid.
        """
        # ``request.context`` only exists once traversal has run; fall
        # back to "no context" instead of raising AttributeError when
        # principals are computed earlier in the request.
        context = getattr(request, 'context', None)
        group_finder = getattr(context, 'group_finder', None)
        if group_finder is None:
            return []
        # Copy into a fresh list and tolerate a finder that returns None.
        return list(group_finder(request) or ())

    def remember(self, request, userid, **kw):
        """Return no headers: identity is resent by the client each time."""
        return []

    def forget(self, request):
        """Return no headers: there is no stored identity to clear."""
        return []
# /
class Root(object):
    """Traversal root; its only child is the ``users`` collection."""

    def __init__(self, request):
        self.request = request
        self.__name__ = ''
        self.__parent__ = None

    def __getitem__(self, key):
        """Return the users collection for the ``'users'`` path segment.

        Raises ``KeyError`` for any other segment so that traversal stops
        there instead of treating every first segment as ``users``.
        """
        if key != 'users':
            raise KeyError(key)
        return UsersCollection(key, self)
# /users
class UsersCollection(object):
    """Collection resource whose children are ``User`` resources."""

    def __init__(self, name, parent):
        self.__parent__ = parent
        self.__name__ = name

    def __getitem__(self, key):
        """Return a ``User`` named ``key``; no existence check is made."""
        child = User(key, self)
        return child
# /users/bob_marley/
class User(object):
    """A single user; its only child is the ``notes`` collection."""

    def __init__(self, name, parent):
        self.__name__ = name
        self.__parent__ = parent

    def __getitem__(self, key):
        """Return the notes collection for the ``'notes'`` path segment.

        Raises ``KeyError`` for any other segment so that, for example,
        ``/users/bob/anything/456`` is not resolved as one of Bob's notes.
        """
        if key != 'notes':
            raise KeyError(key)
        return NotesCollection(key, self)
# /users/bob_marley/notes/
class NotesCollection(object):
    """Collection of one user's notes; declares the ACL for this branch."""

    # Anyone may 'show'; members of 'group:owner' may do everything.
    __acl__ = [
        (Allow, Everyone, 'show'),
        (Allow, 'group:owner', ALL_PERMISSIONS),
    ]

    def __init__(self, name, parent):
        self.__parent__ = parent
        self.__name__ = name

    def __getitem__(self, key):
        """Return a ``Note`` named ``key``; no existence check is made."""
        note = Note(key, self)
        return note
# /users/bob_marley/notes/456
class Note(object):
    """A single note; leaf of the resource tree."""

    def __init__(self, name, parent):
        self.__name__ = name
        self.__parent__ = parent

    @reify
    def user(self):
        """The ``User`` resource found in this note's lineage."""
        # same as self.__parent__.__parent__
        return find_interface(self, User)

    @property
    def userid(self):
        """Name of the owning ``User`` resource."""
        # /users/123/notes/456 will return 123
        return self.user.__name__

    def group_finder(self, request):
        """Return ``['group:owner']`` if the requester owns this note.

        Called by the authentication policy to extend the principals; an
        empty list is returned for everybody else.
        """
        # Use the request attribute rather than the deprecated
        # ``pyramid.security.unauthenticated_userid(request)`` function.
        if request.unauthenticated_userid == self.userid:
            return ['group:owner']
        return []

    def replace(self, params):
        """Save in db... Only owners can."""
        return {'status': '200'}

    def show(self):
        """Get from db."""
        return {
            'id': self.__name__,
            'content': 'A dummy note',
        }
@view_defaults(context=Note)
class View(object):
    """JSON views for a ``Note`` context."""

    def __init__(self, context, request):
        self.context = context
        self.request = request

    @view_config(request_method='GET', permission='show', renderer='json')
    def show(self):
        """GET: return what the note's ``show`` returns."""
        note = self.context
        return note.show()

    @view_config(request_method='PUT', permission='replace', renderer='json')
    def replace(self):
        """PUT: pass the request's POST variables to the note's ``replace``."""
        note = self.context
        return note.replace(self.request.POST)
if __name__ == '__main__':
    # Build the app with empty global config, then serve it on all
    # interfaces, port 8080, until the process is stopped.
    app = main({})
    httpd = make_server('0.0.0.0', 8080, app)
    httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment