Last active
January 4, 2016 21:59
-
-
Save vshjxyz/8684755 to your computer and use it in GitHub Desktop.
Code examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.contrib.auth.models import User | |
from django.contrib.sessions.models import Session | |
from tastypie.authorization import Authorization | |
from tastypie.exceptions import Unauthorized | |
from django.db.models import Q | |
from apps.authorization.models import UserType | |
from apps.pos.models import Pos | |
class SubdomainAuthorization(Authorization): | |
''' | |
Implement the base authorization policies to see / unsee only the right resources | |
''' | |
subdomain_name_path = '' | |
customer_can_read = True | |
customer_can_write = False | |
def __init__(self, subdomain_name_path=None, customer_path=None, customer_can_read=True, customer_can_write=False): | |
self.subdomain_name_path = subdomain_name_path | |
self.customer_path = customer_path | |
self.customer_can_read = customer_can_read | |
self.customer_can_write = customer_can_write | |
def __build_subdomain_query(self, subdomain): | |
""" | |
Builds a subdomain query starting from the pos path and comparing it to the subdomain (parameter) | |
""" | |
return Q(**{ | |
'%s__iexact' % self.subdomain_name_path: subdomain.name | |
}) | |
def __can_access_resource(self, obj, subdomain): | |
""" | |
Starting from the obj, walks into his attributes to reach the subdomain name | |
and compare it to the subdomain.name (the given param) | |
""" | |
current_cursor = obj | |
if self.subdomain_name_path is not None or self.subdomain_name_path != '': | |
subdomain_name_path_splitted = self.subdomain_name_path.split('__') | |
for path_split in subdomain_name_path_splitted: | |
current_cursor = getattr(current_cursor, path_split) | |
return current_cursor == subdomain.name | |
def __can_customer_access_resource(self, obj, username): | |
""" | |
Starting from the obj, walks into his attributes to reach the username | |
and compare it to the given username | |
""" | |
current_cursor = obj | |
if self.customer_path is not None: | |
customer_path_splitted = self.customer_path.split('__') | |
for path_split in customer_path_splitted: | |
current_cursor = getattr(current_cursor, path_split) | |
return current_cursor.profile.user.username == username | |
def __check_detail_permissions(self, bundle, is_writing=False, is_reading=False, is_deleting=False): | |
""" | |
Checks if the user can access the bundle.obj from his subdomain and if the user is authenticated | |
""" | |
is_authorized = False | |
# The customer can't delete anything except for the addresses | |
if bundle.request.user.is_authenticated() and (bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER) and not is_deleting: | |
# If we are a customer, check if we can read or write | |
if self.customer_can_write and is_writing: | |
is_authorized = True | |
if self.customer_path is not None: | |
is_authorized = self._SubdomainAuthorization__can_customer_access_resource(bundle.obj, bundle.request.user.username) | |
elif is_reading: | |
is_authorized = self.customer_can_read | |
else: | |
# Checks if we are an allowed pos user or the pos owner | |
is_authorized = ( | |
bundle.request.user.is_authenticated() and | |
bundle.request.user.profile.can_access_subdomain(bundle.request.subdomain) | |
) | |
if (bundle.obj is not None) and not is_reading: | |
# If we have an object (aka we are not creating it) then check that the subdomain is right | |
is_authorized = ( | |
is_authorized and | |
self._SubdomainAuthorization__can_access_resource(bundle.obj, bundle.request.subdomain) | |
) | |
# The customer is deleting an address | |
if bundle.request.user.is_authenticated() and \ | |
(bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER) and \ | |
bundle.request.user.profile.customer.id == bundle.obj.customer_id and is_deleting: | |
is_authorized = True | |
if not is_authorized: | |
raise Unauthorized("Insufficent permissions.") | |
else: | |
return True | |
def read_list(self, object_list, bundle): | |
if self._SubdomainAuthorization__check_detail_permissions(bundle, is_reading=True): | |
filter = Q() | |
if not(bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER): | |
filter = self._SubdomainAuthorization__build_subdomain_query(bundle.request.subdomain) | |
return object_list.filter(filter) | |
else: | |
raise Unauthorized("Insufficent permissions.") | |
def read_detail(self, object_list, bundle): | |
return self._SubdomainAuthorization__check_detail_permissions(bundle, is_reading=True) | |
def create_list(self, object_list, bundle): | |
# TODO: test the creation of a list and implement this authorization | |
raise Unauthorized("Insufficent permissions.") | |
def create_detail(self, object_list, bundle): | |
if hasattr(bundle.obj, 'pos_id'): | |
if not (bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER): | |
bundle.obj.pos = bundle.request.subdomain.application | |
else: | |
bundle.obj.pos = Pos.objects.get(id=bundle.data['pos_id']) | |
if hasattr(bundle.obj, 'customer_id') and (bundle.request.user.profile.user_type_id == UserType.USER_TYPE_ENDUSER): | |
bundle.obj.customer = bundle.request.user.profile.customer | |
return self._SubdomainAuthorization__check_detail_permissions(bundle, is_writing=True) | |
def update_list(self, object_list, bundle): | |
raise Unauthorized("Insufficent permissions.") | |
def update_detail(self, object_list, bundle): | |
return self._SubdomainAuthorization__check_detail_permissions(bundle, is_writing=True) | |
def delete_list(self, object_list, bundle): | |
raise Unauthorized("Insufficent permissions.") | |
def delete_detail(self, object_list, bundle): | |
return self._SubdomainAuthorization__check_detail_permissions(bundle, is_deleting=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The contents of individual model .js files will be concatenated into dist/models.js | |
"use strict"; | |
(function() { | |
// Protects views where angular is not loaded from errors | |
if ( typeof angular == 'undefined' ) { | |
return; | |
} | |
var module = angular.module('CartModule', ['APIModule', 'AuthModule']); | |
module.service('CartService', function($q, APIService, AuthService) { | |
function Cart() { | |
this.init(); | |
this.load(); | |
} | |
Cart.prototype.init = function () { | |
this.cart = { | |
items: [] | |
,pos: null | |
,total: 0 | |
,delivery: false | |
,selectedAddress: null | |
}; | |
return this; | |
}; | |
/** | |
* Loads the cart from the localStorage | |
* @returns {Cart} | |
*/ | |
Cart.prototype.load = function () { | |
var loadedCart = JSON.parse(window.localStorage.getItem('cart')); | |
if (loadedCart != null) { | |
this.cart = loadedCart; | |
} | |
return this; | |
}; | |
/** | |
* Saves the cart to the localStorage | |
* @returns {Cart} | |
*/ | |
Cart.prototype.save = function () { | |
window.localStorage.setItem('cart', JSON.stringify(this.cart)); | |
return this; | |
}; | |
/** | |
* Clear the cart and saves the cleared cart to the localStorage | |
* @returns {Cart|*|Session} | |
*/ | |
Cart.prototype.clear = function () { | |
return this.init().save(); | |
}; | |
/** | |
* Adds an item to the cart and saves it to the localStorage with the right quantity. Returns the item that | |
* has just been added | |
* @param item | |
* @param price the current selected price for the item | |
* @returns {*|Mixed} | |
*/ | |
Cart.prototype.add = function (item, price) { | |
if ((!_.isObject(item)) || (!_.isObject(price))) { | |
throw('You have to specify an item and a price if you want to add something to the CartService'); | |
} | |
var foundItem = this.searchItem(item, price); | |
item.selectedPrice = price; | |
if (foundItem == null) { | |
item.quantity = 1; | |
this.cart.items.push(item); | |
} else { | |
item = foundItem; | |
item.quantity += 1; | |
} | |
this.calculateTotal(); | |
this.save(); | |
return item; | |
}; | |
/** | |
* Removes an item from the cart and saves it to localStorage | |
* @param index | |
* @returns {Cart} | |
*/ | |
Cart.prototype.remove = function (index) { | |
if ((index >= this.cart.items.length) && (this.cart.items.length > 0)) { | |
throw('You must provide an index to remove between 0 and ' + (this.cart.items.length - 1)); | |
} | |
this.cart.items.splice(index, 1); | |
if (this.cart.items.length === 0) { | |
// We need to reset the pos | |
this.init(); | |
} else { | |
this.calculateTotal(); | |
} | |
this.save(); | |
return this; | |
}; | |
/** | |
* Searches an item from the current cart and returns it | |
* @param searchItem | |
* @param searchPrice optional parameter to check if the selectedPrice parameter of the item is equal to the searchPrice | |
* @returns {*|Mixed} | |
*/ | |
Cart.prototype.searchItem = function (searchItem, searchPrice) { | |
return _.find(this.cart.items, function (item) { | |
var result = parseInt(item.id) === parseInt(searchItem.id); | |
if (result && (searchPrice != null)) { | |
result = result && (parseFloat(item.selectedPrice.value) === parseFloat(searchPrice.value)); | |
} | |
return result; | |
}); | |
}; | |
/** | |
* Calculates the current total of the cart, based on the items that the cart has in it | |
*/ | |
Cart.prototype.calculateTotal = function () { | |
var total = 0; | |
for (var i = 0; i < this.cart.items.length; i++) { | |
var item = this.cart.items[i]; | |
total += parseInt(item.quantity) * parseFloat(item.selectedPrice.value); | |
} | |
if ((this.cart.delivery) && (this.cart.pos.delivery_price != null)) { | |
total += parseFloat(this.cart.pos.delivery_price); | |
} | |
// Round to 2 cyphers I.E. | |
// 2.236 * 100 = 223.6 | |
// Math.round(223.6) = 224 | |
// 224 / 100 = 2.24 | |
this.cart.total = Math.round(total * 100) / 100; | |
return this; | |
}; | |
/** | |
* This method perform the checkout of the cart and creates an order+orderlines on the server. | |
* It returns a promise that will be resolved if everything was fine | |
* | |
* @returns {*|Promise|Number|number|Deferred} | |
*/ | |
Cart.prototype.checkout = function () { | |
var _this = this; | |
var deferred = $q.defer(); | |
var user = AuthService.getUserData(); | |
if (user != null) { | |
var orderData = { | |
'total_price': this.cart.total | |
,'delivery': this.cart.delivery | |
,'delivery_price': (this.cart.delivery ? this.cart.pos.delivery_price : 0) | |
,'pos_id': this.cart.pos.id | |
,'order_lines': [] | |
// TODO: fix the status of the order | |
,'order_status': '/pos/pos/order_status/2/' | |
,'customer': '/pos/pos/customer/' + user.customer_id + '/' | |
,'customer_address': this.cart.selectedAddress | |
,'customer_phone': user.current_phone_number | |
,'notes': '' | |
,'customer_name': user.first_name + ' ' + user.last_name | |
}; | |
APIService.all('order').post(orderData).then(function (response) { | |
var promises = []; | |
for (var i = 0; i < _this.cart.items.length; i++) { | |
var item = _this.cart.items[i]; | |
var orderLineData = { | |
'repetitions': item.quantity | |
,'price': item.selectedPrice.value | |
,'price_category_name': item.selectedPrice.price_category.name | |
,'product': '/pos/pos/product/' + item.id + '/' | |
,'product_name': item.name | |
,'product_category_name': item.product_category.name | |
,'is_custom_product': false | |
,'order': '/pos/pos/order/' + response.id + '/' | |
,'addings': [] | |
,'subtractions': [] | |
,'orderline_position': i | |
}; | |
promises.push(APIService.all('order_line').post(orderLineData)); | |
} | |
$q.all(promises).then(function () { | |
_this.clear(); | |
deferred.resolve(); | |
}, function () { | |
deferred.reject('There was an error during the order\'s checkout (orderline)'); | |
}); | |
}, function () { | |
deferred.reject('There was an error during the order\'s checkout (order)'); | |
}); | |
} else { | |
deferred.reject('The user is not properly logged in'); | |
} | |
return deferred.promise; | |
}; | |
return new Cart(); | |
}); | |
})(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
describe('CartService tests', function () { | |
var $scope | |
,$httpBackend | |
,CartService | |
,AuthService | |
,BASEURL = 'http://pizzanuvola.com:8000' | |
,testCartData = { | |
foo: 'bar' | |
,foo2: 'bar2' | |
,delivery: true | |
,pos: { | |
id: 2 | |
,name: 'test' | |
,address: 'test addr' | |
,delivery_price: 2.22 | |
} | |
,items: [ | |
{ | |
id: 2 | |
,foo3: 'bar3' | |
,quantity: 1 | |
,selectedPrice: { | |
id: 4 | |
,price_category: { | |
name: 'test' | |
} | |
,value: "6.66" | |
} | |
,product_category: { | |
name: 'product cat test' | |
} | |
} | |
,{ | |
id: 5 | |
,foo3: 'bar4' | |
,quantity: 3 | |
,selectedPrice: { | |
id: 3 | |
,price_category: { | |
name: 'test2' | |
} | |
,value: "4.44" | |
} | |
,product_category: { | |
name: 'product cat test2' | |
} | |
} | |
,{ | |
id: 6 | |
,foo3: 'bar5' | |
,quantity: 2 | |
,selectedPrice: { | |
id: 1 | |
,price_category: { | |
name: 'test3' | |
} | |
,value: "12.50" | |
} | |
,product_category: { | |
name: 'product cat test3' | |
} | |
} | |
] | |
} | |
; | |
beforeEach(module('pnApp')); | |
beforeEach(function () { | |
AuthService = { | |
getUserData: function () { | |
return {}; | |
} | |
}; | |
angular.mock.module(function ($provide) { | |
$provide.value('AuthService', AuthService); | |
}); | |
}); | |
beforeEach(inject(function ($rootScope, _$httpBackend_, _CartService_) { | |
$scope = $rootScope.$new(); | |
$httpBackend = _$httpBackend_; | |
CartService = _CartService_; | |
})); | |
afterEach(function () { | |
// Clearing the localstorage after each test | |
window.localStorage.clear(); | |
}); | |
it('Should load the cart properly from the localStorage', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var cart = CartService.load().cart; | |
expect(cart.foo).toBe('bar'); | |
expect(cart.foo2).toBe('bar2'); | |
expect(cart.items.length).toBe(3); | |
expect(cart.items[0].id).toBe(2); | |
expect(cart.items[1].id).toBe(5); | |
}); | |
it('Should add correctly an item to the cart, saving it to localStorage aswell', function () { | |
var cart = CartService.load().cart; | |
expect(cart.items.length).toBe(0); | |
CartService.add({ | |
id: 7 | |
,foo: 'bar' | |
} | |
,{ | |
id: 1 | |
,value: "6.55" | |
}); | |
// We manually load the localStorage data to check that the modifications has been saved | |
var savedCart = JSON.parse(window.localStorage.getItem('cart')); | |
expect(savedCart.items.length).toBe(1); | |
expect(savedCart.items[0].id).toBe(7); | |
}); | |
it('Should update the quantity of the cart if I add items with the same id, but be aware of the price', function () { | |
var cart = CartService.load().cart; | |
expect(cart.items.length).toBe(0); | |
// We need to use functions in this test because multiple CartService.add calls alters the first parameter structure | |
var getProduct = function () { | |
return { | |
id: 7 | |
,foo: 'bar' | |
}; | |
}; | |
var getPrice1 = function () { | |
return { | |
id: 6 | |
,value: "5.55" | |
}; | |
}; | |
var getPrice2 = function () { | |
return { | |
id: 3 | |
,value: "6" | |
}; | |
}; | |
CartService.add(getProduct(), getPrice1()); | |
expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(1); | |
CartService.add(getProduct(), getPrice1()); | |
expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(2); | |
CartService.add(getProduct(), getPrice2()); | |
expect(CartService.searchItem(getProduct(), getPrice2()).quantity).toBe(1); | |
expect(CartService.searchItem(getProduct(), getPrice1()).quantity).toBe(2); | |
}); | |
it('Should remove an item from the service and the localStorage', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var cart = CartService.load().cart; | |
var oldLength = cart.items.length; | |
CartService.remove(0); | |
// It should have removed an item | |
expect(cart.items.length).toBe(oldLength - 1); | |
CartService.remove(0); | |
// It should have removed another item | |
expect(cart.items.length).toBe(oldLength - 2); | |
expect(function() { | |
CartService.remove(99); | |
}).toThrow(); | |
}); | |
it('Should clear the cart and the localStorage', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
CartService.load(); | |
expect(CartService.cart.items.length).toBe(3); | |
CartService.clear(); | |
expect(CartService.cart.items.length).toBe(0); | |
// We manually load the localStorage data to check that the modifications has been saved | |
var savedCart = JSON.parse(window.localStorage.getItem('cart')); | |
expect(savedCart.items.length).toBe(0); | |
}); | |
it('Should properly calculate the total based on the items', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
CartService.load(); | |
CartService.calculateTotal(); | |
expect(CartService.cart.total).toBe(47.2); | |
}); | |
it('Should reset the pos if we delete all the products', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
CartService.load(); | |
expect(CartService.cart.pos).not.toBeNull(); | |
var cartLength = CartService.cart.items.length; | |
for (var i = 0; i < cartLength; i++) { | |
CartService.remove(0); | |
} | |
expect(CartService.cart.items.length).toBe(0); | |
expect(CartService.cart.pos).toBeNull(); | |
}); | |
it('Should reject the promise when we try to checkout an order without user data', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var spyMethodSuccess | |
,spyMethodFailure | |
; | |
CartService.load(); | |
spyOn(AuthService, 'getUserData').andReturn(null); | |
spyMethodSuccess = jasmine.createSpy(); | |
spyMethodFailure = jasmine.createSpy(); | |
CartService.checkout().then(spyMethodSuccess, spyMethodFailure); | |
// This apply releases the promises so they go into the 'then' method(s) | |
$scope.$apply(); | |
expect(spyMethodSuccess).not.toHaveBeenCalled(); | |
expect(spyMethodFailure).toHaveBeenCalledWith('The user is not properly logged in'); | |
}); | |
it('Should reject the promise when we something went bad on the order POST when trying to checkout', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var spyMethodSuccess | |
,spyMethodFailure | |
; | |
CartService.load(); | |
spyOn(AuthService, 'getUserData').andReturn('something'); | |
spyMethodSuccess = jasmine.createSpy(); | |
spyMethodFailure = jasmine.createSpy(); | |
$httpBackend | |
.when('POST', BASEURL + '/pos/pos/order/') | |
.respond(function(method, url, data) { | |
return [500, {}]; | |
}); | |
CartService.checkout().then(spyMethodSuccess, spyMethodFailure); | |
// This apply releases the promises so they go into the 'then' method(s) | |
$scope.$apply(); | |
$httpBackend.flush(); | |
expect(spyMethodSuccess).not.toHaveBeenCalled(); | |
expect(spyMethodFailure).toHaveBeenCalledWith('There was an error during the order\'s checkout (order)'); | |
}); | |
it('Should reject the promise when we something went bad on the orderLine POST when trying to checkout', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var spyMethodSuccess | |
,spyMethodFailure | |
; | |
CartService.load(); | |
spyOn(AuthService, 'getUserData').andReturn('something'); | |
spyMethodSuccess = jasmine.createSpy(); | |
spyMethodFailure = jasmine.createSpy(); | |
$httpBackend | |
.when('POST', BASEURL + '/pos/pos/order/') | |
.respond(function(method, url, data) { | |
return [200, {}]; | |
}); | |
$httpBackend | |
.when('POST', BASEURL + '/pos/pos/order_line/') | |
.respond(function(method, url, data) { | |
return [500, {}]; | |
}); | |
CartService.checkout().then(spyMethodSuccess, spyMethodFailure); | |
// This apply releases the promises so they go into the 'then' method(s) | |
$scope.$apply(); | |
$httpBackend.flush(); | |
expect(spyMethodSuccess).not.toHaveBeenCalled(); | |
expect(spyMethodFailure).toHaveBeenCalledWith('There was an error during the order\'s checkout (orderline)'); | |
}); | |
it('Should resolve the promise if the checkout process went fine', function () { | |
window.localStorage.setItem('cart', JSON.stringify(testCartData)); | |
var spyMethodSuccess | |
,spyMethodFailure | |
; | |
CartService.load(); | |
spyOn(AuthService, 'getUserData').andReturn('something'); | |
spyMethodSuccess = jasmine.createSpy(); | |
spyMethodFailure = jasmine.createSpy(); | |
$httpBackend | |
.when('POST', BASEURL + '/pos/pos/order/') | |
.respond(function(method, url, data) { | |
return [200, {}]; | |
}); | |
$httpBackend | |
.when('POST', BASEURL + '/pos/pos/order_line/') | |
.respond(function(method, url, data) { | |
return [200, {}]; | |
}); | |
CartService.checkout().then(spyMethodSuccess, spyMethodFailure); | |
// This apply releases the promises so they go into the 'then' method(s) | |
$scope.$apply(); | |
$httpBackend.flush(); | |
expect(spyMethodSuccess).toHaveBeenCalled(); | |
expect(spyMethodFailure).not.toHaveBeenCalled(); | |
}); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from django.contrib.auth.models import User | |
from django.test import TestCase, RequestFactory | |
import sys | |
from tastypie.bundle import Bundle | |
from tastypie.exceptions import ImmediateHttpResponse | |
from apps.authorization.models import * | |
from apps.authorization.middleware import SubdomainMiddleware | |
from apps.pos.models import * | |
from apps.pos.api import * | |
from apps.pos.apifull import * | |
from django.test.client import Client | |
from apps.main.ajax import app_form_submit | |
import json | |
class AuthTests(TestCase): | |
fixtures = [ | |
"sites_fixture.json", | |
"test_fixture.json" | |
] | |
# This dict indicates the permission per pos owner expressed in the order: | |
# RESOURCE: [CAN_READ, CAN_WRITE, CAN_DELETE] | |
CAN_READ_ID = 0 | |
CAN_WRITE_ID = 1 | |
CAN_DELETE_ID = 2 | |
pos_owner_permissions = { | |
'adding' : [True, True, True], | |
'product' : [True, True, True], | |
'order_line' : [True, True, True], | |
'product_category' : [True, True, True], | |
'price' : [True, True, True], | |
'pos' : [True, True, True], | |
'note' : [True, True, True], | |
'setting' : [True, True, True], | |
'price_category' : [True, True, True], | |
'poslist' : [True, False, False], | |
'address' : [True, True, True], | |
'order_status' : [True, False, False], | |
'kinship' : [True, True, True], | |
'phone_number' : [True, True, True], | |
'order' : [True, True, True], | |
'subtraction' : [True, True, True], | |
'business_hour' : [True, True, True], | |
'call' : [True, True, True], | |
'customerfull' : [True, True, True], | |
'orderpartial' : [True, True, True], | |
'product_categoryfull' : [True, True, True], | |
'kinshipfull' : [True, True, True], | |
'productfullingredients': [True, True, True], | |
'orderdetailfull' : [True, True, True], | |
'pricefull' : [True, True, True], | |
'orderfull' : [True, True, True], | |
'productfull' : [True, True, True], | |
} | |
pos_owner2_permissions = { | |
'adding' : [False, False, False], | |
'product' : [False, False, False], | |
'order_line' : [False, False, False], | |
'product_category' : [False, False, False], | |
'price' : [False, False, False], | |
'pos' : [False, False, False], | |
'note' : [False, False, False], | |
'setting' : [False, False, False], | |
'price_category' : [False, False, False], | |
'poslist' : [True, False, False], | |
'address' : [False, False, False], | |
'order_status' : [True, False, False], | |
'kinship' : [False, False, False], | |
'phone_number' : [False, False, False], | |
'order' : [False, False, False], | |
'subtraction' : [False, False, False], | |
'business_hour' : [False, False, False], | |
'call' : [False, False, False], | |
'customerfull' : [False, False, False], | |
'orderpartial' : [False, False, False], | |
'product_categoryfull' : [False, False, False], | |
'kinshipfull' : [False, False, False], | |
'productfullingredients': [False, False, False], | |
'orderdetailfull' : [False, False, False], | |
'pricefull' : [False, False, False], | |
'orderfull' : [False, False, False], | |
'productfull' : [False, False, False], | |
} | |
pos_enduser_permissions = { | |
'adding' : [True, False, False], | |
'product' : [True, False, False], | |
'order_line' : [True, True, False], | |
'product_category' : [True, False, False], | |
'price' : [True, False, False], | |
'pos' : [True, False, False], | |
'note' : [True, False, False], | |
'setting' : [False, False, False], | |
'price_category' : [True, False, False], | |
'poslist' : [True, False, False], | |
'address' : [True, True, False], | |
'order_status' : [True, False, False], | |
'kinship' : [True, False, False], | |
'phone_number' : [True, False, False], | |
'order' : [True, True, False], | |
'subtraction' : [True, False, False], | |
'business_hour' : [True, False, False], | |
'call' : [False, False, False], | |
'customerfull' : [True, False, False], | |
'orderpartial' : [True, False, False], | |
'product_categoryfull' : [True, False, False], | |
'kinshipfull' : [True, False, False], | |
'productfullingredients': [True, False, False], | |
'orderdetailfull' : [True, False, False], | |
'pricefull' : [True, False, False], | |
'orderfull' : [True, False, False], | |
'productfull' : [True, False, False], | |
} | |
def setUp(self): | |
self.factory = RequestFactory() | |
self.client = Client() | |
self.user_posowner = User.objects.get(username='testposowner') | |
self.user_posowner2 = User.objects.get(username='testposowner2') | |
self.user_enduser = User.objects.get(username='testenduser') | |
# self.user_posuser = User.objects.get(username='testposuser') | |
self.posowner1_pos_name = "provaposowner" | |
self.posowner2_pos_name = "provaposowner2" | |
self.posowner1_pos = Pos.objects.get( | |
subdomain__name=self.posowner1_pos_name, | |
name=self.posowner1_pos_name | |
) | |
self.posowner2_pos = Pos.objects.get( | |
subdomain__name=self.posowner2_pos_name, | |
name=self.posowner2_pos_name | |
) | |
def process_request(self, request): | |
""" | |
Makes the request parameter to be processed by the SubdomainMiddleware | |
""" | |
return SubdomainMiddleware().process_request(request) | |
def check_permission(self, user, pos, operation_id, resource_name, resource): | |
""" | |
This method performs different operations due to his parameters and it can | |
* simulate a READ from the tastypie API | |
* simulate a WRITE from the tastypie API | |
* simulate a DELETE from the tastypie API | |
in order to do this, it creates a fake request and a fake bundle with that request | |
it then calls the native Tastypie methods that calls the relative authorization and returns True or False if the operation succeed | |
""" | |
if operation_id == self.CAN_READ_ID: | |
tmp_request = self.build_get_request_api(user, pos.name, resource_name) | |
self.process_request(tmp_request) | |
# Getting all the objects for this resource | |
tmp_bundle = Bundle(request=tmp_request) | |
try: | |
object_list = resource.obj_get_list(tmp_bundle) | |
if len(object_list) > 0: | |
object_item = object_list[0] | |
# Building a fake get request | |
tmp_request = self.build_get_request_api(user, pos.name, resource_name, object_item) | |
self.process_request(tmp_request) | |
# Getting a single objects to check the other method in the authorization | |
tmp_bundle = Bundle(request=tmp_request, obj=object_item) | |
object_item = resource.obj_get(tmp_bundle, pk=object_item.id) | |
except ImmediateHttpResponse, e: | |
# If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception | |
if e.response.status_code == 401: | |
return False | |
else: | |
raise e | |
return True | |
elif operation_id == self.CAN_DELETE_ID: | |
tmp_request = self.build_delete_request_api(user, pos.name, resource_name) | |
self.process_request(tmp_request) | |
# Getting all the objects for this resource | |
tmp_bundle = Bundle(request=tmp_request) | |
try: | |
object_list = resource.obj_get_list(tmp_bundle) | |
if len(object_list) > 0: | |
object_item = object_list[0] | |
# Building a fake delete request | |
tmp_request = self.build_delete_request_api(user, pos.name, resource_name, object_item) | |
self.process_request(tmp_request) | |
# Getting a single objects to check the other method in the authorization | |
tmp_bundle = Bundle(request=tmp_request, obj=object_item) | |
object_item = resource.obj_delete(tmp_bundle, pk=object_item.id) | |
except ImmediateHttpResponse, e: | |
# If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception | |
if e.response.status_code == 401: | |
return False | |
else: | |
raise e | |
return True | |
elif operation_id == self.CAN_WRITE_ID: | |
tmp_request = self.build_create_request_api(user, pos.name, resource_name) | |
self.process_request(tmp_request) | |
# Getting all the objects for this resource | |
tmp_bundle = Bundle(request=tmp_request) | |
try: | |
if hasattr(resource.Meta.object_class, 'SUBDOMAIN_NAME_PATH'): | |
# I filter the default queryset with the right objects | |
fields_to_query = {} | |
fields_to_query['%s__iexact' % resource.Meta.object_class.SUBDOMAIN_NAME_PATH] = self.user_posowner.profile.pos_applications.select_related('pos_applications')[0].subdomain.name | |
resource_queryset = resource.Meta.queryset.filter(Q(**fields_to_query)) | |
else: | |
resource_queryset = resource.Meta.queryset | |
# If the queryset is empty, we cannot test the creation (because we base the verification duplicating an object) | |
if len(resource_queryset) is 0: | |
print "Skipping the creation test for the resource %s (queryset is empty)" % resource_name | |
return None | |
object_item = resource_queryset[0] | |
if user.profile.user_type_id == UserType.USER_TYPE_ENDUSER: | |
# If the user trying to test the creation is an enduser, we go inside the customer path and we force the customer | |
# of the resource to be the user's customer, so the authorization can do a proper check | |
if resource.Meta.object_class.CUSTOMER_PATH is None: | |
return False | |
else: | |
current_cursor = object_item | |
customer_path_splitted = resource.Meta.object_class.CUSTOMER_PATH.split('__') | |
customer_attribute_name = customer_path_splitted.pop() | |
for path_split in customer_path_splitted: | |
current_cursor = getattr(current_cursor, path_split) | |
setattr(current_cursor, customer_attribute_name, user.profile.customer) | |
# Building a fake create request | |
tmp_request = self.build_create_request_api(user, pos.name, resource_name) | |
self.process_request(tmp_request) | |
# Getting a single objects to check the other method in the authorization | |
object_item.pk = None | |
tmp_bundle = Bundle(request=tmp_request, obj=object_item, data={'pos_id': pos.id}) | |
resource.authorized_create_detail(resource.get_object_list(tmp_bundle.request), tmp_bundle) | |
except ImmediateHttpResponse, e: | |
# If the status code of the exception is 401 (Unauthorized) then return False, else bubble up the exception | |
if e.response.status_code == 401: | |
return False | |
else: | |
raise e | |
return True | |
def build_get_request_api(self, user, subdomain, resource_name, resource=None): | |
url = "/pos/pos/%s" % resource_name | |
if resource is not None: | |
url += '/%s/' % resource.id | |
request = self.factory.get( | |
url, | |
HTTP_HOST="%s.pizzanuvola.com" % subdomain | |
) | |
request.user = user | |
return request | |
def build_delete_request_api(self, user, subdomain, resource_name, resource=None): | |
url = "/pos/pos/%s" % resource_name | |
if resource is not None: | |
url += '/%s/' % resource.id | |
request = self.factory.delete( | |
url, | |
HTTP_HOST="%s.pizzanuvola.com" % subdomain | |
) | |
request.user = user | |
return request | |
def build_create_request_api(self, user, subdomain, resource_name, resource=None): | |
url = "/pos/pos/%s" % resource_name | |
if resource is not None: | |
url += '/%s/' % resource.id | |
request = self.factory.post( | |
url, | |
HTTP_HOST="%s.pizzanuvola.com" % subdomain | |
) | |
request.user = user | |
return request | |
def test_basic_login_as_posowner(self): | |
""" | |
Tests the basic creation of a pos owner user and login | |
""" | |
response = self.client.get( | |
'/dashboard/', | |
HTTP_HOST='pizzanuvola.com' | |
) | |
self.assertEqual(response.status_code, 302) | |
self.assertEqual('http://pizzanuvola.com/accounts/login/?next=/dashboard/', response._headers['location'][1]) | |
self.client.login(username='testposowner', password='test') | |
response = self.client.get( | |
'/', | |
HTTP_HOST='pizzanuvola.com' | |
) | |
self.assertEqual(response.status_code, 302) | |
self.assertEqual('http://pizzanuvola.com/dashboard', response._headers['location'][1]) | |
def test_basic_login_as_enduser(self): | |
""" | |
Tests the basic creation of a end user and login | |
""" | |
self.client.login(username='testenduser', password='test') | |
response = self.client.get( | |
'/dashboard/', | |
HTTP_HOST='pizzanuvola.com' | |
) | |
self.assertEqual(response.status_code, 403) | |
def test_pos_creation_permissions(self): | |
""" | |
Tests if the permission check about the creation of a pos is working | |
""" | |
def send_pos_create_ajax_request(pos_name, user): | |
pos_data = { | |
"subdomain_name": pos_name, | |
"name": pos_name | |
} | |
serialized_data = [] | |
for key, value in pos_data.iteritems(): | |
serialized_data.append("%s=%s" % (key,value)) | |
serialized_data = '&'.join(serialized_data) | |
request = self.factory.post( | |
'/', | |
data=pos_data, | |
HTTP_HOST='pizzanuvola.com' | |
) | |
request.user = user | |
request.session = {} | |
json_response = app_form_submit(request, serialized_data) | |
return json.loads(json_response) | |
# We send a fake ajax request to the ajax view that handles the creation of the pos | |
# macking it belive that we are a pos owner | |
response = send_pos_create_ajax_request("testposowner", self.user_posowner) | |
self.assertEqual(response.get('success', False), True) | |
created_pos_query = Pos.objects.filter(name='testposowner') | |
self.assertEqual(len(created_pos_query), 1) | |
# We check that we should not be able to create two pos with the same name | |
response = send_pos_create_ajax_request("testposowner", self.user_posowner) | |
self.assertEqual(response.get('success', False), False) | |
created_pos_query = Pos.objects.filter(name='testposowner') | |
self.assertEqual(len(created_pos_query), 1) | |
# if we are an end user we should not be able to create it | |
response = send_pos_create_ajax_request("testposenduser", self.user_enduser) | |
self.assertEqual(response.get('success', False), False) | |
created_pos_query = Pos.objects.filter(name='testposenduser') | |
self.assertEqual(len(created_pos_query), 0) | |
def test_tastypie_subdomainauthentication(self): | |
""" | |
Tests the tastypie subdomainauthentication class to be working properly | |
TODO: setup permissions for the posuser | |
""" | |
def check_tastypie_resource(resource_name, resource): | |
if not isinstance(resource._meta.authorization, SubdomainAuthorization): | |
print "Skipping the check for the resource %s (authorization class: %s)" % (resource_name, resource._meta.authorization) | |
return | |
print "Checking the permissions for the '%s' resource..." % resource_name | |
tmp_request = self.build_get_request_api(self.user_posowner, self.posowner1_pos_name, resource_name) | |
self.process_request(tmp_request) | |
# Getting all the objects for this resource | |
tmp_bundle = Bundle(request=tmp_request) | |
object_list = resource.obj_get_list(tmp_bundle) | |
item_check_counter = 0 | |
for item in object_list: | |
current_cursor = item | |
if item.POS_PATH is not None and item.POS_PATH != '': | |
pos_path_splitted = item.POS_PATH.split('__') | |
for path_split in pos_path_splitted: | |
current_cursor = getattr(current_cursor, path_split) | |
self.assertEqual(current_cursor, self.posowner1_pos) | |
item_check_counter += 1 | |
print "OK (%s item checked)\n" % item_check_counter | |
# We check that the posowner can't see the 'things' | |
for resource_name, resource in pos_api._registry.iteritems(): | |
check_tastypie_resource(resource_name, resource) | |
for resource_name, resource in pos_apifull._registry.iteritems(): | |
check_tastypie_resource(resource_name, resource) | |
def test_pos_owner_read_permissions(self): | |
""" | |
This test do a detailed check for the read capabilities of the owner | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
# TESTING READ CAPABILITIES ----------------------------------- | |
check_result = self.check_permission(self.user_posowner, self.posowner1_pos, self.CAN_READ_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_owner_permissions[resource_name][self.CAN_READ_ID], | |
msg="The user '%s' was %s to READ the '%s' resource of the pos '%s'" % (self.user_posowner, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
check_result = self.check_permission(self.user_posowner2, self.posowner1_pos, self.CAN_READ_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_owner2_permissions[resource_name][self.CAN_READ_ID], | |
msg="The user '%s' was %s to READ the '%s' resource of the pos '%s'" % (self.user_posowner2, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
def test_enduser_read_permissions(self): | |
""" | |
This test do a detailed check for the read capabilities of the enduser | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_enduser, self.posowner1_pos, self.CAN_READ_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_enduser_permissions[resource_name][self.CAN_READ_ID], | |
msg="The user '%s' was %s to READ the '%s' resource of the pos '%s'" % (self.user_enduser, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
def test_posowner_delete_permissions(self): | |
""" | |
This test do a detailed check for the delete capabilities of the posowner | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_posowner, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_owner_permissions[resource_name][self.CAN_DELETE_ID], | |
msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (self.user_posowner, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) | |
def test_posowner2_delete_permissions(self): | |
""" | |
This test do a detailed check for the delete capabilities of the posowner2 | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_posowner2, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_owner2_permissions[resource_name][self.CAN_DELETE_ID], | |
msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (self.user_posowner2, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) | |
def test_enduser_delete_permissions(self): | |
""" | |
This test do a detailed check for the delete capabilities of the enduser | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_enduser, self.posowner1_pos, self.CAN_DELETE_ID, resource_name, resource) | |
self.assertEqual( | |
check_result, | |
self.pos_enduser_permissions[resource_name][self.CAN_DELETE_ID], | |
msg="The user '%s' was %s to DELETE the '%s' resource of the pos '%s'" % (self.user_enduser, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) | |
def test_posowner_create_permissions(self): | |
""" | |
This test do a detailed check for the write capabilities of the posowner | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_posowner, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource) | |
if check_result is not None: | |
self.assertEqual( | |
check_result, | |
self.pos_owner_permissions[resource_name][self.CAN_WRITE_ID], | |
msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (self.user_posowner, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) | |
def test_posowner2_create_permissions(self): | |
""" | |
This test do a detailed check for the write capabilities of the posowner2 | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_owner2_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_posowner2, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource) | |
if check_result is not None: | |
self.assertEqual( | |
check_result, | |
self.pos_owner2_permissions[resource_name][self.CAN_WRITE_ID], | |
msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (self.user_posowner2, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) | |
def test_enduser_create_permissions(self): | |
""" | |
This test do a detailed check for the write capabilities of the enduser | |
""" | |
for resource_name, resource in pos_api._registry.iteritems(): | |
if resource_name in self.pos_enduser_permissions.keys(): | |
sid = transaction.savepoint() | |
def get_able_or_unable(check_result): | |
if check_result: | |
return 'able' | |
else: | |
return 'unable' | |
check_result = self.check_permission(self.user_enduser, self.posowner1_pos, self.CAN_WRITE_ID, resource_name, resource) | |
if check_result is not None: | |
self.assertEqual( | |
check_result, | |
self.pos_enduser_permissions[resource_name][self.CAN_WRITE_ID], | |
msg="The user '%s' was %s to CREATE the '%s' resource of the pos '%s'" % (self.user_enduser, get_able_or_unable(check_result), resource_name, self.posowner1_pos.name) | |
) | |
transaction.savepoint_rollback(sid) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment