Skip to content

Instantly share code, notes, and snippets.

@rj76
Last active November 20, 2021 16:04
Show Gist options
  • Save rj76/cf148c8540c25221366b944705c5f1b0 to your computer and use it in GitHub Desktop.
Save rj76/cf148c8540c25221366b944705c5f1b0 to your computer and use it in GitHub Desktop.
Some Django manager methods for the Order model
class OrderManager(Manager):
    """Manager with count annotations, statistics and geo queries for orders."""

    def add_available_count(self, qs):
        """Annotate ``qs`` with ``available_count`` via a correlated subquery.

        The subquery counts related ``user_order_availability`` rows whose
        ``is_accepted`` is False, correlated to the outer row through
        ``OuterRef('id')``.

        :param qs: queryset to annotate; it is also used as the base of the
            subquery.
        :return: ``qs`` with an integer ``available_count`` annotation.
        """
        # NOTE(review): the filter below is applied after the annotate and
        # targets the same multi-valued relation; confirm on the generated SQL
        # that this does not add a second join that inflates the count.
        available_count = qs.annotate(
            available_count=Count(
                'user_order_availability',
                filter=Q(user_order_availability__is_accepted=False),
            )
        ).filter(user_order_availability__order_id=OuterRef('id'))

        return qs.annotate(
            available_count=Subquery(
                available_count.values('available_count'),
                output_field=IntegerField(),
            ),
        )

    def add_assigned_count(self, qs):
        """Annotate ``qs`` with ``assigned_count`` via a correlated subquery.

        The subquery counts related ``assigned_orders`` rows, correlated to
        the outer row through ``OuterRef('id')``.

        :param qs: queryset to annotate; it is also used as the base of the
            subquery.
        :return: ``qs`` with an integer ``assigned_count`` annotation.
        """
        # NOTE(review): same filter-after-annotate pattern as in
        # add_available_count; verify the resulting count on real data.
        assigned_count = qs.annotate(
            assigned_count=Count('assigned_orders')
        ).filter(assigned_orders__order_id=OuterRef('id'))

        return qs.annotate(
            assigned_count=Subquery(
                assigned_count.values('assigned_count'),
                output_field=IntegerField(),
            ),
        )

    def get_order_type_counts(self, order_types, year):
        """Return totals and percentages per order type for ``year``.

        Order types are matched case-insensitively against ``order_type``.
        Each entry of ``order_types`` is normalised (spaces replaced by
        underscores, lower-cased) to build the result keys.

        :param order_types: iterable of order type strings to count.
        :param year: year matched against ``start_date__year``.
        :return: dict with ``sum_total``, one ``sum_<type>`` per order type
            and one ``sum_<type>_perc`` (percentage of ``sum_total``, rounded
            to two decimals) per order type. When nothing matches ``year``
            all sums are 0 and all percentages are 0.0.
        """
        qs = super().get_queryset().filter(start_date__year=year)
        qs = qs.annotate(order_type_lower=Lower('order_type'))
        # Drop any existing ordering before grouping on the lowered type.
        qs = qs.order_by().values('order_type_lower')

        order_types_norm = []
        for order_type in order_types:
            order_type_norm = order_type.replace(' ', '_').lower()
            order_types_norm.append(order_type_norm)
            # Conditional sum: 1 for rows of this order type, 0 otherwise.
            qs = qs.annotate(**{
                order_type_norm: Sum(
                    Case(
                        When(order_type_lower=order_type.lower(), then=Value(1)),
                        default=Value(0),
                        output_field=IntegerField(),
                    ),
                ),
            })

        qs = (
            qs.annotate(Total=Count("order_type"))
            .order_by("-Total", *order_types_norm)
            .values('Total', *order_types_norm)
        )

        aggregate_annotations = {'sum_total': Sum('Total')}
        for order_type_norm in order_types_norm:
            aggregate_annotations['sum_{0}'.format(order_type_norm)] = Sum(order_type_norm)

        totals = qs.aggregate(**aggregate_annotations)
        # With no matching rows every aggregate comes back as None; normalise
        # to 0 so callers always receive numbers.
        totals = {
            key: (value if value is not None else 0)
            for key, value in totals.items()
        }

        # Guard against an empty total: previously this raised TypeError
        # (None / None) or ZeroDivisionError instead of returning a result.
        grand_total = totals['sum_total']
        percentages = {
            '{0}_perc'.format(key): (
                round((value / grand_total) * 100, 2) if grand_total else 0.0
            )
            for key, value in totals.items()
            if key != 'sum_total'
        }
        totals.update(percentages)
        return totals

    def get_count_per_month(self, year):
        """Return the number of orders per month for ``year``.

        :param year: year matched against ``start_date__year``.
        :return: values queryset of dicts with ``month`` (``start_date``
            truncated to the month) and ``c`` (count of ``id``).
        """
        qs = super().get_queryset().filter(start_date__year=year)
        return (
            qs.annotate(month=TruncMonth('start_date'))
            .values('month')
            .annotate(c=Count('id'))
            .values('month', 'c')
        )

    def get_within_range(self, from_point: Point, radius: int):
        """Return orders located less than ``radius`` km from ``from_point``.

        :param from_point: reference point compared with ``geo_location``.
        :param radius: maximum distance in kilometres (exclusive, ``distance_lt``).
        :return: filtered queryset built on ``self.get_base_qs()``.
        """
        # NOTE(review): get_base_qs() is not defined in this excerpt; it is
        # presumably provided elsewhere on this manager - confirm.
        return self.get_base_qs().filter(
            geo_location__distance_lt=(from_point, MeasureDistance(km=radius))
        )

    def get_by_distance(self, point: Point):
        """Return orders annotated with and ordered by distance to ``point``.

        :param point: reference point the distance of ``geo_location`` is
            measured to.
        :return: queryset built on ``self.get_base_qs()`` with a ``distance``
            annotation, ordered ascending by it.
        """
        # NOTE(review): get_base_qs() is not defined in this excerpt; it is
        # presumably provided elsewhere on this manager - confirm.
        qs = self.get_base_qs().annotate(distance=Distance('geo_location', point))
        return qs.order_by('distance')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment