Haystack provides an interface similar to Django's QuerySet
, which instead enables easy querying in one or more popular search backends. Because the Haystack SearchQuerySet
API is meant to hook up to several search backends, however, not all the functionality of the backends has been implemented in the API. In this article we show how Haystack's Elasticsearch backend can be extended with advanced querying functionality.
As an exemplary use case, we'll focus on implementing Elasticsearch's Nested Query in the SearchQuerySet
API, to enable e.g. weighted tags on documents. The usage of this extended API will be shown first, after which we'll go through the necessary implementation steps.
import search.custom_elasticsearch as ces
from files import FileObject
# Instantiate a ConfigurableSearchQuerySet
sqs = ces.ConfigurableSearchQuerySet()
# A nested query for one tag
sqs.nested(["milk"])
#[<SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: wiki.wikiarticle (pk=u'5')>]
# Which we can combine with an ordinary keyword query
# Both queries, in this case, are wrapped in a bool "should" query
sqs.nested(["milk"]).filter(text="yoghurt")
#[<SearchResult: wiki.wikiarticle (pk=u'5')>, <SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'5')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'13')>]
# We can use a custom score for a query...
sqs.filter(text="isolation").custom_score("_score * doc['hotness']")
#[<SearchResult: files.fileobject (pk=u'8')>, <SearchResult: wiki.wikiarticle (pk=u'1')>, <SearchResult: wiki.wikiarticle (pk=u'3')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'13')>]
# ...and still combine it with a nested (tag) query
sqs.filter(text='fermented').custom_score('_score').nested(['heat'])
#[<SearchResult: files.fileobject (pk=u'3')>, <SearchResult: wiki.wikiarticle (pk=u'15')>, <SearchResult: files.fileobject (pk=u'8')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'13')>, <SearchResult: files.fileobject (pk=u'5')>]
# A filtered query can be added as outermost wrapping by using .models() and/or .narrow()
sqs.nested(["heat"]).custom_score('_score').filter(text='dairy').models(FileObject)
#[<SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'8')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'13')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'6')>]
The basic approach for extending the API is to define some custom classes, which inherit from SearchQuerySet
, SearchQuery
, and SearchBackend
and which override several existing methods and add a few new ones. Credits are due to Nathan Keilar, who was kind enough to share his initial solution to this problem.
It would be advisable to place the custom classes we will create in a new file, near your other search-related application code. In this article we will assume the location my_app.search.custom_elasticsearch
.
The first thing to ensure is that our existing indexes can correctly update with nested fields. A TagsField
needs to be defined, which will be picked up by ConfigurableElasticBackend.build_schema()
so Elasticsearch can update its mapping and index nested fields properly. Unfortunately, we need to override build_schema()
inelegantly to add a few lines at the right position. Finally, ConfigurableElasticsearchSearchQuery
and ConfigurableElasticSearchEngine
need to be defined, in order to change Haystack's backend in the configuration.
from haystack.fields import SearchField
from haystack.backends.elasticsearch_backend import ElasticsearchSearchBackend, ElasticsearchSearchQuery
from haystack.backends.elasticsearch_backend import ElasticsearchSearchEngine
class TagsField(SearchField):
field_type = "nested"
properties = {
"tag": {
"type": "string",
"index": "not_analyzed",
"omit_norms": True,
"index_options": "docs"
},
"points": {
"type": "float"
}
}
class ConfigurableElasticBackend(ElasticsearchSearchBackend):
def build_schema(self, fields):
content_field_name = ''
mapping = {}
for field_name, field_class in fields.items():
field_mapping = {
'boost': field_class.boost,
'index': 'analyzed',
'store': 'yes',
'type': 'string',
}
if field_class.document is True:
content_field_name = field_class.index_fieldname
if field_class.field_type in ['date', 'datetime']:
field_mapping['type'] = 'date'
elif field_class.field_type == 'integer':
field_mapping['type'] = 'long'
elif field_class.field_type == 'float':
field_mapping['type'] = 'float'
elif field_class.field_type == 'boolean':
field_mapping['type'] = 'boolean'
elif field_class.field_type == 'nested':
field_mapping['type'] = 'nested'
try:
field_mapping['properties'] = field_class.properties
except AttributeError:
pass
elif field_class.field_type == 'ngram':
field_mapping['analyzer'] = "ngram_analyzer"
elif field_class.field_type == 'edge_ngram':
field_mapping['analyzer'] = "edgengram_analyzer"
elif field_class.field_type == 'location':
field_mapping['type'] = 'geo_point'
if field_class.stored is False:
field_mapping['store'] = 'no'
# Do this last to override `text` fields.
if field_class.indexed is False or hasattr(field_class, 'facet_for'):
field_mapping['index'] = 'not_analyzed'
if field_mapping['type'] == 'string' and field_class.indexed:
field_mapping["term_vector"] = "with_positions_offsets"
if not hasattr(field_class, 'facet_for') and not field_class.field_type in('ngram', 'edge_ngram'):
field_mapping["analyzer"] = "snowball"
mapping[field_class.index_fieldname] = field_mapping
return (content_field_name, mapping)
class ConfigurableElasticsearchSearchQuery(ElasticsearchSearchQuery):
pass
class ConfigurableElasticSearchEngine(ElasticsearchSearchEngine):
backend = ConfigurableElasticBackend
query = ConfigurableElasticsearchSearchQuery
Once the custom classes are in place, it can be used by pointing HAYSTACK_CONNECTIONS
to it:
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'search.custom_elasticsearch.ConfigurableElasticSearchEngine',
'URL': 'http://127.0.0.1:9200',
'INDEX_NAME': 'haystack',
}
}
Because we have only extended the indexing functionality at this point, all querying should work as normally (and will still use the default SearchQuerySet
). But by using TagsField
in your indexes and adjusting your prepare methods to output lists of dictionaries, these fields will now fully function as nested fields within Elasticsearch.
Let's illustrate this with a short example:
class TaggableIndex(indexes.SearchIndex, indexes.Indexable):
tags = search.custom_elasticsearch.TagsField(indexed=False, stored=True, boost=1.5)
def prepare_tags(self, obj):
return [{'tag': tag.name, 'points': tag.point_total} for tag in obj.get_tags()]
You might now say: Elasticsearch is "you know, for search", not just "for indexing", and you would be entirely correct. Extending the Elasticsearch backend will require us to subclass SearchQuerySet
and to add some more methods to the subclasses we already defined.
from haystack.query import SearchQuerySet
from haystack.constants import DEFAULT_ALIAS, DJANGO_CT
class ConfigurableSearchQuerySet(SearchQuerySet):
def custom_score(self, score_query_string=None, params=None):
"""Adds arguments for custom_score to the query"""
clone = self._clone()
clone.query.add_custom_score(score_query_string, params)
return clone
def nested(self, terms=None, path="tags", field="tag"):
"""Adds arguments for nested to the query"""
clone = self._clone()
clone.query.add_nested(terms, path, field)
return clone
class ConfigurableElasticsearchSearchQuery(ElasticsearchSearchQuery):
def __init__(self, using=DEFAULT_ALIAS):
out = super(ConfigurableElasticsearchSearchQuery, self).__init__(using)
self.custom_score = {}
self.nested = {}
def add_custom_score(self, score_query_string=None, params=None):
"""Adds arguments for custom_score to the query"""
self.custom_score = {
'score_query_string': score_query_string,
'score_query_params': params,
}
def add_nested(self, terms=None, path=None, field=None):
"""Adds arguments for nested to the query"""
self.nested = {
'nested_query_terms': terms,
'nested_query_path': path,
'nested_query_field': field
}
def build_params(self, spelling_query=None, **kwargs):
"""
Add custom_score and/or nested parameters
"""
search_kwargs = super(ConfigurableElasticsearchSearchQuery, self).build_params(spelling_query, **kwargs)
if self.custom_score:
search_kwargs['custom_score'] = self.custom_score
if self.nested:
search_kwargs['nested'] = self.nested
return search_kwargs
def _clone(self, klass=None, using=None):
clone = super(ConfigurableElasticsearchSearchQuery, self)._clone(klass, using)
clone.custom_score = self.custom_score
clone.nested = self.nested
return clone
Basically, these extensions allow for the additional methods custom_score(score_query_string=None, params=None)
and nested(terms=None, path="tags", field="tag")
to be used from a ConfigurableSearchQuerySet
instance. Their arguments are, via ConfigurableElasticsearchSearchQuery
passed to ConfigurableElasticBackend
which can finally build a dictionary of the query. The code that adds custom_score()
is Nathan Keilar's and nested()
follows its example.
class ConfigurableElasticBackend(ElasticsearchSearchBackend):
def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
fields='', highlight=False, facets=None,
date_facets=None, query_facets=None,
narrow_queries=None, spelling_query=None,
within=None, dwithin=None, distance_point=None,
models=None, limit_to_registered_models=None,
result_class=None,custom_score=None,nested=None):
out = super(ConfigurableElasticBackend, self).build_search_kwargs(query_string, sort_by, start_offset, end_offset,
fields, highlight, facets,
date_facets, query_facets,
narrow_queries, spelling_query,
within, dwithin, distance_point,
models, limit_to_registered_models,
result_class)
if nested:
out['query'] = self.nested_query_factory(nested)
elif custom_score:
out['query'] = { "custom_score": {
"script": custom_score['score_query_string'],
"query": out['query']
}
}
if custom_score['score_query_params']:
out['query']['custom_score']['params'] = custom_score['score_query_params']
return out
def nested_query_factory(self, nested):
score_script = "(doc['%s.points'].empty ? 0 : doc['%s.points'].value)" % \
(nested['nested_query_path'],nested['nested_query_path'])
query = {"nested": {
"path": nested['nested_query_path'],
"score_mode": "total",
"query": {
"function_score": {
"query": {
"terms": {
nested['nested_query_field']: nested['nested_query_terms'],
"minimum_match" : 1
}
},
"script_score": {
"script" : score_script,
"lang": "mvel"
},
"boost_mode": "replace"
}
}
}
}
return query
def build_schema(self, fields):
# as defined above
....
return (content_field_name, mapping)
As it is defined now, a sqs.nested()
query replaces any original queries that may be called on the search query set. The nested query is set up by nested_query_factory()
in such a way that additional queries are not easily inserted in this structure. A sqs.custom_score()
query, in contrast, wraps around any other queries on sqs
, incorporating them into the final query that is requested from Elasticsearch.
The query that is built by nested_query_factory()
does include a function score query (the successor of custom_score), to offer fine-grained control over how a score for the parent document is computed from its nested children. We have hardcoded a score script to demonstrate the weighted tag use case, but implementing it as an attribute to ConfigurableSearchQuerySet.nested()
is a small step to take.
Being able to do a nested query that replaces any original query is not sufficient. We want to combine queries conveniently, in a way that makes sense for our application.
This creates the need to decide on an order in which the queries are wrapped together. There's no need to customize all the ordering though; we're mostly interested in the wrapping of the outer levels of the query. To do this, let's override several keyword arguments on super(ConfigurableElasticBackend, self)
, to move them to the outer wrapping and to customize their order.
Once more, the final ConfigurableElasticBackend
(don't forget the additional import!):
from django.conf import settings
class ConfigurableElasticBackend(ElasticsearchSearchBackend):
def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
fields='', highlight=False, facets=None,
date_facets=None, query_facets=None,
narrow_queries=None, spelling_query=None,
within=None, dwithin=None, distance_point=None,
models=None, limit_to_registered_models=None,
result_class=None,custom_score=None,nested=None):
out = super(ConfigurableElasticBackend, self).build_search_kwargs(query_string, sort_by, start_offset, end_offset,
fields, highlight, facets,
date_facets, query_facets,
None, spelling_query, #narrow_queries==None
within, dwithin, distance_point,
None, False, #models==None, limit_..._models==False
result_class)
# Wrapping order TOP: inner query -> DOWN: outer queries
if custom_score:
out['query'] = { "custom_score": {
"script": custom_score['score_query_string'],
"query": out['query']
}
}
if custom_score['score_query_params']:
out['query']['custom_score']['params'] = custom_score['score_query_params']
if nested:
# check if there is an original query
if 'match_all' not in out['query']:
out['query'] = self.bool_query_factory(out['query'], nested)
else:
out['query'] = self.nested_query_factory(nested)
## START outer wrapping of filter(s): narrow(access/models/etc.)
if limit_to_registered_models is None:
limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)
if models and len(models):
model_choices = sorted(['%s.%s' % (model._meta.app_label, model._meta.module_name) for model in models])
elif limit_to_registered_models:
# Using narrow queries, limit the results to only models handled
# with the current routers.
model_choices = self.build_models_list()
else:
model_choices = []
if len(model_choices) > 0:
if narrow_queries is None:
narrow_queries = set()
narrow_queries.add('%s:(%s)' % (DJANGO_CT, ' OR '.join(model_choices)))
if narrow_queries:
out['query'] = {
'filtered': {
'query': out['query'],
'filter': {
'fquery': {
'query': {
'query_string': {
'query': u' AND '.join(list(narrow_queries)),
},
},
'_cache': True,
}
}
}
}
## END outer wrapping of model filter(s)
return out
def nested_query_factory(self, nested):
score_script = "(doc['%s.points'].empty ? 0 : doc['%s.points'].value)" % \
(nested['nested_query_path'],nested['nested_query_path'])
query = {"nested": {
"path": nested['nested_query_path'],
"score_mode": "total",
"query": {
"function_score": {
"query": {
"terms": {
nested['nested_query_field']: nested['nested_query_terms'],
"minimum_match" : 1
}
},
"script_score": {
"script" : score_script,
"lang": "mvel"
},
"boost_mode": "replace"
}
}
}
}
return query
def bool_query_factory(self, original_query, nested):
query = {"bool": {
"should": [
self.nested_query_factory(nested),
original_query
],
"minimum_should_match": 1
}
}
return query
def build_schema(self, fields):
content_field_name = ''
mapping = {}
for field_name, field_class in fields.items():
field_mapping = {
'boost': field_class.boost,
'index': 'analyzed',
'store': 'yes',
'type': 'string',
}
if field_class.document is True:
content_field_name = field_class.index_fieldname
if field_class.field_type in ['date', 'datetime']:
field_mapping['type'] = 'date'
elif field_class.field_type == 'integer':
field_mapping['type'] = 'long'
elif field_class.field_type == 'float':
field_mapping['type'] = 'float'
elif field_class.field_type == 'boolean':
field_mapping['type'] = 'boolean'
elif field_class.field_type == 'nested':
field_mapping['type'] = 'nested'
try:
field_mapping['properties'] = field_class.properties
except AttributeError:
pass
elif field_class.field_type == 'ngram':
field_mapping['analyzer'] = "ngram_analyzer"
elif field_class.field_type == 'edge_ngram':
field_mapping['analyzer'] = "edgengram_analyzer"
elif field_class.field_type == 'location':
field_mapping['type'] = 'geo_point'
if field_class.stored is False:
field_mapping['store'] = 'no'
# Do this last to override `text` fields.
if field_class.indexed is False or hasattr(field_class, 'facet_for'):
field_mapping['index'] = 'not_analyzed'
if field_mapping['type'] == 'string' and field_class.indexed:
field_mapping["term_vector"] = "with_positions_offsets"
if not hasattr(field_class, 'facet_for') and not field_class.field_type in('ngram', 'edge_ngram'):
field_mapping["analyzer"] = "snowball"
mapping[field_class.index_fieldname] = field_mapping
return (content_field_name, mapping)
Nice and helpful notes. Thank you. Why didn't you turn it into a reusable package or made an effort to made appropriate commits to the elasticstack?