Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Tastypie queryset on child resource not working
"""
API resources
"""
# Django imports
from django import forms
from django.db.models import Count
from django.utils.datastructures import SortedDict
from django.contrib.contenttypes.models import ContentType
# 3rd party imports
from tastypie import fields
from tastypie.authentication import ApiKeyAuthentication
from tastypie.constants import ALL, ALL_WITH_RELATIONS
from tastypie.validation import FormValidation
# Project imports
from ..authorization.contracts import ContractsAuthorization
# Core app imports
from vbenergyzone.core.models.contracts import Contract
from vbenergyzone.core.models.documents import Document
from vbenergyzone.core.models.permissions import get_permissions
from ..paginator import BackbonePaginator
from . import ResourceBase
from .customers import CustomersResource
from .partners import PartnersResource
from .suppliers import SuppliersResource
# ==============================================================================
class ContractValidationForm(forms.ModelForm): #pylint: disable=R0924
"""
Define a form for validation.
"""
class Meta: # pylint: disable=C1001, W0232, C0111, R0903
fields = (
'name',
)
model = Contract
class ContractsResource(ResourceBase):
customer = fields.ToOneField(
attribute='customer',
blank=True,
full=True,
null=True,
to=CustomersResource,
)
number_of_documents = fields.IntegerField(attribute='number_of_documents', readonly=True)
partner = fields.ToOneField(
attribute='partner',
blank=True,
full=True,
null=True,
to=PartnersResource,
)
product_type_display = fields.CharField(readonly=True)
supplier = fields.ToOneField(
attribute='supplier',
blank=True,
full=True,
null=True,
to=SuppliersResource,
)
workflow_state_display = fields.CharField(readonly=True)
class Meta:
always_return_data = True
authentication = ApiKeyAuthentication()
authorization = ContractsAuthorization()
collection_name = 'objects'
detail_allowed_methods = ['delete', 'get', 'put']
fields = [
'id',
'end_date',
'estimated_annual_load_factor',
'estimated_annual_volume',
'name',
'number_of_documents',
'price',
'product_type',
'start_date',
'workflow_state',
]
filtering = {
'customer' : ALL,
'end_date' : ALL,
'name' : ALL,
'partner' : ALL,
'price' : ALL,
'start_date' : ALL,
'supplier' : ALL,
'product_type' : ALL,
'workflow_state': ALL,
}
include_absolute_url = True
limit = 10
list_allowed_methods = ['get', 'post']
max_limit = 100
ordering = [
'customer',
'end_date',
'estimated_annual_load_factor'
'estimated_annual_volume',
'name',
'number_of_documents',
'partner',
'price',
'product_type',
'start_date',
'supplier',
'workflow_state',
]
paginator_class = BackbonePaginator
content_type = ContentType.objects.get(app_label='core', model='contract')
number_of_documents_select = (
"select count(*) from core_document, eee_core_attachment "
"where parent_type_id = %s and parent_id = core_contract.id "
"and core_document.attachment_ptr_id = eee_core_attachment.id"
% content_type.id
)
queryset = Contract.objects.extra(
select={'number_of_documents':number_of_documents_select},
).order_by('name')
resource_name = 'contracts'
validation = FormValidation(form_class=ContractValidationForm)
def alter_list_data_to_serialize(self, request, data):
subset = 'contracts'
permissions = get_permissions(request.user, [subset])
deletable_obj_ids = (
[o.id for o in permissions[subset]['deletable_objs']]
)
updatable_obj_ids = (
[o.id for o in permissions[subset]['updatable_objs']]
)
viewable_obj_ids = (
[o.id for o in permissions[subset]['viewable_objs']]
)
for bundle in data['objects']:
obj = bundle.data
obj['deletable'] = obj['id'] in deletable_obj_ids
obj['updatable'] = obj['id'] in updatable_obj_ids
obj['viewable'] = obj['id'] in viewable_obj_ids
permissions[subset].pop('deletable_objs', None)
permissions[subset].pop('updatable_objs', None)
permissions[subset].pop('viewable_objs', None)
data['meta']['permissions'] = permissions
return data
def apply_sorting(self, obj_list, options=None):
options = options.copy()
if 'sort_by' in options:
if 'order' in options:
if options['order'] == 'desc':
options['sort_by'] = '-' + options['sort_by']
if options['sort_by'] == 'customer':
obj_list = obj_list.order_by('customer__name')
elif options['sort_by'] == '-customer':
obj_list = obj_list.order_by('-customer__name')
elif options['sort_by'] == 'partner':
obj_list = obj_list.order_by('partner__name')
elif options['sort_by'] == '-partner':
obj_list = obj_list.order_by('-partner__name')
elif options['sort_by'] == 'supplier':
obj_list = obj_list.order_by('supplier__name')
elif options['sort_by'] == '-supplier':
obj_list = obj_list.order_by('-supplier__name')
else:
obj_list = obj_list.order_by(options['sort_by'])
return obj_list
def build_filters(self, filters=None):
if filters is None:
filters = {}
orm_filters = super(ContractsResource, self).build_filters(filters)
if "q" in filters:
search_string = filters['q']
search_terms = search_string.split()
choices = Contract._meta.get_field('workflow_state').choices
workflow_state_values = [choice[0] for choice in choices]
workflow_state_displays = [choice[1] for choice in choices]
choices = Contract._meta.get_field('product_type').choices
product_type_values = [choice[0] for choice in choices]
product_type_displays = [choice[1] for choice in choices]
sqs = Contract.objects.filter(pk=-1)
for search_term in search_terms:
sqs |= Contract.objects.filter(name__icontains=search_term)
sqs |= Contract.objects.filter(customer__name__icontains=search_term)
sqs |= Contract.objects.filter(partner__name__icontains=search_term)
sqs |= Contract.objects.filter(supplier__name__icontains=search_term)
for workflow_state_display in workflow_state_displays:
if search_term.upper() in workflow_state_display.upper():
index = workflow_state_displays.index(workflow_state_display)
workflow_state_value = workflow_state_values[index]
sqs |= Contract.objects.filter(workflow_state=workflow_state_value)
for product_type_display in product_type_displays:
if search_term.upper() in product_type_display.upper():
index = product_type_displays.index(product_type_display)
product_type_value = product_type_values[index]
sqs |= Contract.objects.filter(product_type=product_type_value)
orm_filters["pk__in"] = [i.pk for i in sqs]
return orm_filters
def dehydrate_customer(self, bundle):
retval = None
contract = bundle.obj
customer = contract.customer
if customer:
retval = {
"id": customer.id,
"name": customer.name,
"resource_uri": bundle.data['customer'].data['resource_uri'],
}
return retval
def dehydrate_estimated_annual_load_factor(self, bundle):
contract = bundle.obj
return contract.get_estimated_annual_load_factor()
def dehydrate_estimated_annual_volume(self, bundle):
contract = bundle.obj
return contract.get_estimated_annual_volume()
def dehydrate_number_of_documents(self, bundle):
contract = bundle.obj
return contract.number_of_documents
def dehydrate_partner(self, bundle):
retval = None
contract = bundle.obj
partner = contract.partner
if partner:
retval = {
"id": partner.id,
"name": partner.name,
"resource_uri": bundle.data['partner'].data['resource_uri'],
}
return retval
def dehydrate_product_type_display(self, bundle):
contract = bundle.obj
return contract.get_product_type_display()
def dehydrate_supplier(self, bundle):
retval = None
contract = bundle.obj
supplier = contract.supplier
if supplier:
retval = {
"id": supplier.id,
"name": supplier.name,
"resource_uri": bundle.data['supplier'].data['resource_uri'],
}
return retval
def dehydrate_workflow_state_display(self, bundle):
contract = bundle.obj
return contract.get_workflow_state_display()
"""
API resources
"""
# Django imports
from django import forms
from django.db.models import Count
# 3rd party imports
from tastypie import fields
from tastypie.authentication import ApiKeyAuthentication
from tastypie.constants import ALL, ALL_WITH_RELATIONS
from tastypie.exceptions import Unauthorized
from tastypie.validation import FormValidation
# Project imports
from ..authorization.customers import CustomersAuthorization
# Core app imports
from vbenergyzone.core.models.customers import Customer
from vbenergyzone.core.models.permissions import get_permissions
from ..paginator import BackbonePaginator
from . import ResourceBase
# ==============================================================================
class CustomerValidationForm(forms.ModelForm): #pylint: disable=R0924
"""
Define a meter form for validation.
"""
class Meta: # pylint: disable=C1001, W0232, C0111, R0903
fields = (
'customer_type',
'name',
)
model = Customer
class CustomersResource(ResourceBase):
active_suppliers = fields.ListField(readonly=True)
last_expiring_contract = fields.DictField(readonly=True)
last_usage_end_date = fields.DateTimeField(
attribute='_last_usage_end_date',
blank=True,
null=True,
readonly=True,
)
number_of_meters = fields.IntegerField(
attribute='number_of_meters',
blank=True,
null=True,
readonly=True,
)
# TODO tags =
class Meta:
always_return_data = True
authentication = ApiKeyAuthentication()
authorization = CustomersAuthorization()
collection_name = 'objects'
detail_allowed_methods = ['delete', 'get', 'put']
fields = [
'active',
'active_suppliers',
'customer_type',
'duns_number',
'estimated_annual_volume',
'id',
'last_expiring_contract',
'last_usage_end_date',
'legal_name',
'name',
'notes',
'number_of_meters',
'taxpayer_id_number',
]
filtering = {
"name": ALL,
}
include_absolute_url = True
limit = 10
list_allowed_methods = ['get', 'post']
max_limit = 100
ordering = [
'estimated_annual_volume',
'last_usage_end_date',
'name',
'number_of_meters'
]
paginator_class = BackbonePaginator
number_of_meters_select = (
"select count(*) from core_meter "
"where core_meter.customer_id = core_customer.organization_ptr_id"
)
queryset = Customer.objects.extra(
select={'number_of_meters':number_of_meters_select},
).order_by('name')
resource_name = 'customers'
validation = FormValidation(form_class=CustomerValidationForm)
def alter_list_data_to_serialize(self, request, data):
user = request.user
user_type = user.user_profile.user_type()
subset = 'customers'
permissions = get_permissions(request.user, [subset])
deletable_obj_ids = (
[o.id for o in permissions[subset]['deletable_objs']]
)
updatable_obj_ids = (
[o.id for o in permissions[subset]['updatable_objs']]
)
viewable_obj_ids = (
[o.id for o in permissions[subset]['viewable_objs']]
)
for bundle in data['objects']:
obj = bundle.data
obj['deletable'] = obj['id'] in deletable_obj_ids
obj['updatable'] = obj['id'] in updatable_obj_ids
obj['viewable'] = obj['id'] in viewable_obj_ids
if user_type == "supplier":
obj.pop('active_suppliers', None)
obj.pop('last_expiring_contract', None)
permissions[subset].pop('deletable_objs', None)
permissions[subset].pop('updatable_objs', None)
permissions[subset].pop('viewable_objs', None)
data['meta']['permissions'] = permissions
return data
def build_filters(self, filters=None):
if filters is None:
filters = {}
orm_filters = super(CustomersResource, self).build_filters(filters)
if "q" in filters:
search_string = filters['q']
search_terms = search_string.split()
sqs = Customer.objects.filter(pk=-1)
for search_term in search_terms:
sqs |= Customer.objects.filter(name__icontains=search_term)
orm_filters["pk__in"] = [i.pk for i in sqs]
return orm_filters
def dehydrate_active_suppliers(self, bundle):
customer = bundle.obj
active_suppliers = (
[{"id": supplier.id, "name": supplier.name}
for supplier in customer.active_suppliers()]
)
return active_suppliers
def dehydrate_last_expiring_contract(self, bundle):
customer = bundle.obj
contract = customer.last_expiring_contract()
retval = None
if contract:
retval = {
"end_date" : contract.end_date,
"id" : contract.id,
"name" : contract.name,
}
return retval
def dehydrate_number_of_meters(self, bundle):
customer = bundle.obj
return customer.number_of_meters
# try:
# return customer.number_of_meters
# except AttributeError:
# return None
@ErikEvenson

This comment has been minimized.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.