Skip to content

Instantly share code, notes, and snippets.

@airtonix
Last active December 16, 2015 17:58
Show Gist options
  • Save airtonix/5473873 to your computer and use it in GitHub Desktop.
Save airtonix/5473873 to your computer and use it in GitHub Desktop.
An authorization class for django-tastypie that limits queryset results to objects whose designated field points at the current user or, when the designated field is None, to the User instance that is the same as request.user
from tastypie.authorization import DjangoAuthorization, Authorization
class UserObjectsOnlyAuthorization(Authorization):
    """
    Limit API access to the objects that belong to the requesting user.

    :user_field: name of the model field that points at the owning user.
        When omitted (``None``) the objects are compared to the requesting
        user by primary key, i.e. an object is "owned" when its ``pk``
        equals ``request.user.id``.

    Ownership through a field::

        class Something(models.Model):
            the_owner = models.ForeignKey('auth.User')

        class SomethingResource(ModelResource):
            class Meta:
                queryset = Something.objects.all()
                authorization = UserObjectsOnlyAuthorization(user_field='the_owner')

    or, for a resource over the user model itself::

        from django.contrib.auth.models import User

        class UserResource(ModelResource):
            class Meta:
                queryset = User.objects.all()
                authorization = UserObjectsOnlyAuthorization()
    """

    def __init__(self, *args, **kwargs):
        """Pop the optional ``user_field`` keyword and pass the rest on."""
        self.user_field = kwargs.pop("user_field", None)
        super(UserObjectsOnlyAuthorization, self).__init__(*args, **kwargs)

    def _is_owner(self, obj, user):
        """Return True when ``obj`` belongs to ``user``."""
        if self.user_field is None:
            # No owner field configured: compare the object to the user by pk.
            return obj.pk == user.id
        return getattr(obj, self.user_field) == user

    def _owned_objects(self, object_list, user):
        """Return the members of ``object_list`` that belong to ``user``.

        Without a ``user_field`` the list is narrowed with ``filter(pk=...)``;
        with one, the objects are checked one by one because they may not all
        be saved yet.
        """
        if self.user_field is None:
            return object_list.filter(pk=user.id)
        return [obj for obj in object_list if self._is_owner(obj, user)]

    def read_list(self, object_list, bundle):
        """Narrow ``object_list`` to the requesting user's objects."""
        user = bundle.request.user
        if self.user_field is None:
            return object_list.filter(pk=user.id)
        return object_list.filter(**{self.user_field: user})

    def read_detail(self, object_list, bundle):
        """Return True when the requested object is owned by the user."""
        # Answer with a bool, like the other ``*_detail`` checks, rather than
        # with a queryset.
        return self._is_owner(bundle.obj, bundle.request.user)

    def create_list(self, object_list, bundle):
        """Return the objects the user is allowed to create."""
        if self.user_field is None:
            return object_list.filter(pk=bundle.request.user.id)
        # Assuming they're auto-assigned to ``user``.
        return object_list

    def create_detail(self, object_list, bundle):
        """Return True when the user may create ``bundle.obj``."""
        if self.user_field is None:
            # NOTE(review): this allows creation only when an object with the
            # same username is already in ``object_list`` - confirm that this
            # is the intended rule.
            return object_list.filter(username=bundle.obj.username).exists()
        return self._is_owner(bundle.obj, bundle.request.user)

    def update_list(self, object_list, bundle):
        """Return the objects in ``object_list`` the user may update."""
        return self._owned_objects(object_list, bundle.request.user)

    def update_detail(self, object_list, bundle):
        """Return True when the user may update ``bundle.obj``."""
        return self._is_owner(bundle.obj, bundle.request.user)

    def delete_list(self, object_list, bundle):
        """Return the objects in ``object_list`` the user may delete."""
        return self._owned_objects(object_list, bundle.request.user)

    def delete_detail(self, object_list, bundle):
        """Return True when the user may delete ``bundle.obj``."""
        return self._is_owner(bundle.obj, bundle.request.user)
from django.contrib.auth.models import User
from tastypie.utils import is_valid_jsonp_callback_value, dict_strip_unicode_keys, trailing_slash
from surlex.dj import surl
from .authorization import UserObjectsOnlyAuthorization
class UserResource(ModelResource):
    """POST-only resource over ``User`` with extra login/logout URLs.

    Objects created through the resource get their password stored with
    ``set_password`` instead of the raw submitted value.
    """

    class Meta:
        queryset = User.objects.all()
        allowed_methods = ['post']
        resource_name = 'user'
        authentication = ApiKeyAuthentication()
        authorization = UserObjectsOnlyAuthorization()
        # Values substituted into the URL patterns built in prepend_urls().
        urlargs = {'name': resource_name, 'slash': trailing_slash()}

    def prepend_urls(self):
        """Return the extra ``.../<pk>/login`` and ``.../<pk>/logout`` routes."""
        # The wrapped view names must be methods of this class; the only
        # candidates defined here are ``login`` and ``logout``.
        # NOTE(review): ``[{name}]`` becomes ``[user]`` after formatting, which
        # looks like a character class rather than the literal resource name -
        # confirm against the surlex syntax.
        return [
            surl(r"^<resource_name=[{name}]>/<pk:#>/login{slash}$".format(**self._meta.urlargs), self.wrap_view('login'), name="api_user_login"),
            surl(r"^<resource_name=[{name}]>/<pk:#>/logout{slash}$".format(**self._meta.urlargs), self.wrap_view('logout'), name="api_user_logout"),
        ]

    def login(self, bundle, **kwargs):
        """Placeholder for the login view; not implemented yet."""
        # NOTE(review): presumably invoked with the request as the first
        # positional argument when routed through wrap_view - confirm.
        pass

    def logout(self, bundle, **kwargs):
        """Placeholder for the logout view; not implemented yet."""
        pass

    def obj_create(self, bundle, **kwargs):
        """Create the object, then store the submitted password.

        Raises ``BadRequest`` when creating or saving the object hits an
        ``IntegrityError``.
        """
        try:
            bundle = super(UserResource, self).obj_create(bundle, **kwargs)
            # Go through set_password() and save again so the stored value is
            # whatever set_password() produces, not the raw request data.
            bundle.obj.set_password(bundle.data.get('password'))
            bundle.obj.save()
        except IntegrityError:
            raise BadRequest(_("A user with that username already exists."))
        return bundle
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment