Skip to content

Instantly share code, notes, and snippets.

@vshjxyz
Last active January 4, 2016 21:59
Show Gist options
  • Save vshjxyz/8684755 to your computer and use it in GitHub Desktop.
Save vshjxyz/8684755 to your computer and use it in GitHub Desktop.
Code examples
from django.contrib.auth.models import User
from django.contrib.sessions.models import Session
from tastypie.authorization import Authorization
from tastypie.exceptions import Unauthorized
from django.db.models import Q
from apps.authorization.models import UserType
from apps.pos.models import Pos
class SubdomainAuthorization(Authorization):
    """
    Implement the base authorization policies to see / unsee only the right resources.

    The policy is driven by two "paths" expressed with the Django double
    underscore syntax (e.g. ``'pos__subdomain__name'``):

    * ``subdomain_name_path``: attributes to walk from a resource object to
      reach the name of the subdomain it belongs to.
    * ``customer_path``: attributes to walk from a resource object to reach
      the customer that owns it (``None`` when the resource has no owner).

    ``customer_can_read`` / ``customer_can_write`` state what a user whose
    profile type is ``UserType.USER_TYPE_ENDUSER`` is allowed to do.
    """
    subdomain_name_path = ''
    customer_can_read = True
    customer_can_write = False

    def __init__(self, subdomain_name_path=None, customer_path=None, customer_can_read=True, customer_can_write=False):
        self.subdomain_name_path = subdomain_name_path
        self.customer_path = customer_path
        self.customer_can_read = customer_can_read
        self.customer_can_write = customer_can_write

    def __build_subdomain_query(self, subdomain):
        """
        Builds a Q object that matches (case-insensitively) the objects whose
        subdomain name, reached through ``subdomain_name_path``, equals ``subdomain.name``
        """
        return Q(**{
            '%s__iexact' % self.subdomain_name_path: subdomain.name
        })

    def __can_access_resource(self, obj, subdomain):
        """
        Starting from the obj, walks into its attributes to reach the subdomain name
        and compares it to subdomain.name (the given param).

        When no path is configured (None or '') the object itself is compared
        to the subdomain name.
        """
        current_cursor = obj
        # The original condition joined "is not None" and "!= ''" with `or`,
        # which is always true, so a missing path crashed on .split() / getattr().
        if self.subdomain_name_path:
            for path_split in self.subdomain_name_path.split('__'):
                current_cursor = getattr(current_cursor, path_split)
        return current_cursor == subdomain.name

    def __can_customer_access_resource(self, obj, username):
        """
        Starting from the obj, walks into its attributes (following ``customer_path``)
        to reach the customer and compares the username of its profile's user
        to the given username
        """
        current_cursor = obj
        if self.customer_path is not None:
            for path_split in self.customer_path.split('__'):
                current_cursor = getattr(current_cursor, path_split)
        return current_cursor.profile.user.username == username

    def __check_detail_permissions(self, bundle, is_writing=False, is_reading=False, is_deleting=False):
        """
        Checks if the user is authenticated and can access the bundle.obj from his subdomain.

        Returns True when the operation is allowed, raises Unauthorized otherwise.
        """
        user = bundle.request.user
        is_authorized = False
        # The customer can't delete anything except for the addresses
        if user.is_authenticated() and (user.profile.user_type_id == UserType.USER_TYPE_ENDUSER) and not is_deleting:
            # If we are a customer, check if we can read or write
            if self.customer_can_write and is_writing:
                is_authorized = True
                if self.customer_path is not None:
                    # Writing is allowed only on the customer's own resources
                    is_authorized = self.__can_customer_access_resource(bundle.obj, user.username)
            elif is_reading:
                is_authorized = self.customer_can_read
        else:
            # Checks if we are an allowed pos user or the pos owner
            is_authorized = (
                user.is_authenticated() and
                user.profile.can_access_subdomain(bundle.request.subdomain)
            )
            if (bundle.obj is not None) and not is_reading:
                # If we have an object (aka we are not creating it) then check that the subdomain is right
                is_authorized = (
                    is_authorized and
                    self.__can_access_resource(bundle.obj, bundle.request.subdomain)
                )
            # The customer is deleting an address
            if user.is_authenticated() and \
                    (user.profile.user_type_id == UserType.USER_TYPE_ENDUSER) and \
                    user.profile.customer.id == bundle.obj.customer_id and is_deleting:
                is_authorized = True
        if not is_authorized:
            raise Unauthorized("Insufficent permissions.")
        return True

    def read_list(self, object_list, bundle):
        """
        Returns the readable objects: everything for an end user, only the
        objects of the request's subdomain for every other kind of user.
        """
        if self.__check_detail_permissions(bundle, is_reading=True):
            # An empty Q() does not restrict the queryset
            subdomain_filter = Q()
            if not (bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER):
                subdomain_filter = self.__build_subdomain_query(bundle.request.subdomain)
            return object_list.filter(subdomain_filter)
        else:
            raise Unauthorized("Insufficent permissions.")

    def read_detail(self, object_list, bundle):
        return self.__check_detail_permissions(bundle, is_reading=True)

    def create_list(self, object_list, bundle):
        # TODO: test the creation of a list and implement this authorization
        raise Unauthorized("Insufficent permissions.")

    def create_detail(self, object_list, bundle):
        """
        Forces the pos (and, for an end user, the customer) of the object that
        is being created before running the permission check on it.
        """
        is_enduser = bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER
        if hasattr(bundle.obj, 'pos_id'):
            if not is_enduser:
                # Pos users always create inside the pos of the current subdomain
                bundle.obj.pos = bundle.request.subdomain.application
            else:
                # End users state the target pos in the submitted data
                bundle.obj.pos = Pos.objects.get(id=bundle.data['pos_id'])
        if hasattr(bundle.obj, 'customer_id') and is_enduser:
            bundle.obj.customer = bundle.request.user.profile.customer
        return self.__check_detail_permissions(bundle, is_writing=True)

    def update_list(self, object_list, bundle):
        raise Unauthorized("Insufficent permissions.")

    def update_detail(self, object_list, bundle):
        return self.__check_detail_permissions(bundle, is_writing=True)

    def delete_list(self, object_list, bundle):
        raise Unauthorized("Insufficent permissions.")

    def delete_detail(self, object_list, bundle):
        return self.__check_detail_permissions(bundle, is_deleting=True)
// The contents of individual model .js files will be concatenated into dist/models.js
"use strict";
(function() {
// Protects views where angular is not loaded from errors
if ( typeof angular == 'undefined' ) {
return;
}
var module = angular.module('CartModule', ['APIModule', 'AuthModule']);
module.service('CartService', function($q, APIService, AuthService) {
/**
 * Cart constructor: starts from an empty cart, then overlays whatever
 * was previously persisted in the localStorage.
 */
function Cart() {
    this.init().load();
}
/**
 * Resets the in-memory cart to its empty state (nothing is persisted here)
 * @returns {Cart}
 */
Cart.prototype.init = function () {
    var emptyCart = {};
    emptyCart.items = [];
    emptyCart.pos = null;
    emptyCart.total = 0;
    emptyCart.delivery = false;
    emptyCart.selectedAddress = null;
    this.cart = emptyCart;
    return this;
};
/**
 * Loads the cart from the localStorage.
 *
 * The current in-memory cart is left untouched when nothing is stored under
 * the 'cart' key or when the stored value cannot be read / parsed as JSON.
 * @returns {Cart}
 */
Cart.prototype.load = function () {
    var loadedCart = null;
    try {
        loadedCart = JSON.parse(window.localStorage.getItem('cart'));
    } catch (e) {
        // load() runs inside the constructor: a corrupted entry must not
        // prevent the service from being created, so we fall back to the
        // cart we already have.
        loadedCart = null;
    }
    if (loadedCart != null) {
        this.cart = loadedCart;
    }
    return this;
};
/**
 * Persists the current cart, serialized as JSON, under the 'cart' key of the localStorage
 * @returns {Cart}
 */
Cart.prototype.save = function () {
    var serializedCart = JSON.stringify(this.cart);
    window.localStorage.setItem('cart', serializedCart);
    return this;
};
/**
 * Empties the cart and writes the emptied cart to the localStorage
 * @returns {Cart}
 */
Cart.prototype.clear = function () {
    var emptied = this.init();
    return emptied.save();
};
/**
 * Puts an item in the cart with the given price, then recalculates the total and
 * saves the cart to the localStorage.
 *
 * When the cart already holds an entry with the same id and price value, the quantity
 * of that entry is incremented; otherwise the item is appended with a quantity of 1.
 * Note that item.selectedPrice is always overwritten with the given price.
 * @param item
 * @param price the current selected price for the item
 * @returns {*|Mixed} the cart entry that has been added or incremented
 */
Cart.prototype.add = function (item, price) {
    var hasItemAndPrice = _.isObject(item) && _.isObject(price);
    if (!hasItemAndPrice) {
        throw 'You have to specify an item and a price if you want to add something to the CartService';
    }

    var existingEntry = this.searchItem(item, price);
    item.selectedPrice = price;

    var entry;
    if (existingEntry != null) {
        existingEntry.quantity += 1;
        entry = existingEntry;
    } else {
        item.quantity = 1;
        this.cart.items.push(item);
        entry = item;
    }

    this.calculateTotal();
    this.save();
    return entry;
};
/**
 * Removes the item at the given position from the cart and saves the cart to localStorage.
 *
 * Removing the last remaining item resets the whole cart (pos included).
 * Calling it on an empty cart is tolerated: the cart is simply reset and saved.
 * @param index zero-based position of the item to remove
 * @returns {Cart}
 * @throws {String} when the cart is not empty and the index is outside 0..length-1
 */
Cart.prototype.remove = function (index) {
    var itemsCount = this.cart.items.length;
    // A negative index must be refused too: splice() would otherwise count
    // from the end of the array and silently remove the wrong item.
    if ((itemsCount > 0) && ((index < 0) || (index >= itemsCount))) {
        throw('You must provide an index to remove between 0 and ' + (itemsCount - 1));
    }
    this.cart.items.splice(index, 1);
    if (this.cart.items.length === 0) {
        // We need to reset the pos
        this.init();
    } else {
        this.calculateTotal();
    }
    this.save();
    return this;
};
/**
 * Searches an item in the current cart and returns it (undefined when there is no match).
 *
 * Items are matched by their integer id; when searchPrice is given, the value of the
 * item's selectedPrice must match the value of searchPrice as well.
 * @param searchItem
 * @param searchPrice optional parameter to check if the selectedPrice parameter of the item is equal to the searchPrice
 * @returns {*|Mixed}
 */
Cart.prototype.searchItem = function (searchItem, searchPrice) {
    // Explicit radix: ids coming in as strings must always be read as decimals
    var searchedId = parseInt(searchItem.id, 10);
    return _.find(this.cart.items, function (item) {
        var result = parseInt(item.id, 10) === searchedId;
        if (result && (searchPrice != null)) {
            result = parseFloat(item.selectedPrice.value) === parseFloat(searchPrice.value);
        }
        return result;
    });
};
/**
 * Calculates the current total of the cart, based on the items that the cart has in it:
 * sum of quantity * selected price, plus the delivery price of the pos when the
 * delivery is requested. The result is stored in this.cart.total.
 * @returns {Cart}
 */
Cart.prototype.calculateTotal = function () {
    var total = 0;
    var pos = this.cart.pos;
    for (var i = 0; i < this.cart.items.length; i++) {
        var item = this.cart.items[i];
        // Explicit radix: quantities may be stored as strings
        total += parseInt(item.quantity, 10) * parseFloat(item.selectedPrice.value);
    }
    // The pos can still be null while the delivery flag is set:
    // in that case there is no delivery price to add.
    if ((this.cart.delivery) && (pos != null) && (pos.delivery_price != null)) {
        total += parseFloat(pos.delivery_price);
    }
    // Round to 2 decimal digits, e.g.
    // 2.236 * 100 = 223.6
    // Math.round(223.6) = 224
    // 224 / 100 = 2.24
    this.cart.total = Math.round(total * 100) / 100;
    return this;
};
/**
 * Performs the checkout of the cart: POSTs an order, then one order line per cart item.
 *
 * The returned promise is resolved (and the cart cleared) once every order line has been
 * created; it is rejected with a message when the user data is missing, when the order
 * POST fails or when any of the order line POSTs fails.
 *
 * @returns {*|Promise|Number|number|Deferred}
 */
Cart.prototype.checkout = function () {
    var self = this;
    var pending = $q.defer();
    var currentUser = AuthService.getUserData();

    // Builds the payload of a single order line bound to the created order
    var buildOrderLine = function (item, position, orderId) {
        return {
            'repetitions': item.quantity,
            'price': item.selectedPrice.value,
            'price_category_name': item.selectedPrice.price_category.name,
            'product': '/pos/pos/product/' + item.id + '/',
            'product_name': item.name,
            'product_category_name': item.product_category.name,
            'is_custom_product': false,
            'order': '/pos/pos/order/' + orderId + '/',
            'addings': [],
            'subtractions': [],
            'orderline_position': position
        };
    };

    if (currentUser == null) {
        pending.reject('The user is not properly logged in');
        return pending.promise;
    }

    var cart = this.cart;
    var orderPayload = {
        'total_price': cart.total,
        'delivery': cart.delivery,
        'delivery_price': (cart.delivery ? cart.pos.delivery_price : 0),
        'pos_id': cart.pos.id,
        'order_lines': [],
        // TODO: fix the status of the order
        'order_status': '/pos/pos/order_status/2/',
        'customer': '/pos/pos/customer/' + currentUser.customer_id + '/',
        'customer_address': cart.selectedAddress,
        'customer_phone': currentUser.current_phone_number,
        'notes': '',
        'customer_name': currentUser.first_name + ' ' + currentUser.last_name
    };

    var onOrderCreated = function (createdOrder) {
        var linePosts = [];
        for (var position = 0; position < self.cart.items.length; position++) {
            var linePayload = buildOrderLine(self.cart.items[position], position, createdOrder.id);
            linePosts.push(APIService.all('order_line').post(linePayload));
        }
        $q.all(linePosts).then(function () {
            self.clear();
            pending.resolve();
        }, function () {
            pending.reject('There was an error during the order\'s checkout (orderline)');
        });
    };

    var onOrderFailed = function () {
        pending.reject('There was an error during the order\'s checkout (order)');
    };

    APIService.all('order').post(orderPayload).then(onOrderCreated, onOrderFailed);
    return pending.promise;
};
return new Cart();
});
})();
"use strict";
// Jasmine spec for the CartService: persistence to the localStorage, item
// bookkeeping (add / remove / search / total) and the checkout promise.
describe('CartService tests', function () {
    var $scope
        ,$httpBackend
        ,CartService
        ,AuthService
        ,BASEURL = 'http://pizzanuvola.com:8000'
        // Cart fixture stored in the localStorage by most of the tests:
        // three items plus a pos with a delivery price and the delivery enabled
        ,testCartData = {
            foo: 'bar'
            ,foo2: 'bar2'
            ,delivery: true
            ,pos: {
                id: 2
                ,name: 'test'
                ,address: 'test addr'
                ,delivery_price: 2.22
            }
            ,items: [
                {
                    id: 2
                    ,foo3: 'bar3'
                    ,quantity: 1
                    ,selectedPrice: {
                        id: 4
                        ,price_category: {
                            name: 'test'
                        }
                        ,value: "6.66"
                    }
                    ,product_category: {
                        name: 'product cat test'
                    }
                }
                ,{
                    id: 5
                    ,foo3: 'bar4'
                    ,quantity: 3
                    ,selectedPrice: {
                        id: 3
                        ,price_category: {
                            name: 'test2'
                        }
                        ,value: "4.44"
                    }
                    ,product_category: {
                        name: 'product cat test2'
                    }
                }
                ,{
                    id: 6
                    ,foo3: 'bar5'
                    ,quantity: 2
                    ,selectedPrice: {
                        id: 1
                        ,price_category: {
                            name: 'test3'
                        }
                        ,value: "12.50"
                    }
                    ,product_category: {
                        name: 'product cat test3'
                    }
                }
            ]
        }
    ;
    beforeEach(module('pnApp'));
    // Replaces the real AuthService with a stub whose getUserData can be spied on
    beforeEach(function () {
        AuthService = {
            getUserData: function () {
                return {};
            }
        };
        angular.mock.module(function ($provide) {
            $provide.value('AuthService', AuthService);
        });
    });
    beforeEach(inject(function ($rootScope, _$httpBackend_, _CartService_) {
        $scope = $rootScope.$new();
        $httpBackend = _$httpBackend_;
        CartService = _CartService_;
    }));
    afterEach(function () {
        // Clearing the localstorage after each test
        window.localStorage.clear();
    });
    it('Should load the cart properly from the localStorage', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var cart = CartService.load().cart;
        expect(cart.foo).toBe('bar');
        expect(cart.foo2).toBe('bar2');
        expect(cart.items.length).toBe(3);
        expect(cart.items[0].id).toBe(2);
        expect(cart.items[1].id).toBe(5);
    });
    it('Should add correctly an item to the cart, saving it to localStorage aswell', function () {
        var cart = CartService.load().cart;
        expect(cart.items.length).toBe(0);
        CartService.add({
            id: 7
            ,foo: 'bar'
        }
        ,{
            id: 1
            ,value: "6.55"
        });
        // We manually load the localStorage data to check that the modifications have been saved
        var savedCart = JSON.parse(window.localStorage.getItem('cart'));
        expect(savedCart.items.length).toBe(1);
        expect(savedCart.items[0].id).toBe(7);
    });
    it('Should update the quantity of the cart if I add items with the same id, but be aware of the price', function () {
        var cart = CartService.load().cart;
        expect(cart.items.length).toBe(0);
        // We need to use functions in this test because multiple CartService.add calls alter the structure of the first parameter
        var getProduct = function () {
            return {
                id: 7
                ,foo: 'bar'
            };
        };
        var getPrice1 = function () {
            return {
                id: 6
                ,value: "5.55"
            };
        };
        var getPrice2 = function () {
            return {
                id: 3
                ,value: "6"
            };
        };
        CartService.add(getProduct(), getPrice1());
        expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(1);
        CartService.add(getProduct(), getPrice1());
        expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(2);
        // Same product with a different price value: it must become a separate entry
        CartService.add(getProduct(), getPrice2());
        expect(CartService.searchItem(getProduct(), getPrice2()).quantity).toBe(1);
        expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(2);
    });
    it('Should remove an item from the service and the localStorage', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var cart = CartService.load().cart;
        var oldLength = cart.items.length;
        CartService.remove(0);
        // It should have removed an item
        expect(cart.items.length).toBe(oldLength - 1);
        CartService.remove(0);
        // It should have removed another item
        expect(cart.items.length).toBe(oldLength - 2);
        // An index beyond the end of a non-empty cart must be refused
        expect(function() {
            CartService.remove(99);
        }).toThrow();
    });
    it('Should clear the cart and the localStorage', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        CartService.load();
        expect(CartService.cart.items.length).toBe(3);
        CartService.clear();
        expect(CartService.cart.items.length).toBe(0);
        // We manually load the localStorage data to check that the modifications have been saved
        var savedCart = JSON.parse(window.localStorage.getItem('cart'));
        expect(savedCart.items.length).toBe(0);
    });
    it('Should properly calculate the total based on the items', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        CartService.load();
        CartService.calculateTotal();
        // 1 * 6.66 + 3 * 4.44 + 2 * 12.50 + 2.22 (delivery) = 47.20
        expect(CartService.cart.total).toBe(47.2);
    });
    it('Should reset the pos if we delete all the products', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        CartService.load();
        expect(CartService.cart.pos).not.toBeNull();
        // The length is read once because every remove() shrinks the items array
        var cartLength = CartService.cart.items.length;
        for (var i = 0; i < cartLength; i++) {
            CartService.remove(0);
        }
        expect(CartService.cart.items.length).toBe(0);
        expect(CartService.cart.pos).toBeNull();
    });
    it('Should reject the promise when we try to checkout an order without user data', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var spyMethodSuccess
            ,spyMethodFailure
        ;
        CartService.load();
        spyOn(AuthService, 'getUserData').andReturn(null);
        spyMethodSuccess = jasmine.createSpy();
        spyMethodFailure = jasmine.createSpy();
        CartService.checkout().then(spyMethodSuccess, spyMethodFailure);
        // This apply releases the promises so they go into the 'then' method(s)
        $scope.$apply();
        expect(spyMethodSuccess).not.toHaveBeenCalled();
        expect(spyMethodFailure).toHaveBeenCalledWith('The user is not properly logged in');
    });
    it('Should reject the promise when we something went bad on the order POST when trying to checkout', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var spyMethodSuccess
            ,spyMethodFailure
        ;
        CartService.load();
        spyOn(AuthService, 'getUserData').andReturn('something');
        spyMethodSuccess = jasmine.createSpy();
        spyMethodFailure = jasmine.createSpy();
        // The order creation fails on the server
        $httpBackend
            .when('POST', BASEURL + '/pos/pos/order/')
            .respond(function(method, url, data) {
                return [500, {}];
            });
        CartService.checkout().then(spyMethodSuccess, spyMethodFailure);
        // This apply releases the promises so they go into the 'then' method(s)
        $scope.$apply();
        $httpBackend.flush();
        expect(spyMethodSuccess).not.toHaveBeenCalled();
        expect(spyMethodFailure).toHaveBeenCalledWith('There was an error during the order\'s checkout (order)');
    });
    it('Should reject the promise when we something went bad on the orderLine POST when trying to checkout', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var spyMethodSuccess
            ,spyMethodFailure
        ;
        CartService.load();
        spyOn(AuthService, 'getUserData').andReturn('something');
        spyMethodSuccess = jasmine.createSpy();
        spyMethodFailure = jasmine.createSpy();
        // The order is created, but the creation of its order lines fails
        $httpBackend
            .when('POST', BASEURL + '/pos/pos/order/')
            .respond(function(method, url, data) {
                return [200, {}];
            });
        $httpBackend
            .when('POST', BASEURL + '/pos/pos/order_line/')
            .respond(function(method, url, data) {
                return [500, {}];
            });
        CartService.checkout().then(spyMethodSuccess, spyMethodFailure);
        // This apply releases the promises so they go into the 'then' method(s)
        $scope.$apply();
        $httpBackend.flush();
        expect(spyMethodSuccess).not.toHaveBeenCalled();
        expect(spyMethodFailure).toHaveBeenCalledWith('There was an error during the order\'s checkout (orderline)');
    });
    it('Should resolve the promise if the checkout process went fine', function () {
        window.localStorage.setItem('cart', JSON.stringify(testCartData));
        var spyMethodSuccess
            ,spyMethodFailure
        ;
        CartService.load();
        spyOn(AuthService, 'getUserData').andReturn('something');
        spyMethodSuccess = jasmine.createSpy();
        spyMethodFailure = jasmine.createSpy();
        // Both the order and the order lines are accepted by the server
        $httpBackend
            .when('POST', BASEURL + '/pos/pos/order/')
            .respond(function(method, url, data) {
                return [200, {}];
            });
        $httpBackend
            .when('POST', BASEURL + '/pos/pos/order_line/')
            .respond(function(method, url, data) {
                return [200, {}];
            });
        CartService.checkout().then(spyMethodSuccess, spyMethodFailure);
        // This apply releases the promises so they go into the 'then' method(s)
        $scope.$apply();
        $httpBackend.flush();
        expect(spyMethodSuccess).toHaveBeenCalled();
        expect(spyMethodFailure).not.toHaveBeenCalled();
    });
});
from django.contrib.auth.models import User
from django.test import TestCase, RequestFactory
import sys
from tastypie.bundle import Bundle
from tastypie.exceptions import ImmediateHttpResponse
from apps.authorization.models import *
from apps.authorization.middleware import SubdomainMiddleware
from apps.pos.models import *
from apps.pos.api import *
from apps.pos.apifull import *
from django.test.client import Client
from apps.main.ajax import app_form_submit
import json
class AuthTests(TestCase):
# Fixture files loaded before each test of this test case
fixtures = [
"sites_fixture.json",
"test_fixture.json"
]
# Each permission table below maps a resource name to a list of three booleans
# expressed in the order:
# RESOURCE: [CAN_READ, CAN_WRITE, CAN_DELETE]
# The constants are the positions of each permission inside those lists.
CAN_READ_ID = 0
CAN_WRITE_ID = 1
CAN_DELETE_ID = 2
# Expected permissions, per resource, for the 'testposowner' user
# (checked in the tests against the 'provaposowner' pos)
pos_owner_permissions = {
'adding' : [True, True, True],
'product' : [True, True, True],
'order_line' : [True, True, True],
'product_category' : [True, True, True],
'price' : [True, True, True],
'pos' : [True, True, True],
'note' : [True, True, True],
'setting' : [True, True, True],
'price_category' : [True, True, True],
'poslist' : [True, False, False],
'address' : [True, True, True],
'order_status' : [True, False, False],
'kinship' : [True, True, True],
'phone_number' : [True, True, True],
'order' : [True, True, True],
'subtraction' : [True, True, True],
'business_hour' : [True, True, True],
'call' : [True, True, True],
'customerfull' : [True, True, True],
'orderpartial' : [True, True, True],
'product_categoryfull' : [True, True, True],
'kinshipfull' : [True, True, True],
'productfullingredients': [True, True, True],
'orderdetailfull' : [True, True, True],
'pricefull' : [True, True, True],
'orderfull' : [True, True, True],
'productfull' : [True, True, True],
}
# Expected permissions, per resource, for the 'testposowner2' user
# (checked in the tests against the same 'provaposowner' pos)
pos_owner2_permissions = {
'adding' : [False, False, False],
'product' : [False, False, False],
'order_line' : [False, False, False],
'product_category' : [False, False, False],
'price' : [False, False, False],
'pos' : [False, False, False],
'note' : [False, False, False],
'setting' : [False, False, False],
'price_category' : [False, False, False],
'poslist' : [True, False, False],
'address' : [False, False, False],
'order_status' : [True, False, False],
'kinship' : [False, False, False],
'phone_number' : [False, False, False],
'order' : [False, False, False],
'subtraction' : [False, False, False],
'business_hour' : [False, False, False],
'call' : [False, False, False],
'customerfull' : [False, False, False],
'orderpartial' : [False, False, False],
'product_categoryfull' : [False, False, False],
'kinshipfull' : [False, False, False],
'productfullingredients': [False, False, False],
'orderdetailfull' : [False, False, False],
'pricefull' : [False, False, False],
'orderfull' : [False, False, False],
'productfull' : [False, False, False],
}
# Expected permissions, per resource, for the 'testenduser' user
pos_enduser_permissions = {
'adding' : [True, False, False],
'product' : [True, False, False],
'order_line' : [True, True, False],
'product_category' : [True, False, False],
'price' : [True, False, False],
'pos' : [True, False, False],
'note' : [True, False, False],
'setting' : [False, False, False],
'price_category' : [True, False, False],
'poslist' : [True, False, False],
'address' : [True, True, False],
'order_status' : [True, False, False],
'kinship' : [True, False, False],
'phone_number' : [True, False, False],
'order' : [True, True, False],
'subtraction' : [True, False, False],
'business_hour' : [True, False, False],
'call' : [False, False, False],
'customerfull' : [True, False, False],
'orderpartial' : [True, False, False],
'product_categoryfull' : [True, False, False],
'kinshipfull' : [True, False, False],
'productfullingredients': [True, False, False],
'orderdetailfull' : [True, False, False],
'pricefull' : [True, False, False],
'orderfull' : [True, False, False],
'productfull' : [True, False, False],
}
def setUp(self):
    """
    Creates the request factory and the test client, then loads the fixture
    users and the two pos (looked up by subdomain name and name)
    """
    self.factory = RequestFactory()
    self.client = Client()

    fetch_user = User.objects.get
    self.user_posowner = fetch_user(username='testposowner')
    self.user_posowner2 = fetch_user(username='testposowner2')
    self.user_enduser = fetch_user(username='testenduser')
    # self.user_posuser = User.objects.get(username='testposuser')

    self.posowner1_pos_name = "provaposowner"
    self.posowner2_pos_name = "provaposowner2"

    first_pos_lookup = {
        'subdomain__name': self.posowner1_pos_name,
        'name': self.posowner1_pos_name,
    }
    self.posowner1_pos = Pos.objects.get(**first_pos_lookup)

    second_pos_lookup = {
        'subdomain__name': self.posowner2_pos_name,
        'name': self.posowner2_pos_name,
    }
    self.posowner2_pos = Pos.objects.get(**second_pos_lookup)
def process_request(self, request):
    """
    Runs the given request through a fresh SubdomainMiddleware instance and
    returns whatever the middleware returns
    """
    middleware = SubdomainMiddleware()
    return middleware.process_request(request)
def check_permission(self, user, pos, operation_id, resource_name, resource):
    """
    This method performs different operations depending on operation_id and it can
    * simulate a READ from the tastypie API (CAN_READ_ID)
    * simulate a WRITE from the tastypie API (CAN_WRITE_ID)
    * simulate a DELETE from the tastypie API (CAN_DELETE_ID)
    in order to do this, it creates a fake request and a fake bundle with that request;
    it then calls the native Tastypie methods that call the relative authorization.

    Returns True if the operation succeeded and False if it was refused with a
    401 (Unauthorized) response or, for an enduser WRITE, if the resource model
    has no CUSTOMER_PATH. Returns None when the WRITE check is skipped because
    there is no object to duplicate, and for an unknown operation_id.
    Any other ImmediateHttpResponse is re-raised.
    """
    if operation_id == self.CAN_READ_ID:
        tmp_request = self.build_get_request_api(user, pos.name, resource_name)
        self.process_request(tmp_request)
        # Getting all the objects for this resource
        tmp_bundle = Bundle(request=tmp_request)
        try:
            object_list = resource.obj_get_list(tmp_bundle)
            if len(object_list) > 0:
                object_item = object_list[0]
                # Building a fake get request
                tmp_request = self.build_get_request_api(user, pos.name, resource_name, object_item)
                self.process_request(tmp_request)
                # Getting a single object to check the other method in the authorization
                tmp_bundle = Bundle(request=tmp_request, obj=object_item)
                object_item = resource.obj_get(tmp_bundle, pk=object_item.id)
        except ImmediateHttpResponse as e:
            # If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception
            if e.response.status_code == 401:
                return False
            # Bare raise keeps the original traceback
            raise
        return True
    elif operation_id == self.CAN_DELETE_ID:
        tmp_request = self.build_delete_request_api(user, pos.name, resource_name)
        self.process_request(tmp_request)
        # Getting all the objects for this resource
        tmp_bundle = Bundle(request=tmp_request)
        try:
            object_list = resource.obj_get_list(tmp_bundle)
            if len(object_list) > 0:
                object_item = object_list[0]
                # Building a fake delete request
                tmp_request = self.build_delete_request_api(user, pos.name, resource_name, object_item)
                self.process_request(tmp_request)
                # Deleting a single object to check the other method in the authorization
                tmp_bundle = Bundle(request=tmp_request, obj=object_item)
                object_item = resource.obj_delete(tmp_bundle, pk=object_item.id)
        except ImmediateHttpResponse as e:
            # If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception
            if e.response.status_code == 401:
                return False
            raise
        return True
    elif operation_id == self.CAN_WRITE_ID:
        tmp_request = self.build_create_request_api(user, pos.name, resource_name)
        self.process_request(tmp_request)
        tmp_bundle = Bundle(request=tmp_request)
        try:
            if hasattr(resource.Meta.object_class, 'SUBDOMAIN_NAME_PATH'):
                # I filter the default queryset with the right objects
                # (the ones in the subdomain of the first pos application of the pos owner)
                fields_to_query = {}
                fields_to_query['%s__iexact' % resource.Meta.object_class.SUBDOMAIN_NAME_PATH] = self.user_posowner.profile.pos_applications.select_related('pos_applications')[0].subdomain.name
                resource_queryset = resource.Meta.queryset.filter(Q(**fields_to_query))
            else:
                resource_queryset = resource.Meta.queryset
            # If the queryset is empty, we cannot test the creation (because we base the verification duplicating an object)
            # NOTE: compared with == (the original `is 0` tested the identity of an int)
            if len(resource_queryset) == 0:
                print("Skipping the creation test for the resource %s (queryset is empty)" % resource_name)
                return None
            object_item = resource_queryset[0]
            if user.profile.user_type_id == UserType.USER_TYPE_ENDUSER:
                # If the user trying to test the creation is an enduser, we go inside the customer path and we force the customer
                # of the resource to be the user's customer, so the authorization can do a proper check
                if resource.Meta.object_class.CUSTOMER_PATH is None:
                    return False
                else:
                    current_cursor = object_item
                    customer_path_splitted = resource.Meta.object_class.CUSTOMER_PATH.split('__')
                    # The last path element is the attribute to overwrite, the others lead to its holder
                    customer_attribute_name = customer_path_splitted.pop()
                    for path_split in customer_path_splitted:
                        current_cursor = getattr(current_cursor, path_split)
                    setattr(current_cursor, customer_attribute_name, user.profile.customer)
            # Building a fake create request
            tmp_request = self.build_create_request_api(user, pos.name, resource_name)
            self.process_request(tmp_request)
            # Clearing the pk turns the fetched object into a copy that has still to be created
            object_item.pk = None
            tmp_bundle = Bundle(request=tmp_request, obj=object_item, data={'pos_id': pos.id})
            resource.authorized_create_detail(resource.get_object_list(tmp_bundle.request), tmp_bundle)
        except ImmediateHttpResponse as e:
            # If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception
            if e.response.status_code == 401:
                return False
            raise
        return True
def build_get_request_api(self, user, subdomain, resource_name, resource=None):
    """
    Builds a fake GET request for the API list url of resource_name (or for the
    detail url when a resource object is given), addressed to the given
    subdomain of pizzanuvola.com and made by the given user
    """
    url_parts = ["/pos/pos/%s" % resource_name]
    if resource is not None:
        url_parts.append('/%s/' % resource.id)
    host = "%s.pizzanuvola.com" % subdomain
    fake_request = self.factory.get(''.join(url_parts), HTTP_HOST=host)
    fake_request.user = user
    return fake_request
def build_delete_request_api(self, user, subdomain, resource_name, resource=None):
    """
    Builds a fake DELETE request for the API list url of resource_name (or for
    the detail url when a resource object is given), addressed to the given
    subdomain of pizzanuvola.com and made by the given user
    """
    url_parts = ["/pos/pos/%s" % resource_name]
    if resource is not None:
        url_parts.append('/%s/' % resource.id)
    host = "%s.pizzanuvola.com" % subdomain
    fake_request = self.factory.delete(''.join(url_parts), HTTP_HOST=host)
    fake_request.user = user
    return fake_request
def build_create_request_api(self, user, subdomain, resource_name, resource=None):
    """
    Builds a fake POST request for the API list url of resource_name (or for
    the detail url when a resource object is given), addressed to the given
    subdomain of pizzanuvola.com and made by the given user
    """
    url_parts = ["/pos/pos/%s" % resource_name]
    if resource is not None:
        url_parts.append('/%s/' % resource.id)
    host = "%s.pizzanuvola.com" % subdomain
    fake_request = self.factory.post(''.join(url_parts), HTTP_HOST=host)
    fake_request.user = user
    return fake_request
def test_basic_login_as_posowner(self):
    """
    Checks that an anonymous visit to the dashboard is redirected to the login
    page and that, once logged in as pos owner, the home redirects to the dashboard
    """
    main_host = 'pizzanuvola.com'

    anonymous_response = self.client.get('/dashboard/', HTTP_HOST=main_host)
    self.assertEqual(anonymous_response.status_code, 302)
    redirect_target = anonymous_response._headers['location'][1]
    self.assertEqual('http://pizzanuvola.com/accounts/login/?next=/dashboard/', redirect_target)

    self.client.login(username='testposowner', password='test')

    logged_response = self.client.get('/', HTTP_HOST=main_host)
    self.assertEqual(logged_response.status_code, 302)
    redirect_target = logged_response._headers['location'][1]
    self.assertEqual('http://pizzanuvola.com/dashboard', redirect_target)
def test_basic_login_as_enduser(self):
    """
    Checks that a logged in end user gets a 403 when asking for the dashboard
    """
    self.client.login(username='testenduser', password='test')
    dashboard_response = self.client.get('/dashboard/', HTTP_HOST='pizzanuvola.com')
    self.assertEqual(dashboard_response.status_code, 403)
def test_pos_creation_permissions(self):
    """
    Tests if the permission check about the creation of a pos is working
    """
    def send_pos_create_ajax_request(pos_name, user):
        # Submits the pos creation form through the ajax view on behalf of
        # the given user and returns the decoded json answer
        pos_data = {
            "subdomain_name": pos_name,
            "name": pos_name
        }
        serialized_data = '&'.join(
            "%s=%s" % (key, value) for key, value in pos_data.iteritems()
        )
        request = self.factory.post('/', data=pos_data, HTTP_HOST='pizzanuvola.com')
        request.user = user
        request.session = {}
        return json.loads(app_form_submit(request, serialized_data))

    def created_pos_count(pos_name):
        # Number of pos stored with the given name
        return len(Pos.objects.filter(name=pos_name))

    # A pos owner is allowed to create a pos
    answer = send_pos_create_ajax_request("testposowner", self.user_posowner)
    self.assertEqual(answer.get('success', False), True)
    self.assertEqual(created_pos_count('testposowner'), 1)

    # Two pos with the same name cannot exist
    answer = send_pos_create_ajax_request("testposowner", self.user_posowner)
    self.assertEqual(answer.get('success', False), False)
    self.assertEqual(created_pos_count('testposowner'), 1)

    # An end user is not allowed to create a pos
    answer = send_pos_create_ajax_request("testposenduser", self.user_enduser)
    self.assertEqual(answer.get('success', False), False)
    self.assertEqual(created_pos_count('testposenduser'), 0)
def test_tastypie_subdomainauthentication(self):
    """
    Tests the tastypie subdomainauthentication class to be working properly:
    every object that the first pos owner can list through a resource
    protected by SubdomainAuthorization must belong to his own pos.
    TODO: setup permissions for the posuser
    """
    def check_tastypie_resource(resource_name, resource):
        # Resources that use another authorization class are out of scope
        if not isinstance(resource._meta.authorization, SubdomainAuthorization):
            print "Skipping the check for the resource %s (authorization class: %s)" % (resource_name, resource._meta.authorization)
            return
        print "Checking the permissions for the '%s' resource..." % resource_name
        tmp_request = self.build_get_request_api(self.user_posowner, self.posowner1_pos_name, resource_name)
        self.process_request(tmp_request)
        # Getting all the objects for this resource
        tmp_bundle = Bundle(request=tmp_request)
        object_list = resource.obj_get_list(tmp_bundle)
        item_check_counter = 0
        for item in object_list:
            # Walks the POS_PATH of the item (when it has one) to reach its pos
            current_cursor = item
            if item.POS_PATH is not None and item.POS_PATH != '':
                pos_path_splitted = item.POS_PATH.split('__')
                for path_split in pos_path_splitted:
                    current_cursor = getattr(current_cursor, path_split)
            # NOTE(review): the indentation of this file was lost; this assertion and
            # the counter are assumed to run for every item (an item without a
            # POS_PATH is then compared to the pos itself) -- confirm against the original
            self.assertEqual(current_cursor, self.posowner1_pos)
            item_check_counter += 1
        print "OK (%s item checked)\n" % item_check_counter
    # We check that the posowner can only see the 'things' of his own pos
    for resource_name, resource in pos_api._registry.iteritems():
        check_tastypie_resource(resource_name, resource)
    for resource_name, resource in pos_apifull._registry.iteritems():
        check_tastypie_resource(resource_name, resource)
def test_pos_owner_read_permissions(self):
    """
    Check the READ permission of both pos owners on posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_owner_permissions``, the
    result of ``check_permission`` for ``user_posowner`` and then for
    ``user_posowner2`` is compared with the expected value stored in the
    matching permission table.
    """
    def describe(outcome):
        # Wording used in the assertion message.
        return 'able' if outcome else 'unable'

    # (user, expected permission table) pairs, checked in this order.
    expectations = (
        (self.user_posowner, self.pos_owner_permissions),
        (self.user_posowner2, self.pos_owner2_permissions),
    )

    for resource_name, resource in pos_api._registry.iteritems():
        if resource_name not in self.pos_owner_permissions.keys():
            continue
        # TESTING READ CAPABILITIES -----------------------------------
        for user, expected_permissions in expectations:
            outcome = self.check_permission(user, self.posowner1_pos, self.CAN_READ_ID, resource_name, resource)
            self.assertEqual(
                outcome,
                expected_permissions[resource_name][self.CAN_READ_ID],
                msg="The user '%s' was %s to READ the '%s' resource of the pos '%s'" % (user, describe(outcome), resource_name, self.posowner1_pos.name)
            )
def test_enduser_read_permissions(self):
    """
    Check the READ permission of the end user on posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_enduser_permissions``, the
    result of ``check_permission`` for ``user_enduser`` is compared with the
    expected value stored in that table.
    """
    def get_able_or_unable(check_result):
        # Wording used in the assertion message.
        return 'able' if check_result else 'unable'

    for resource_name, resource in pos_api._registry.iteritems():
        # Guard on the table that is indexed below. Guarding on the owner
        # table raised KeyError for resources missing from the enduser table
        # and skipped resources that only the enduser table lists.
        if resource_name not in self.pos_enduser_permissions:
            continue
        check_result = self.check_permission(self.user_enduser, self.posowner1_pos, self.CAN_READ_ID, resource_name, resource)
        self.assertEqual(
            check_result,
            self.pos_enduser_permissions[resource_name][self.CAN_READ_ID],
            msg="The user '%s' was %s to READ the '%s' resource of the pos '%s'" % (self.user_enduser, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name)
        )
def test_posowner_delete_permissions(self):
    """
    Check the DELETE permission of the pos owner on their own pos.

    For each ``pos_api`` resource listed in ``pos_owner_permissions``, the
    result of ``check_permission`` is compared with the expected value. Each
    check runs between a savepoint and its rollback.
    """
    def describe(outcome):
        # Wording used in the assertion message.
        return 'able' if outcome else 'unable'

    user = self.user_posowner
    for resource_name, resource in pos_api._registry.iteritems():
        if resource_name not in self.pos_owner_permissions.keys():
            continue
        savepoint_id = transaction.savepoint()
        outcome = self.check_permission(user, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource)
        self.assertEqual(
            outcome,
            self.pos_owner_permissions[resource_name][self.CAN_DELETE_ID],
            msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (user, describe(outcome), resource_name, self.posowner1_pos.name)
        )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(savepoint_id)
def test_posowner2_delete_permissions(self):
    """
    Check the DELETE permission of the second pos owner on posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_owner2_permissions``, the
    result of ``check_permission`` for ``user_posowner2`` is compared with
    the expected value stored in that table. Each check runs between a
    savepoint and its rollback.
    """
    def get_able_or_unable(check_result):
        # Wording used in the assertion message.
        return 'able' if check_result else 'unable'

    for resource_name, resource in pos_api._registry.iteritems():
        # Guard on the table that is indexed below. Guarding on the first
        # owner's table raised KeyError for resources missing from the
        # owner2 table and skipped resources that only owner2's table lists.
        if resource_name not in self.pos_owner2_permissions:
            continue
        sid = transaction.savepoint()
        check_result = self.check_permission(self.user_posowner2, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource)
        self.assertEqual(
            check_result,
            self.pos_owner2_permissions[resource_name][self.CAN_DELETE_ID],
            msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (self.user_posowner2, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name)
        )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(sid)
def test_enduser_delete_permissions(self):
    """
    Check the DELETE permission of the end user on posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_enduser_permissions``, the
    result of ``check_permission`` for ``user_enduser`` is compared with the
    expected value stored in that table. Each check runs between a savepoint
    and its rollback.
    """
    def get_able_or_unable(check_result):
        # Wording used in the assertion message.
        return 'able' if check_result else 'unable'

    for resource_name, resource in pos_api._registry.iteritems():
        # Guard on the table that is indexed below. Guarding on the owner
        # table raised KeyError for resources missing from the enduser table
        # and skipped resources that only the enduser table lists.
        if resource_name not in self.pos_enduser_permissions:
            continue
        sid = transaction.savepoint()
        check_result = self.check_permission(self.user_enduser, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource)
        self.assertEqual(
            check_result,
            self.pos_enduser_permissions[resource_name][self.CAN_DELETE_ID],
            msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (self.user_enduser, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name)
        )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(sid)
def test_posowner_create_permissions(self):
    """
    Check the CREATE (write) permission of the pos owner on their own pos.

    For each ``pos_api`` resource listed in ``pos_owner_permissions``, the
    result of ``check_permission`` is compared with the expected value. A
    ``None`` result is not asserted on. Each check runs between a savepoint
    and its rollback.
    """
    def describe(outcome):
        # Wording used in the assertion message.
        return 'able' if outcome else 'unable'

    user = self.user_posowner
    for resource_name, resource in pos_api._registry.iteritems():
        if resource_name not in self.pos_owner_permissions.keys():
            continue
        savepoint_id = transaction.savepoint()
        outcome = self.check_permission(user, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource)
        # NOTE(review): None presumably means the check could not be
        # performed for this resource - confirm against check_permission.
        if outcome is not None:
            self.assertEqual(
                outcome,
                self.pos_owner_permissions[resource_name][self.CAN_WRITE_ID],
                msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (user, describe(outcome), resource_name, self.posowner1_pos.name)
            )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(savepoint_id)
def test_posowner2_create_permissions(self):
    """
    Check the CREATE (write) permission of the second pos owner on
    posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_owner2_permissions``, the
    result of ``check_permission`` is compared with the expected value. A
    ``None`` result is not asserted on. Each check runs between a savepoint
    and its rollback.
    """
    def describe(outcome):
        # Wording used in the assertion message.
        return 'able' if outcome else 'unable'

    user = self.user_posowner2
    for resource_name, resource in pos_api._registry.iteritems():
        if resource_name not in self.pos_owner2_permissions.keys():
            continue
        savepoint_id = transaction.savepoint()
        outcome = self.check_permission(user, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource)
        # NOTE(review): None presumably means the check could not be
        # performed for this resource - confirm against check_permission.
        if outcome is not None:
            self.assertEqual(
                outcome,
                self.pos_owner2_permissions[resource_name][self.CAN_WRITE_ID],
                msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (user, describe(outcome), resource_name, self.posowner1_pos.name)
            )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(savepoint_id)
def test_enduser_create_permissions(self):
    """
    Check the CREATE (write) permission of the end user on posowner1's pos.

    For each ``pos_api`` resource listed in ``pos_enduser_permissions``, the
    result of ``check_permission`` is compared with the expected value. A
    ``None`` result is not asserted on. Each check runs between a savepoint
    and its rollback.
    """
    def describe(outcome):
        # Wording used in the assertion message.
        return 'able' if outcome else 'unable'

    user = self.user_enduser
    for resource_name, resource in pos_api._registry.iteritems():
        if resource_name not in self.pos_enduser_permissions.keys():
            continue
        savepoint_id = transaction.savepoint()
        outcome = self.check_permission(user, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource)
        # NOTE(review): None presumably means the check could not be
        # performed for this resource - confirm against check_permission.
        if outcome is not None:
            self.assertEqual(
                outcome,
                self.pos_enduser_permissions[resource_name][self.CAN_WRITE_ID],
                msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (user, describe(outcome), resource_name, self.posowner1_pos.name)
            )
        # Undo whatever the permission check changed in the database.
        transaction.savepoint_rollback(savepoint_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment