-
-
Save Svenito/000eee4a4446eb9a7e8a47c8abfab07c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from rest_framework import filters | |
import json | |
from django.db.models import Q, F | |
import operator | |
from functools import reduce, partial | |
# Maps the operator names accepted in a filter's "op" field to the Django
# field-lookup suffix used to build the query keyword. "eq" and "neq" both
# translate to "exact"; every other operator shares its name with the Django
# lookup.
OP_MAP = {
    op: ("exact" if op in ("eq", "neq") else op)
    for op in (
        "eq",
        "gt",
        "lt",
        "gte",
        "lte",
        "in",
        "neq",
        "startswith",
        "istartswith",
        "endswith",
        "iendswith",
        "contains",
        "icontains",
    )
}
def create_q(filt):
    """
    Build the Django Q expression for one leaf filter.

    ``filt`` is a mapping with the keys "name" (the field), "op" (an
    operator listed in OP_MAP) and "val" (the value to compare against).
    A string value starting with "self." is turned into an F expression
    on the text after its last dot instead of being used as a literal.
    The "neq" operator yields the inverted "exact" lookup.
    """
    lookup = f'{filt["name"]}__{OP_MAP[filt["op"]]}'

    operand = filt["val"]
    if isinstance(operand, str) and operand.startswith("self."):
        # Only the segment after the final "." is used as the field name.
        operand = F(operand.split(".")[-1])

    condition = Q(**{lookup: operand})
    # "neq" shares the "exact" lookup with "eq", so it is expressed by negation.
    return ~condition if filt["op"] == "neq" else condition
def parse_filter(user_filter, negate=False):
    """
    Parse a (possibly nested) filter dictionary into a single Q object.

    A dictionary without an "and", "or" or "not" key is treated as a leaf
    filter and handed to create_q. Otherwise the first of "and", "or" and
    "not" (checked in that order) that is present decides how the
    sub-filters stored under that key are combined:

    * "and": every sub-filter must match.
    * "or": at least one sub-filter must match.
    * "not": none of the sub-filters may match.

    The value under the join key may be a list of filter dictionaries or
    a single filter dictionary.

    If ``negate`` is true the resulting expression is inverted, whether it
    came from a leaf or from a compound filter.

    Raises ValueError when a join key holds no sub-filters.
    """
    join_key = next(
        (key for key in ("and", "or", "not") if key in user_filter), None
    )

    if join_key is None:
        q_term = create_q(user_filter)
    else:
        sub_filters = user_filter[join_key]
        if isinstance(sub_filters, dict):
            # Allow a single sub-filter to be given without a wrapping list.
            sub_filters = [sub_filters]

        # Recurse into every sub-filter first, then fold the resulting
        # expressions together with the requested operator.
        parsed = [parse_filter(sub_filter) for sub_filter in sub_filters]
        if not parsed:
            raise ValueError(
                "'{}' requires at least one filter".format(join_key)
            )

        if join_key == "and":
            q_term = reduce(operator.and_, parsed)
        else:
            q_term = reduce(operator.or_, parsed)
            if join_key == "not":
                # "none of these": NOT (a OR b OR ...). Inversion is a
                # unary operation, so it cannot itself be used with reduce.
                q_term = ~q_term

    return ~q_term if negate else q_term
class DictionaryFilter(filters.BaseFilterBackend):
    """
    Filter backend driven by the `filter` and `sort` query parameters.

    `filter` holds a JSON-encoded filter dictionary that parse_filter
    turns into a Q expression, which is then applied to the queryset.
    `sort` holds one or more comma-separated field names that are passed
    to `order_by`. If either parameter is missing, that step is skipped
    and the queryset is passed through unchanged.
    """

    def filter_queryset(self, request, queryset, view):
        """
        Return `queryset` filtered and ordered as the request asks.

        Raises rest_framework.exceptions.ValidationError when `filter` is
        present but is not valid JSON or not a well-formed filter
        dictionary.
        """
        # Imported locally so this block needs no extra module-level import.
        from rest_framework.exceptions import ValidationError

        results = queryset

        raw_filter = request.query_params.get("filter")
        if raw_filter is not None:
            # Only decoding and parsing are guarded: a malformed filter must
            # be reported to the client instead of being silently ignored
            # (which would return the unfiltered queryset).
            try:
                filter_q = parse_filter(json.loads(raw_filter))
            except (KeyError, TypeError, ValueError) as err:
                raise ValidationError(
                    {"filter": "Malformed filter expression: {!r}".format(err)}
                ) from err
            results = results.filter(filter_q)

        raw_sort = request.query_params.get("sort")
        if raw_sort is not None:
            # Accept "a,-b" style lists; blank entries are dropped.
            sort_fields = [
                field.strip() for field in raw_sort.split(",") if field.strip()
            ]
            if sort_fields:
                results = results.order_by(*sort_fields)

        return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment