Skip to content

Instantly share code, notes, and snippets.

@glenbot
Created December 15, 2011 03:31
Show Gist options
  • Save glenbot/1479709 to your computer and use it in GitHub Desktop.
Save glenbot/1479709 to your computer and use it in GitHub Desktop.
Solr Queue Backend for haystack
import logging
import sys
import time
from threading import Thread
from Queue import Queue
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.db.models.loading import get_model
from haystack.backends import BaseSearchBackend, BaseSearchQuery, log_query, EmptyResults
from haystack.constants import ID, DJANGO_CT, DJANGO_ID
from haystack.exceptions import MissingDependency, MoreLikeThisError
from haystack.models import SearchResult
from haystack.utils import get_identifier
try:
from django.db.models.sql.query import get_proxied_model
except ImportError:
# Likely on Django 1.0
get_proxied_model = None
try:
from pysolr import Solr, SolrError
except ImportError:
raise MissingDependency("The 'solr' backend requires the installation of 'pysolr'. Please refer to the documentation.")
BACKEND_NAME = 'solr_queue'
MAX_THREADS = 1

# A single Solr connection and a single work queue are shared by every
# ``SearchBackend`` instance created in this process.
timeout = getattr(settings, 'HAYSTACK_SOLR_TIMEOUT', 10)
conn = Solr(settings.HAYSTACK_SOLR_URL, timeout=timeout)
q = Queue()


def update_worker(q):
    """Consume queued document batches forever and push them to Solr.

    Each queue item is a dict with the keys ``'docs'`` (the documents to
    add), ``'boost'`` (passed through to ``conn.add``) and, optionally,
    ``'commit'`` (defaults to ``True`` when absent).

    Failures to talk to Solr are logged on the ``haystack`` logger and the
    batch is dropped; the worker keeps running.
    """
    # A module-level function has no ``self``; use the same logger name the
    # backend class uses.
    log = logging.getLogger('haystack')

    while True:
        doc_item = q.get()

        try:
            conn.add(
                doc_item['docs'],
                commit=doc_item.get('commit', True),
                boost=doc_item['boost'],
            )
        except (IOError, SolrError) as e:
            log.error("Failed to add documents to Solr: %s", e)
        finally:
            # Always mark the item as handled, even on failure; otherwise
            # ``q.join()`` would wait forever on the unfinished task.
            q.task_done()


# Start the worker threads only after ``update_worker`` exists. They are
# daemon threads, so they do not keep the interpreter alive on exit.
for i in range(0, MAX_THREADS):
    t = Thread(target=update_worker, args=(q,))
    t.daemon = True
    t.start()
class SearchBackend(BaseSearchBackend):
    """Solr backend that hands index updates to a background worker queue.

    Searching, removing and clearing talk to Solr synchronously through the
    module-level ``conn``; ``update`` only enqueues the prepared documents on
    the module-level queue ``q``.
    """

    # Word reserved by Solr for special use.
    RESERVED_WORDS = (
        'AND',
        'NOT',
        'OR',
        'TO',
    )

    # Characters reserved by Solr for special use.
    # The '\\' must come first, so as not to overwrite the other slash replacements.
    RESERVED_CHARACTERS = (
        '\\', '+', '-', '&&', '||', '!', '(', ')', '{', '}',
        '[', ']', '^', '"', '~', '*', '?', ':',
    )

    def __init__(self, site=None):
        """Bind the shared connection and queue to this backend instance.

        Raises ``ImproperlyConfigured`` when ``HAYSTACK_SOLR_URL`` is missing
        from the Django settings.
        """
        super(SearchBackend, self).__init__(site)

        if not hasattr(settings, 'HAYSTACK_SOLR_URL'):
            raise ImproperlyConfigured('You must specify a HAYSTACK_SOLR_URL in your settings.')

        self.conn = conn
        self.log = logging.getLogger('haystack')

        #: assign the q and start blocking
        self.q = q
        # ``Queue.join`` blocks until every item put on the queue so far has
        # been marked done, so construction waits for pending updates.
        self.q.join()

    def update(self, index, iterable, commit=True):
        """Prepare every object in ``iterable`` and enqueue the batch.

        Nothing is sent to Solr here. If preparing an object raises
        ``UnicodeDecodeError`` a note is written to stderr and whatever was
        prepared before the failure is still enqueued.
        """
        docs = []

        try:
            for obj in iterable:
                docs.append(index.full_prepare(obj))
        except UnicodeDecodeError:
            sys.stderr.write("Chunk failed.\n")

        if docs:
            doc_item = {
                'docs': docs,
                'commit': commit,
                'boost': index.get_field_weights()
            }
            #: put items in the queue to get processed
            #: by the worker
            self.q.put(doc_item, False)

    def remove(self, obj_or_string, commit=True):
        """Delete one document from Solr, logging (not raising) on failure."""
        solr_id = get_identifier(obj_or_string)

        try:
            kwargs = {
                'commit': commit,
                ID: solr_id
            }
            self.conn.delete(**kwargs)
        except (IOError, SolrError) as e:
            self.log.error("Failed to remove document '%s' from Solr: %s", solr_id, e)

    def clear(self, models=None, commit=True):
        """Delete all documents, or only those of ``models``, then optimize.

        ``models`` is an optional sequence of model classes; when it is empty
        or omitted the whole index is cleared. Solr/IO failures are logged
        rather than raised.
        """
        # ``None`` replaces the former mutable ``[]`` default; both mean
        # "clear everything".
        models = models or []
        models_to_delete = [
            "%s:%s.%s" % (DJANGO_CT, model._meta.app_label, model._meta.module_name)
            for model in models
        ]

        try:
            if not models_to_delete:
                # *:* matches all docs in Solr
                self.conn.delete(q='*:*', commit=commit)
            else:
                self.conn.delete(q=" OR ".join(models_to_delete), commit=commit)

            # Run an optimize post-clear. http://wiki.apache.org/solr/FAQ#head-9aafb5d8dff5308e8ea4fcf4b71f19f029c4bb99
            self.conn.optimize()
        except (IOError, SolrError) as e:
            if models_to_delete:
                self.log.error("Failed to clear Solr index of models '%s': %s", ','.join(models_to_delete), e)
            else:
                self.log.error("Failed to clear Solr index: %s", e)

    @log_query
    def search(self, query_string, sort_by=None, start_offset=0, end_offset=None,
               fields='', highlight=False, facets=None, date_facets=None, query_facets=None,
               narrow_queries=None, spelling_query=None,
               limit_to_registered_models=None, result_class=None, **kwargs):
        """Run ``query_string`` against Solr and return the processed results.

        Returns a dict with ``'results'`` and ``'hits'`` (plus ``'facets'``
        and ``'spelling_suggestion'`` when a query was actually sent). An
        empty query string short-circuits to zero hits. Extra ``**kwargs``
        are accepted for interface compatibility and ignored. Solr/IO
        failures are logged and yield empty results.
        """
        if len(query_string) == 0:
            return {
                'results': [],
                'hits': 0,
            }

        # Parameters forwarded to pysolr; kept separate from the (ignored)
        # incoming ``**kwargs`` instead of shadowing them.
        params = {
            'fl': '* score',
        }

        if fields:
            params['fl'] = fields

        if sort_by is not None:
            params['sort'] = sort_by

        if start_offset is not None:
            params['start'] = start_offset

        if end_offset is not None:
            # Treat a missing start offset as 0 rather than raising TypeError.
            params['rows'] = end_offset - (start_offset or 0)

        if highlight is True:
            params['hl'] = 'true'
            params['hl.fragsize'] = '200'

        if getattr(settings, 'HAYSTACK_INCLUDE_SPELLING', False) is True:
            params['spellcheck'] = 'true'
            params['spellcheck.collate'] = 'true'
            params['spellcheck.count'] = 1

            if spelling_query:
                params['spellcheck.q'] = spelling_query

        if facets is not None:
            params['facet'] = 'on'
            params['facet.field'] = facets

        if date_facets is not None:
            params['facet'] = 'on'
            params['facet.date'] = list(date_facets.keys())
            params['facet.date.other'] = 'none'

            for key, value in date_facets.items():
                params["f.%s.facet.date.start" % key] = self.conn._from_python(value.get('start_date'))
                params["f.%s.facet.date.end" % key] = self.conn._from_python(value.get('end_date'))
                gap_by_string = value.get('gap_by').upper()
                gap_string = "%d%s" % (value.get('gap_amount'), gap_by_string)

                # Pluralize the unit for any amount other than one.
                if value.get('gap_amount') != 1:
                    gap_string += "S"

                params["f.%s.facet.date.gap" % key] = '+%s/%s' % (gap_string, gap_by_string)

        if query_facets is not None:
            params['facet'] = 'on'
            params['facet.query'] = ["%s:%s" % (field, value) for field, value in query_facets]

        if limit_to_registered_models is None:
            limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

        if limit_to_registered_models:
            # Using narrow queries, limit the results to only models registered
            # with the current site. Work on a copy so the caller's set is
            # not mutated.
            narrow_queries = set() if narrow_queries is None else set(narrow_queries)

            registered_models = self.build_registered_models_list()

            if len(registered_models) > 0:
                narrow_queries.add('%s:(%s)' % (DJANGO_CT, ' OR '.join(registered_models)))

        if narrow_queries is not None:
            params['fq'] = list(narrow_queries)

        try:
            raw_results = self.conn.search(query_string, **params)
        except (IOError, SolrError) as e:
            self.log.error("Failed to query Solr using '%s': %s", query_string, e)
            raw_results = EmptyResults()

        return self._process_results(raw_results, highlight=highlight, result_class=result_class)

    def more_like_this(self, model_instance, additional_query_string=None,
                       start_offset=0, end_offset=None,
                       limit_to_registered_models=None, result_class=None, **kwargs):
        """Ask Solr for documents similar to ``model_instance``.

        ``end_offset`` is forwarded to Solr unchanged as the ``rows`` value.
        Solr/IO failures are logged and yield empty results.
        """
        # Handle deferred models.
        if get_proxied_model and hasattr(model_instance, '_deferred') and model_instance._deferred:
            model_klass = get_proxied_model(model_instance._meta)
        else:
            model_klass = type(model_instance)

        index = self.site.get_index(model_klass)
        field_name = index.get_content_field()
        params = {
            'fl': '*,score',
        }

        if start_offset is not None:
            params['start'] = start_offset

        if end_offset is not None:
            params['rows'] = end_offset

        narrow_queries = set()

        if limit_to_registered_models is None:
            limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

        if limit_to_registered_models:
            # Using narrow queries, limit the results to only models registered
            # with the current site.
            registered_models = self.build_registered_models_list()

            if len(registered_models) > 0:
                narrow_queries.add('%s:(%s)' % (DJANGO_CT, ' OR '.join(registered_models)))

        if additional_query_string:
            narrow_queries.add(additional_query_string)

        if narrow_queries:
            params['fq'] = list(narrow_queries)

        query = "%s:%s" % (ID, get_identifier(model_instance))

        try:
            raw_results = self.conn.more_like_this(query, field_name, **params)
        except (IOError, SolrError) as e:
            self.log.error("Failed to fetch More Like This from Solr for document '%s': %s", query, e)
            raw_results = EmptyResults()

        return self._process_results(raw_results, result_class=result_class)

    def _process_results(self, raw_results, highlight=False, result_class=None):
        """Convert a raw pysolr response into haystack's result dict.

        Documents whose model is unknown or not indexed by the site are
        skipped and subtracted from the hit count.
        """
        if not self.site:
            from haystack import site
        else:
            site = self.site

        results = []
        hits = raw_results.hits
        facets = {}
        spelling_suggestion = None

        if result_class is None:
            result_class = SearchResult

        if hasattr(raw_results, 'facets'):
            facets = {
                'fields': raw_results.facets.get('facet_fields', {}),
                'dates': raw_results.facets.get('facet_dates', {}),
                'queries': raw_results.facets.get('facet_queries', {}),
            }

            field_facets = facets['fields']

            for facet_field in field_facets:
                # Convert to a two-tuple, as Solr's json format returns a list of
                # pairs. ``list`` keeps the result reusable where ``zip`` is lazy.
                flat_counts = field_facets[facet_field]
                field_facets[facet_field] = list(zip(flat_counts[::2], flat_counts[1::2]))

        if getattr(settings, 'HAYSTACK_INCLUDE_SPELLING', False) is True:
            if hasattr(raw_results, 'spellcheck'):
                if len(raw_results.spellcheck.get('suggestions', [])):
                    # For some reason, it's an array of pairs. Pull off the
                    # collated result from the end.
                    spelling_suggestion = raw_results.spellcheck.get('suggestions')[-1]

        indexed_models = site.get_indexed_models()

        for raw_result in raw_results.docs:
            app_label, model_name = raw_result[DJANGO_CT].split('.')
            additional_fields = {}
            model = get_model(app_label, model_name)

            if model and model in indexed_models:
                # The index depends only on the model, so look it up once per
                # document instead of once per field.
                index = site.get_index(model)

                for key, value in raw_result.items():
                    string_key = str(key)

                    if string_key in index.fields and hasattr(index.fields[string_key], 'convert'):
                        additional_fields[string_key] = index.fields[string_key].convert(value)
                    else:
                        additional_fields[string_key] = self.conn._to_python(value)

                # These are passed to the result positionally below.
                del(additional_fields[DJANGO_CT])
                del(additional_fields[DJANGO_ID])
                del(additional_fields['score'])

                if raw_result[ID] in getattr(raw_results, 'highlighting', {}):
                    additional_fields['highlighted'] = raw_results.highlighting[raw_result[ID]]

                result = result_class(app_label, model_name, raw_result[DJANGO_ID], raw_result['score'], searchsite=self.site, **additional_fields)
                results.append(result)
            else:
                hits -= 1

        return {
            'results': results,
            'hits': hits,
            'facets': facets,
            'spelling_suggestion': spelling_suggestion,
        }

    def build_schema(self, fields):
        """Describe ``fields`` for a Solr schema.

        Returns ``(content_field_name, schema_fields)`` where
        ``schema_fields`` is a list of dicts with the keys ``field_name``,
        ``type``, ``indexed``, ``stored`` and ``multi_valued``.
        """
        content_field_name = ''
        schema_fields = []

        for field_name, field_class in fields.items():
            field_data = {
                'field_name': field_class.index_fieldname,
                'type': 'text',
                'indexed': 'true',
                'stored': 'true',
                'multi_valued': 'false',
            }

            if field_class.document is True:
                content_field_name = field_class.index_fieldname

            # DRL_FIXME: Perhaps move to something where, if none of these
            #            checks succeed, call a custom method on the form that
            #            returns, per-backend, the right type of storage?
            if field_class.field_type in ['date', 'datetime']:
                field_data['type'] = 'date'
            elif field_class.field_type == 'integer':
                field_data['type'] = 'slong'
            elif field_class.field_type == 'float':
                field_data['type'] = 'sfloat'
            elif field_class.field_type == 'boolean':
                field_data['type'] = 'boolean'
            elif field_class.field_type == 'ngram':
                field_data['type'] = 'ngram'
            elif field_class.field_type == 'edge_ngram':
                field_data['type'] = 'edge_ngram'

            if field_class.is_multivalued:
                field_data['multi_valued'] = 'true'

            if field_class.stored is False:
                field_data['stored'] = 'false'

            # Do this last to override `text` fields.
            if field_class.indexed is False:
                field_data['indexed'] = 'false'

                # If it's text and not being indexed, we probably don't want
                # to do the normal lowercase/tokenize/stemming/etc. dance.
                if field_data['type'] == 'text':
                    field_data['type'] = 'string'

            # If it's a ``FacetField``, make sure we don't postprocess it.
            if hasattr(field_class, 'facet_for'):
                # If it's text, it ought to be a string.
                if field_data['type'] == 'text':
                    field_data['type'] = 'string'

            schema_fields.append(field_data)

        return (content_field_name, schema_fields)
class SearchQuery(BaseSearchQuery):
    """Query builder that renders haystack filters as Solr query syntax."""

    def __init__(self, site=None, backend=None):
        """Use the given ``backend``, or create a ``SearchBackend`` for ``site``."""
        super(SearchQuery, self).__init__(site, backend)
        self.backend = SearchBackend(site=site) if backend is None else backend

    def matching_all_fragment(self):
        """Return the Solr fragment that matches every document."""
        return '*:*'

    def build_query_fragment(self, field, filter_type, value):
        """Render one ``field``/``filter_type``/``value`` filter as Solr syntax.

        An unsupported ``filter_type`` raises ``KeyError``.
        """
        to_solr = self.backend.conn._from_python

        # Handle when we've got a ``ValuesListQuerySet``...
        if hasattr(value, 'values_list'):
            value = list(value)

        # Scalars are converted to what pysolr wants; collections are kept.
        if not isinstance(value, (set, list, tuple)):
            value = to_solr(value)

        # A value containing a space is quoted as a phrase for an exact match.
        if ' ' in value:
            value = '"%s"' % value

        index_fieldname = self.backend.site.get_index_fieldname(field)

        # 'content' is a special reserved word, much like 'pk' in
        # Django's ORM layer. It indicates 'no special field'.
        if field == 'content':
            return value

        if filter_type == 'in':
            alternatives = [
                '%s:"%s"' % (index_fieldname, to_solr(candidate))
                for candidate in value
            ]
            return "(%s)" % " OR ".join(alternatives)

        if filter_type == 'range':
            lower = to_solr(value[0])
            upper = to_solr(value[1])
            return "%s:[%s TO %s]" % (index_fieldname, lower, upper)

        templates = {
            'exact': "%s:%s",
            'gt': "%s:{%s TO *}",
            'gte': "%s:[%s TO *]",
            'lt': "%s:{* TO %s}",
            'lte': "%s:[* TO %s]",
            'startswith': "%s:%s*",
        }
        return templates[filter_type] % (index_fieldname, value)

    def run(self, spelling_query=None):
        """Builds and executes the query. Returns a list of search results."""
        final_query = self.build_query()
        search_kwargs = {
            'start_offset': self.start_offset,
            'result_class': self.result_class,
        }

        if self.order_by:
            # A leading '-' means descending; anything else is ascending.
            sort_terms = [
                '%s desc' % name[1:] if name.startswith('-') else '%s asc' % name
                for name in self.order_by
            ]
            search_kwargs['sort_by'] = ", ".join(sort_terms)

        if self.end_offset is not None:
            search_kwargs['end_offset'] = self.end_offset

        # Only pass along the options that were actually set on the query.
        if self.highlight:
            search_kwargs['highlight'] = self.highlight

        if self.facets:
            search_kwargs['facets'] = list(self.facets)

        if self.date_facets:
            search_kwargs['date_facets'] = self.date_facets

        if self.query_facets:
            search_kwargs['query_facets'] = self.query_facets

        if self.narrow_queries:
            search_kwargs['narrow_queries'] = self.narrow_queries

        if spelling_query:
            search_kwargs['spelling_query'] = spelling_query

        response = self.backend.search(final_query, **search_kwargs)
        self._results = response.get('results', [])
        self._hit_count = response.get('hits', 0)
        self._facet_counts = self.post_process_facets(response)
        self._spelling_suggestion = response.get('spelling_suggestion', None)

    def run_mlt(self):
        """Builds and executes the query. Returns a list of search results."""
        if self._more_like_this is False or self._mlt_instance is None:
            raise MoreLikeThisError("No instance was provided to determine 'More Like This' results.")

        additional_query_string = self.build_query()
        mlt_kwargs = {
            'start_offset': self.start_offset,
            'result_class': self.result_class,
        }

        if self.end_offset is not None:
            # The backend receives the window size here, not an absolute end.
            mlt_kwargs['end_offset'] = self.end_offset - self.start_offset

        response = self.backend.more_like_this(self._mlt_instance, additional_query_string, **mlt_kwargs)
        self._results = response.get('results', [])
        self._hit_count = response.get('hits', 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment