Skip to content

Instantly share code, notes, and snippets.

@grantr
Created April 25, 2011 22:08
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save grantr/941359 to your computer and use it in GitHub Desktop.
Save grantr/941359 to your computer and use it in GitHub Desktop.
ActiveModel module for elasticsearch indexing
# SearchableModel: ActiveModel mixin that keeps a model class synchronized
# with an elasticsearch index via after_save / after_destroy callbacks.
#
# Usage:
#   class Person
#     include SearchableModel                 # indexing callbacks
#     include SearchableModel::SearchMethods  # query/count/paginate support
#     ...
#   end
module SearchableModel
extend ActiveSupport::Concern
included do
# Per-class configuration: index name defaults to the collection name
# ("people"), document type to the element name ("person").
class_attribute :search_index, :search_type, :indexing_disabled
self.search_index = model_name.collection
self.search_type = model_name.element
self.indexing_disabled = false
# Keep the index in sync with the database lifecycle.
after_save :add_to_index
after_destroy :delete_from_index
include IndexConfiguration
extend IndexCreation
end
module ClassMethods
  # Suppress the after_save/after_destroy index callbacks (useful for bulk
  # imports). Re-enable with enable_indexing!.
  def disable_indexing!
    self.indexing_disabled = true
  end

  def enable_indexing!
    self.indexing_disabled = false
  end

  # Memoized per-class elasticsearch client, scoped to this model's
  # index and document type.
  def search_client
    @search_client ||= ElasticSearch.new(
      ES_CONFIG["servers"],
      ES_CONFIG["options"].merge(index: search_index, type: search_type)
    )
  end

  # Force a refresh so recently indexed documents become searchable.
  def refresh_index!
    ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'refresh') do
      search_client.refresh
    end
  end

  # Delete all records, then drop the backing index. A missing index is
  # tolerated; any other request error is re-raised.
  def delete_all
    super
    begin
      search_client.delete_index(search_index)
    rescue ElasticSearch::RequestError => e
      raise e unless e.message.include?("missing")
    end
  end
end
# Instance-level convenience accessor: all instances share the class-level
# memoized elasticsearch client.
def search_client
self.class.search_client
end
# after_save callback: (re)index this record. Returns false (a no-op)
# when indexing is disabled. Uses #to_search when the model defines it,
# otherwise falls back to #default_to_search.
# NOTE(review): instrumentation reports `key` while the document is stored
# under `id` — confirm these two identifiers agree for this model.
def add_to_index
  return false if indexing_disabled
  document =
    if respond_to?(:to_search)
      to_search
    else
      default_to_search
    end
  payload = { :id => key.to_s, :document => document }
  ActiveSupport::Notifications.instrument('index.elastic_search', payload) do
    search_client.index(document, :id => id, :type => search_type)
  end
end
# after_destroy callback: remove this record's document from the index.
# Returns false (a no-op) when indexing is disabled.
def delete_from_index
  return false if indexing_disabled
  document_id = key.to_s
  ActiveSupport::Notifications.instrument('delete.elastic_search', :id => document_id) do
    search_client.delete(document_id)
  end
end
# Fallback document serialization, used when the model does not define
# its own #to_search.
def default_to_search
  as_json
end
# Extend FinderMethods to get ActiveRecord-style find, count, paginate, etc.
# on top of the search index. Depends on SearchMethods (count_every,
# paginate_every, search) plus find_initial/find_last/find_with_ids.
module FinderMethods
  def find(*args)
    options = args.extract_options!
    #TODO should do activerecord style logging/benchmark
    case args.first
    when :first then find_initial(options)
    when :last  then find_last(options)
    when :all   then find_every(options)
    else find_with_ids(args, options)
    end
  end

  def find_every(options)
    search(options)
  end

  def count(options = {})
    count_every(options)
  end

  def paginate(*args)
    paginate_every(*args)
  end

  # Yield every matching record, fetching one page at a time.
  # Returns the number of records yielded.
  #TODO when :order is absent this could use scan_every (scrolling) instead.
  def paginated_each(options = {}, &block)
    opts = { :page => 1, :limit => 100 }.merge(options)
    opts[:page] = opts[:page].to_i
    total = 0
    loop do
      page = paginate(opts)
      total += page.each { |record| yield record }.size
      opts[:page] += 1
      # a short page means we just consumed the final page
      break if page.size < page.per_page
    end
    total
  end
end
# Pagination metadata mixed into search result collections, exposing both
# the will_paginate and kaminari interfaces.
module Paginatable
  attr_reader :current_page, :per_page, :total_entries

  # will_paginate interface
  def total_pages
    (total_entries.to_f / per_page).ceil
  end

  def offset
    per_page * (current_page - 1)
  end
  alias_method :offset_value, :offset

  def previous_page
    return nil unless current_page > 1
    current_page - 1
  end

  def next_page
    return nil unless current_page < total_pages
    current_page + 1
  end

  # kaminari-compatible aliases
  alias_method :num_pages, :total_pages
  alias_method :limit_value, :per_page
  alias_method :total_count, :total_entries

  # Install pagination metadata (coerced to integers); returns self so the
  # call can be chained.
  def paginated(current_page, per_page, total_entries)
    @current_page = current_page.to_i
    @per_page = per_page.to_i
    @total_entries = total_entries.to_i
    self
  end
end
# Extend SearchMethods to get count_every, paginate_every, and search —
# the query-building layer FinderMethods is built on.
module SearchMethods
# Count documents matching +options+ (same option hash as #search).
# A missing index counts as zero; other request errors are re-raised.
def count_every(options = {})
  opts = options.with_indifferent_access
  query = compose_query(opts)
  Rails.logger.debug("query: #{query.inspect}")
  Rails.logger.debug("options: #{opts.inspect}")
  ActiveSupport::Notifications.instrument('count.elastic_search', :query => query) do
    begin
      search_client.count(query, opts)
    rescue ElasticSearch::RequestError => e
      raise e unless e.message =~ /IndexMissingException/
      0
    end
  end
end
# Run a search and return the results as a Paginatable collection.
# With :ids_only the raw id list is returned; otherwise records are
# hydrated from the database via find_with_ids.
def paginate_every(*args)
  options = args.extract_options!
  hits = get_hits_from_index(options)
  #TODO either this should handle a non-paginated hits, or get_hits_from_index should not return [] when index is missing
  records =
    if options[:ids_only]
      hits.to_a
    else
      find_with_ids([hits.to_a])
    end
  # frozen collections cannot be extended; work on a copy
  records = records.dup if records.frozen?
  records.extend(Paginatable)
  if hits.respond_to?(:current_page)
    records.paginated(hits.current_page, hits.per_page, hits.total_entries)
  end
  records
end
# Scan through results like paginated_each, but use server-side scrolling.
# Yields each hydrated record; returns the number of records yielded.
#TODO use scan when 0.16.0 is released:
#options = {:scroll => '5m', :limit => 100, :search_type => 'scan'}.merge options
def scan_every(options = {})
  opts = { :scroll => '5m', :limit => 100 }.merge(options)
  total = 0
  # first batch comes from a regular search that opens the scroll cursor
  initial_hits = get_hits_from_index(opts)
  batch = find_with_ids([initial_hits.to_a])
  total += batch.each { |record| yield record }.size
  # subsequent batches come from the scroll cursor until it is exhausted
  search_client.scroll(initial_hits.scroll_id, :scroll => opts[:scroll], :ids_only => true) do |page|
    page_batch = find_with_ids([page.to_a])
    total += page_batch.each { |record| yield record }.size
  end
  total
end
# Search options:
#   :query      => lucene query string, or an ES query hash (a hash causes
#                  :conditions and :with to be ignored)
#   :conditions => analyzed matches, hash of attribute => value
#   :with       => filters (no analysis)
#   :without    => exclusion filters (no analysis)
#   :limit, :offset
#   :order      => array of sort specs (field, field:reverse, "field asc/desc")
#
# Condition/filter values may be a single value, an array, or a range.
def search(options={})
  hits = get_hits_from_index(options)
  # :ids_only callers get raw ids; otherwise hydrate records from the DB
  return hits.to_a if options[:ids_only]
  find_with_ids([hits.to_a])
end
# Lucene query-parser special characters, all of which must be escaped in
# user-supplied query strings:
#   + - && || ! ( ) { } [ ] ^ " ~ * ? : \
# Fix: the previous regex escaped a backtick (not a Lucene special char —
# almost certainly a typo for "^", which shares its key) and omitted both
# "^" and "*" from the set documented above.
LUCENE_ESCAPE_REGEX =
  /(\+|-|&&|\|\||!|\(|\)|\{|\}|\[|\]|\^|"|~|\*|\?|:|\\)/
# Prefix every special character with a backslash so it is treated
# literally by the Lucene query parser.
def lucene_escape(query)
  query.gsub(LUCENE_ESCAPE_REGEX, "\\\\\\1")
end
private

# Execute the search and return the raw hits (ids only).
# Builds the query from +options+, translating :order/:offset/:limit into
# the ES :sort/:from/:size parameters. A missing index yields [].
def get_hits_from_index(options = {})
  opts = options.with_indifferent_access
  query = { :query => compose_query(opts) }
  # (query was just built with only :query, so the !query[:sort] guard is
  # always true; kept for safety)
  query[:sort] = compose_sorts(opts[:order]) if opts[:order] && !query[:sort]
  opts[:from] = opts[:offset] if opts[:offset]
  opts[:size] = opts[:limit] if opts[:limit]
  Rails.logger.debug("query: #{query.inspect}")
  Rails.logger.debug("options: #{opts.inspect}")
  #TODO could also change search_type to query_and_fetch and remove dupes locally (not a big difference in wire size, since only ids are returned)
  ActiveSupport::Notifications.instrument('search.elastic_search', :query => query) do
    begin
      search_client.search(query, opts.merge(:ids_only => true))
    rescue ElasticSearch::RequestError => e
      raise e unless e.message =~ /IndexMissingException/
      [] #TODO this causes paginate to fail if the index is missing
    end
  end
end
# Build the full ES query hash from the option hash.
# NOTE: mutates +options+ (deletes :query); callers pass a
# with_indifferent_access copy, so the original caller hash is unaffected.
# A String (or absent) :query is combined with :conditions/:with/:without/
# :prefix into a constant_score filtered query; a Hash :query is assumed to
# be raw ES query DSL and returned untouched.
def compose_query(options)
query = options.delete(:query)
case query
when String, nil
query_string = query #TODO should accept hash query as well if there is no :query key
# if we have :with or :without, then use a filtered query on the outside
# if we have one :conditions, then use a term query on the inside (range should be handled by range query, array of terms by a bool MUST query)
# if we have multiple :conditions, then use a bool query on the inside
# if we have a query string and :conditions, use a bool query around query string and conditions
# if we have only a query string, then just use that as the query
#TODO inconsistent results when using with for certain queries
#TODO add a lucene escape function
with_filters = compose_filters(options[:with]) unless options[:with].blank?
without_filters = compose_filters(options[:without]) unless options[:without].blank?
conditions = compose_conditions(options[:conditions]) unless options[:conditions].blank?
prefix = compose_prefix(options[:prefix]) unless options[:prefix].blank?
inner_query = nil
inner_filter = nil
# scoring part: conditions, the query string, and prefixes all become
# MUST clauses of a bool query
if conditions || query_string || prefix
inner_query = { :bool => { :must => [] }}
inner_query[:bool][:must] << { :bool => { :must => conditions }} if conditions
inner_query[:bool][:must] << { :query_string => { :query => query_string }} unless query_string.blank?
inner_query[:bool][:must] << { :bool => { :must => prefix }} if prefix
end
# non-scoring part: :with becomes must filters, :without must_not
if with_filters || without_filters
inner_filter = { :bool => {}}
inner_filter[:bool][:must] = with_filters if with_filters
inner_filter[:bool][:must_not] = without_filters if without_filters
end
if inner_query && inner_filter
# if we have both query and filters, wrap them in an and filter
outer_filter = { :and => [] }
outer_filter[:and] << { :query => inner_query }
outer_filter[:and] << inner_filter
elsif inner_query
# if there is just a query, then just use a query_filter
outer_filter = { :query => inner_query }
elsif inner_filter
# if there is only a filter, then use that directly
outer_filter = inner_filter
else
# no criteria at all: match everything
outer_filter = { :match_all => {}}
end
# outer query is always constant_score (scoring is irrelevant; only ids are fetched)
outer_query = { :constant_score => { :filter => outer_filter} }
query = outer_query
when Hash #TODO this should only happen if :query option is specified
# query hash (ES query dsl) — passed through unchanged
end
query
end
# Canonical string form for values written into queries/filters:
# booleans become "T"/"F", times become ISO 8601, everything else #to_s.
def normalize_value(value)
  case value
  when true  then "T"
  when false then "F"
  when Time  then value.iso8601
  else value.to_s
  end
end
# Build filter clauses (no analysis) from a :with/:without hash.
# Special values per field:
#   nil       => records missing the field
#   :empty    => records where the field is the empty string
#   :blank    => records where the field is missing OR empty
#   :nonblank => records where the field is neither missing nor empty
# Arrays become terms filters, ranges become range filters, and a Hash is
# passed through untouched as a raw ES filter fragment.
def compose_filters(withs)
  withs.map do |field, value|
    case value
    when Array
      { :terms => { field => value.map { |v| normalize_value(v) } } }
    when Range
      { :range => { field => { :from => normalize_value(value.begin), :to => normalize_value(value.end) } } }
    when Hash
      { field => value }
    when nil
      { :missing => { :field => field } }
    when :blank
      blank_filter(field)
    when :nonblank
      { :not => { :filter => blank_filter(field) } }
    when :empty
      { :term => { field => "" } }
    else
      { :term => { field => normalize_value(value) } }
    end
  end
end
# Filter matching records where +field+ is absent OR equal to the empty
# string — the two halves of "blank".
def blank_filter(field)
  missing = { :missing => { :field => field } }
  empty = { :term => { field => "" } }
  { :or => [missing, empty] }
end
# Build analyzed (scoring) query clauses from a :conditions hash.
# Arrays become a dis_max over term queries, ranges become range queries,
# a Hash is passed through untouched, and scalars become field queries
# with lucene-escaped values.
def compose_conditions(conditions)
  conditions.map do |field, value|
    case value
    when Array
      term_queries = value.map { |t| { :term => { field => normalize_value(t) } } }
      { :dis_max => { :queries => term_queries } }
    when Range
      { :range => { field => { :from => normalize_value(value.begin), :to => normalize_value(value.end) } } }
    when Hash
      { field => value }
    else
      { :field => { field => lucene_escape(normalize_value(value)) } }
    end
  end
end
# Build prefix queries from a {field => prefix_string} hash.
def compose_prefix(prefixes)
  prefixes.map do |field, value|
    { :prefix => { field => value } }
  end
end
# Normalize the :order option into an ES sort array. Accepts a single spec
# or an array of specs; each spec may be a Hash (passed through), a
# "field asc"/"field desc" string, a "field:reverse" string, or anything
# else (passed through verbatim, e.g. a bare field name).
def compose_sorts(orders)
  Array(orders).map do |order|
    case order
    when Hash
      order
    when /^(.+) (desc|asc)$/i
      { $1 => $2.downcase }
    when /^(.+):reverse$/i
      { $1 => { :reverse => true } }
    else
      order
    end
  end
end
end
# Per-class index settings consumed by IndexCreation#create_index!:
# index_creation_options is passed to create_index, and index_mapping
# (when non-empty) is applied via update_mapping afterwards.
module IndexConfiguration
extend ActiveSupport::Concern
included do
class_attribute :index_creation_options
class_attribute :index_mapping
self.index_creation_options = {}
self.index_mapping = {}
end
end
module IndexCreation
# Create the index (default: this model's search_index) using the
# configured creation options, then apply the configured mapping if one
# is defined. Returns true.
def create_index!(index_name = search_index)
  ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'create index', :description => index_name) do
    search_client.create_index(index_name, index_creation_options)
  end
  # apply the mapping only when one has been configured
  if index_mapping.any?
    ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'update mapping', :description => index_name) do
      search_client.update_mapping(index_mapping, :index => index_name)
    end
  end
  true
end
# Create a new (timestamped) index and alias it as "<search_index>-pending"
# so it can be populated before being deployed. Raises if a pending index
# already exists.
def create_pending_index!(index_name = generate_pending_index_name)
  raise "An index is already pending. Delete or unalias that index first." if current_pending_index
  create_index!(index_name)
  actions = { :add => { index_name => pending_index_alias } }
  ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'alias index', :description => actions.inspect) do
    search_client.alias_index(actions)
  end
end
# Atomically promote an index (default: the current pending one) to be the
# live search_index alias, dropping its pending alias and un-aliasing any
# previously deployed index. Raises when there is nothing to deploy.
def deploy_index!(index_to_deploy = current_pending_index)
  raise "No index to deploy. Create a pending index first." unless index_to_deploy
  actions = {
    :add => { index_to_deploy => search_index },
    :remove => { index_to_deploy => pending_index_alias }
  }
  previously_deployed = current_deployed_index
  actions[:remove][previously_deployed] = search_index if previously_deployed
  ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'alias index', :description => actions.inspect) do
    search_client.alias_index(actions)
  end
end
# Delete every "<search_index>-*" index except the currently deployed one.
# Raises when no index is deployed (deploy first so we never delete the
# live index).
# Fix: the index name is now Regexp.escape'd before interpolation (it may
# contain regex metacharacters such as "."), and the pattern is anchored
# with \A instead of the per-line anchor ^.
def delete_stale_indices!
  deployed_index = current_deployed_index
  raise "No index is deployed. Deploy an index first." unless deployed_index
  # find all indices that start with "search_index-"
  prefix_pattern = /\A#{Regexp.escape(search_index.to_s)}-/
  all_indices = search_client.index_status(:all)["indices"].keys.select { |i| i =~ prefix_pattern }
  # remove the currently deployed index from the list
  stale_indices = all_indices - [deployed_index]
  stale_indices.each do |index|
    ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'delete index', :description => index) do
      search_client.delete_index(index)
    end
  end
end
# Re-index records in bulk. Takes an optional block with parameters
# (batch, error): error is nil on success, or the exception raised while
# indexing that batch (there is no way to tell which object in the batch
# caused it).
#
#   reindex_all! do |batch, err|
#     if err
#       puts "error indexing batch #{err} #{err.message} #{batch.collect(&:id).inspect}"
#     else
#       puts "indexed #{batch.collect(&:id).inspect}"
#     end
#   end
#
# Options: :index (explicit target index), :pending (target the current
# pending index), :batch (index exactly these objects instead of iterating
# with find_in_batches); remaining options are passed to find_in_batches.
# TODO for 0.16.0 update refresh and merge settings to increase indexing throughput, change back after
def reindex_all!(options={}, &block)
options = options.dup
# target precedence: explicit :index, then :pending alias, then the live index
index = options.delete(:index)
index ||= options.delete(:pending) ? current_pending_index : search_index
# indexes one batch via the bulk API; failures are logged and reported to
# the caller's block rather than aborting the whole run
batch_reindex = ->(batch) do
begin
ActiveSupport::Notifications.instrument('operation.elastic_search', :name => 'bulk index', :description => "(#{batch.size})") do
search_client.bulk do |c|
batch.each do |object|
document = object.respond_to?(:to_search) ? object.to_search : object.default_to_search
c.index(document, :id => object.id, :index => index, :type => object.search_type)
end
end
end
yield(batch, nil) if block_given?
rescue => e
Rails.logger.warn("Error reindexing batch: #{e} #{e.message}")
yield(batch, e) if block_given?
end
end
# can pass a :batch option with an array of objects to index
if options[:batch]
batch_reindex.call(options[:batch])
else
find_in_batches(options) do |batch|
batch_reindex.call(batch)
end
end
end
# Name of the index currently carrying the pending alias, or nil.
def current_pending_index
  indices = search_client.index_status(:all)["indices"]
  match = indices.detect { |name, status| status["aliases"].include?(pending_index_alias) }
  match && match.first
end
# Name of the index currently aliased as the live search_index, or nil.
def current_deployed_index
  indices = search_client.index_status(:all)["indices"]
  match = indices.detect { |name, status| status["aliases"].include?(search_index) }
  match && match.first
end
private

# Timestamped physical index name: "<search_index>-YYYYMMDDHHMMSS" (UTC).
def generate_pending_index_name
  timestamp = Time.current.utc.strftime("%Y%m%d%H%M%S")
  "#{search_index}-#{timestamp}"
end

# Alias under which the not-yet-deployed index is registered.
def pending_index_alias
  "#{search_index}-pending"
end
end
# Debug-level logging for the 'elastic_search' instrumentation namespace
# (index/delete/search/count/operation events emitted above).
class LogSubscriber < ActiveSupport::LogSubscriber
  def index(event)
    label = 'ElasticSearch index (%.1fms)' % event.duration
    debug " #{label} #{event.payload[:id]} #{event.payload[:document].inspect}"
  end

  def delete(event)
    label = 'ElasticSearch delete (%.1fms)' % event.duration
    debug " #{label} #{event.payload[:id]}"
  end

  def search(event)
    label = 'ElasticSearch search (%.1fms)' % event.duration
    debug " #{label} #{event.payload[:query].inspect}"
  end

  def count(event)
    label = 'ElasticSearch count (%.1fms)' % event.duration
    debug " #{label} #{event.payload[:query].inspect}"
  end

  def operation(event)
    label = format('ElasticSearch %s (%.1fms)', event.payload[:name], event.duration)
    debug " #{label} #{event.payload[:description]}"
  end
end
end
#TODO this is the wrong place to call this - attaching at load time causes
# duplicate subscriptions when code is reloaded in development mode; move
# it into an initializer/Railtie.
SearchableModel::LogSubscriber.attach_to :elastic_search
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment