Skip to content

Instantly share code, notes, and snippets.

@ricardosasilva
Last active August 29, 2015 14:09
Show Gist options
  • Save ricardosasilva/6823461110f3765c89e9 to your computer and use it in GitHub Desktop.
Save ricardosasilva/6823461110f3765c89e9 to your computer and use it in GitHub Desktop.
Custom Haystack Elasticsearch backend with function_score and percolator support
from haystack.backends.elasticsearch_backend import ElasticsearchSearchQuery, ElasticsearchSearchBackend, \
ElasticsearchSearchEngine
from haystack.query import SearchQuerySet
from haystack.constants import DEFAULT_ALIAS, DJANGO_CT
from django.conf import settings
from haystack.utils import get_model_ct
# Snagged a LOT of this from: https://github.com/josephdrose/django-haystack
# and http://www.stamkracht.com/extending-haystacks-elasticsearch-backend/
class ConfigurableElasticBackend(ElasticsearchSearchBackend):
    """Haystack Elasticsearch backend with scoring, nested and percolator extras.

    On top of the stock backend this class:

    * replaces ``DEFAULT_SETTINGS`` with ``settings.ELASTICSEARCH_INDEX_SETTINGS``
      when that setting is present and truthy;
    * wraps the generated query in a ``function_score`` query when a
      ``custom_score`` dict is supplied;
    * adds a ``nested`` query when a ``nested`` dict is supplied;
    * applies narrow queries / model restrictions as the outermost
      ``filtered`` wrapper;
    * exposes thin helpers around the ``.percolator`` document type.
    """

    # Analyzer given to analyzed string fields that do not declare their own.
    DEFAULT_ANALYZER = "snowball"

    def __init__(self, connection_alias, **connection_options):
        """Initialise the backend and apply user-supplied index settings."""
        super(ConfigurableElasticBackend, self).__init__(
            connection_alias, **connection_options)
        # Default to None: without it a project that does not define the
        # setting would crash here with AttributeError.
        user_settings = getattr(settings, 'ELASTICSEARCH_INDEX_SETTINGS', None)
        if user_settings:
            self.DEFAULT_SETTINGS = user_settings

    def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
                            fields='', highlight=False, facets=None,
                            date_facets=None, query_facets=None,
                            narrow_queries=None, spelling_query=None,
                            within=None, dwithin=None, distance_point=None,
                            models=None, limit_to_registered_models=None,
                            result_class=None, custom_score=None, nested=None):
        """Build the search body, layering the custom wrappers around it.

        The parent implementation is called with narrowing and model
        limiting disabled, because those filters have to wrap the custom
        queries added here. Wrapping order, innermost first: base query,
        ``function_score`` (``custom_score``), nested/bool (``nested``),
        ``filtered`` (narrow queries and model restriction).

        ``custom_score`` is a dict with ``'score_query_string'`` (the script)
        and optionally ``'score_query_params'``. ``nested`` is a dict with
        ``'nested_query_path'``, ``'nested_query_field'`` and
        ``'nested_query_terms'``. All other arguments are forwarded to the
        parent implementation unchanged.

        Returns the search kwargs dict produced by the parent, with
        ``out['query']`` replaced by the wrapped query.
        """
        out = super(ConfigurableElasticBackend, self).build_search_kwargs(
            query_string, sort_by, start_offset, end_offset,
            fields, highlight, facets,
            date_facets, query_facets,
            None, spelling_query,  # narrow_queries: applied below instead
            within, dwithin, distance_point,
            None, False,  # models / limit_to_registered_models: applied below
            result_class)

        if custom_score:
            script_score = {"script": custom_score['score_query_string']}
            # Script parameters live inside ``script_score``; the old
            # ``custom_score`` key no longer exists once the query has been
            # wrapped as ``function_score``.
            score_params = custom_score.get('score_query_params')
            if score_params:
                script_score['params'] = score_params
            out['query'] = {
                "function_score": {
                    "script_score": script_score,
                    "query": out['query'],
                }
            }

        if nested:
            if 'match_all' not in out['query']:
                # There is a real query: combine it with the nested one.
                out['query'] = self.bool_query_factory(out['query'], nested)
            else:
                out['query'] = self.nested_query_factory(nested)

        # Outer wrapping of filters: narrow queries and model restriction.
        if limit_to_registered_models is None:
            limit_to_registered_models = getattr(
                settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

        if models:
            # get_model_ct copes with both ``model_name`` and the older
            # ``module_name`` attribute of ``_meta``.
            model_choices = sorted(get_model_ct(model) for model in models)
        elif limit_to_registered_models:
            # Limit the results to the models handled by the current routers.
            model_choices = self.build_models_list()
        else:
            model_choices = []

        # Work on a copy so the caller's set is not mutated.
        narrow_queries = set(narrow_queries) if narrow_queries else set()
        if model_choices:
            narrow_queries.add('%s:(%s)' % (DJANGO_CT, ' OR '.join(model_choices)))

        if narrow_queries:
            out['query'] = {
                'filtered': {
                    'query': out['query'],
                    'filter': {
                        'fquery': {
                            'query': {
                                'query_string': {
                                    'query': u' AND '.join(narrow_queries),
                                },
                            },
                            '_cache': True,
                        }
                    }
                }
            }
        return out

    def nested_query_factory(self, nested):
        """Return a ``nested`` query scoring documents by ``<path>.points``.

        ``nested`` must contain ``'nested_query_path'``,
        ``'nested_query_field'`` and ``'nested_query_terms'``. Matching
        nested documents are scored by their ``points`` value (0 when the
        field is empty) and the scores are summed (``score_mode: total``).
        """
        path = nested['nested_query_path']
        score_script = "(doc['%s.points'].empty ? 0 : doc['%s.points'].value)" % (path, path)
        return {
            "nested": {
                "path": path,
                "score_mode": "total",
                "query": {
                    "function_score": {
                        "query": {
                            "terms": {
                                nested['nested_query_field']: nested['nested_query_terms'],
                                "minimum_match": 1
                            }
                        },
                        "script_score": {
                            "script": score_script,
                            "lang": "mvel"
                        },
                        # Use the script result as the score instead of
                        # combining it with the terms-query score.
                        "boost_mode": "replace"
                    }
                }
            }
        }

    def bool_query_factory(self, original_query, nested):
        """Return a ``bool`` query matching the nested query or ``original_query``.

        Both clauses are ``should`` clauses with ``minimum_should_match: 1``,
        so a document matching either one is returned.
        """
        return {
            "bool": {
                "should": [
                    self.nested_query_factory(nested),
                    original_query
                ],
                "minimum_should_match": 1
            }
        }

    def build_schema(self, fields):
        """Return ``(content_field_name, mapping)`` for the given index fields.

        Starts from the parent's mapping and then:

        * maps fields whose ``field_type`` is ``'nested'`` to the ``nested``
          type, copying ``field_class.properties`` when the field defines it;
        * gives every analyzed, non-facet, non-ngram string field the
          analyzer named by its ``analyzer`` attribute, falling back to
          ``DEFAULT_ANALYZER``.
        """
        content_field_name, mapping = super(
            ConfigurableElasticBackend, self).build_schema(fields)

        for field_class in fields.values():
            # The mapping dicts are adjusted in place.
            field_mapping = mapping[field_class.index_fieldname]

            if field_class.field_type == 'nested':
                field_mapping['type'] = 'nested'
                # Drop options that only make sense for string fields.
                field_mapping.pop('analyzer', None)
                field_mapping.pop('term_vector', None)
                if hasattr(field_class, 'properties'):
                    field_mapping['properties'] = field_class.properties
                continue

            if field_mapping['type'] != 'string' or not field_class.indexed:
                continue
            if hasattr(field_class, 'facet_for'):
                continue
            if field_class.field_type in ('ngram', 'edge_ngram'):
                continue
            field_mapping['analyzer'] = getattr(
                field_class, 'analyzer', self.DEFAULT_ANALYZER)

        return (content_field_name, mapping)

    def put_percolator(self, percolator_id, query):
        """Store ``query`` as a percolator document under ``percolator_id``.

        ``query`` must be a dictionary like
        ``{'query': {'match': {'title': 'text to create percolator'}}}``.
        """
        self.conn.index(self.index_name, '.percolator', query, id=percolator_id)

    def get_percolator(self, percolator_id):
        """Return the stored percolator document with the given id."""
        return self.conn.get(index=self.index_name, doc_type='.percolator', id=percolator_id)

    def delete_percolator(self, percolator_id):
        """Delete the saved search stored under ``percolator_id``."""
        self.conn.delete(self.index_name, '.percolator', id=percolator_id)

    def percolate(self, model_instance):
        """Return the percolator matches for an already indexed model instance.

        The document is addressed by the id ``"<app_label>.<model>.<pk>"``
        within the ``modelresult`` document type.
        """
        model_class = model_instance._meta.concrete_model
        document_id = '{model}.{instance_id}'.format(
            model=get_model_ct(model_class), instance_id=model_instance.pk)
        response = self.conn.percolate(
            self.index_name, doc_type='modelresult', id=document_id,
            body='{"track_scores": true}')
        return response['matches']
class ConfigurableSearchQuerySet(SearchQuerySet):
    """SearchQuerySet exposing the custom-score and nested query options."""

    def custom_score(self, score_query_string=None, params=None):
        """Return a clone whose query carries the given scoring script.

        ``score_query_string`` and ``params`` are handed to the query's
        ``add_custom_score``; the original queryset is left untouched.
        """
        scored = self._clone()
        scored.query.add_custom_score(score_query_string, params)
        return scored

    def nested(self, terms=None, path="tags", field="tag"):
        """Return a clone whose query carries the given nested-query options.

        ``terms``, ``path`` and ``field`` are handed to the query's
        ``add_nested``; the original queryset is left untouched.
        """
        with_nested = self._clone()
        with_nested.query.add_nested(terms, path, field)
        return with_nested
class ConfigurableElasticsearchSearchQuery(ElasticsearchSearchQuery):
    """Search query that carries ``custom_score`` and ``nested`` options.

    Both options are plain dicts; an empty dict means "not requested" and
    the option is then left out of the parameters sent to the backend.
    """

    def __init__(self, using=DEFAULT_ALIAS):
        """Initialise the query with no custom score and no nested options."""
        super(ConfigurableElasticsearchSearchQuery, self).__init__(using)
        self.custom_score = {}
        self.nested = {}

    def add_custom_score(self, score_query_string=None, params=None):
        """Record the scoring script and its parameters for this query."""
        self.custom_score = {
            'score_query_string': score_query_string,
            'score_query_params': params,
        }

    def add_nested(self, terms=None, path=None, field=None):
        """Record the terms, path and field of the nested query."""
        self.nested = {
            'nested_query_terms': terms,
            'nested_query_path': path,
            'nested_query_field': field,
        }

    def build_params(self, spelling_query=None, **kwargs):
        """Return the parent's search kwargs plus any custom options set.

        ``custom_score`` and ``nested`` are only added when they are
        non-empty.
        """
        search_kwargs = super(ConfigurableElasticsearchSearchQuery, self).build_params(
            spelling_query, **kwargs)
        if self.custom_score:
            search_kwargs['custom_score'] = self.custom_score
        if self.nested:
            search_kwargs['nested'] = self.nested
        return search_kwargs

    def _clone(self, klass=None, using=None):
        """Return a clone that also carries the custom options.

        The option dicts are shallow-copied so the clone and the original
        do not share the same dict objects.
        """
        clone = super(ConfigurableElasticsearchSearchQuery, self)._clone(klass, using)
        clone.custom_score = dict(self.custom_score)
        clone.nested = dict(self.nested)
        return clone
class ConfigurableElasticSearchEngine(ElasticsearchSearchEngine):
    """Search engine binding the configurable backend and query classes."""

    # Backend class used by this engine.
    backend = ConfigurableElasticBackend
    # Query class used by this engine.
    query = ConfigurableElasticsearchSearchQuery
@aolieman
Copy link

Hi Ricardo,

Great to see that my blog post was helpful for you to make your own Elasticsearch backend!

The "custom_score" query was still from the Elasticsearch 0.90.x era, and is now deprecated. I could have mentioned it in the blog post, because I already used "function_score" for the nested queries ;-).

It's also cool that you were able to integrate this with Joseph Drose's percolator support. Thanks for sharing!

Cheers,
Alex

P.S. the blog post is also available as a gist. Feel free to fork and add to it!

@aolieman
Copy link

You may also want to consider using the Groovy plugin for function_score scripts, instead of MVEL. Since Elasticsearch 1.3 the Groovy scripts can be sandboxed, and thereby eliminate the main security issue with dynamic scripts. This is not possible with any of the other supported scripting languages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment