Last active
November 20, 2021 16:04
-
-
Save rj76/cf148c8540c25221366b944705c5f1b0 to your computer and use it in GitHub Desktop.
Some Django manager methods for the Order model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderManager(Manager):
    """Manager offering count annotations, yearly statistics and geo queries."""

    def add_available_count(self, qs):
        """Annotate ``qs`` with ``available_count``.

        The value counts the related ``user_order_availability`` rows whose
        ``is_accepted`` flag is ``False``. It is computed in a correlated
        subquery built from ``qs`` itself.

        Args:
            qs: queryset to annotate.

        Returns:
            The queryset with an integer ``available_count`` annotation.
        """
        # NOTE(review): rows without any availability record presumably get
        # None (empty subquery) rather than 0 -- confirm against callers.
        available_count = qs.annotate(
            available_count=Count(
                'user_order_availability',
                filter=Q(user_order_availability__is_accepted=False),
            )
        ).filter(user_order_availability__order_id=OuterRef('id'))

        return qs.annotate(
            available_count=Subquery(
                available_count.values('available_count'),
                output_field=IntegerField(),
            ),
        )

    def add_assigned_count(self, qs):
        """Annotate ``qs`` with ``assigned_count``.

        The value counts the related ``assigned_orders`` rows and is computed
        in a correlated subquery built from ``qs`` itself.

        Args:
            qs: queryset to annotate.

        Returns:
            The queryset with an integer ``assigned_count`` annotation.
        """
        assigned_count = qs.annotate(
            assigned_count=Count('assigned_orders'),
        ).filter(assigned_orders__order_id=OuterRef('id'))

        return qs.annotate(
            assigned_count=Subquery(
                assigned_count.values('assigned_count'),
                output_field=IntegerField(),
            ),
        )

    def get_order_type_counts(self, order_types, year):
        """Return totals and percentages per order type for one year.

        Orders are selected by ``start_date__year`` and grouped by the
        lowercased ``order_type``.

        Args:
            order_types: iterable of order type labels. Each label is matched
                against the lowercased ``order_type`` and reported under a
                key derived by lowercasing it and replacing spaces with
                underscores.
            year: year that ``start_date`` must fall in.

        Returns:
            A dict with ``sum_total``, one ``sum_<key>`` per order type and a
            matching ``sum_<key>_perc`` percentage rounded to two decimals.
            When no orders match, all sums and percentages are 0.
        """
        qs = (
            super().get_queryset()
            .filter(start_date__year=year)
            .annotate(order_type_lower=Lower('order_type'))
            # Drop any ordering before grouping on the lowercased type.
            .order_by()
            .values('order_type_lower')
        )

        order_type_keys = []
        for order_type in order_types:
            key = order_type.replace(' ', '_').lower()
            order_type_keys.append(key)
            # Per group: 1 for every row of this order type, 0 otherwise.
            qs = qs.annotate(**{
                key: Sum(
                    Case(
                        When(order_type_lower=order_type.lower(), then=Value(1)),
                        default=Value(0),
                        output_field=IntegerField(),
                    ),
                ),
            })

        qs = (
            qs.annotate(Total=Count("order_type"))
            .order_by("-Total", *order_type_keys)
            .values('Total', *order_type_keys)
        )

        aggregates = {'sum_total': Sum('Total')}
        for key in order_type_keys:
            aggregates['sum_{0}'.format(key)] = Sum(key)

        return self._with_percentages(qs.aggregate(**aggregates))

    @staticmethod
    def _with_percentages(totals):
        """Return ``totals`` extended with a ``<key>_perc`` entry per sum.

        ``sum_total`` is the denominator and gets no percentage of its own.
        """
        # Aggregating over zero rows yields None for every Sum; use 0 so the
        # result always holds numbers and the division below cannot fail.
        totals = {key: value or 0 for key, value in totals.items()}
        grand_total = totals['sum_total']

        percentages = {
            '{0}_perc'.format(key): (
                round((value / grand_total) * 100, 2) if grand_total else 0.0
            )
            for key, value in totals.items()
            if key != 'sum_total'
        }
        totals.update(percentages)
        return totals

    def get_count_per_month(self, year):
        """Return the number of orders per month for ``year``.

        Args:
            year: year that ``start_date`` must fall in.

        Returns:
            A values queryset of dicts with ``month`` (``start_date``
            truncated to the month) and ``c`` (order count), ordered by month.
        """
        qs = super().get_queryset().filter(start_date__year=year)
        return (
            qs.annotate(month=TruncMonth('start_date'))
            .values('month')
            .annotate(c=Count('id'))
            .values('month', 'c')
            # Explicit ordering keeps the rows chronological and prevents a
            # default model ordering from interfering with the grouping.
            .order_by('month')
        )

    def get_within_range(self, from_point: Point, radius: int):
        """Return orders located less than ``radius`` km from ``from_point``.

        Args:
            from_point: reference point.
            radius: maximum distance in kilometres (exclusive).

        Returns:
            The base queryset filtered on ``geo_location``.
        """
        # get_base_qs() is not defined in this excerpt; it is expected to be
        # provided elsewhere on the manager.
        return self.get_base_qs().filter(
            geo_location__distance_lt=(from_point, MeasureDistance(km=radius)),
        )

    def get_by_distance(self, point: Point):
        """Return orders annotated with ``distance`` to ``point``, nearest first.

        Args:
            point: reference point the distance is measured to.

        Returns:
            The base queryset annotated with ``distance`` and ordered by it.
        """
        # get_base_qs() is not defined in this excerpt; it is expected to be
        # provided elsewhere on the manager.
        qs = self.get_base_qs().annotate(distance=Distance('geo_location', point))
        return qs.order_by('distance')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment