Companion gist for my talk about using ElasticSearch with Ruby.

elastic_search_indexer.rb
Ruby
# Public: A module to be mixed in another class with common methods to index
# records in ElasticSearch.
#
# The host object needs to respond to 'indexed_attributes', which returns
# an array with the names of the attributes to be indexed.
#
# It's also recommended to override the 'save?' method to make sure only
# records that match some specifications are indexed.
#
# The type used for the ElasticSearch index will be extracted from the name of
# the indexer class. If you want to change it override the 'type' method.
#
# Example:
#
# class MyRecordIndexer
# include ElasticSearchIndexer
#
# def save?
# record.created_at > 1.month.ago
# end
#
# def indexed_attributes
# ['attribute1', 'attribute2']
# end
#
# def type
# 'custom_record_type'
# end
# end
#
module ElasticSearchIndexer
# Public: Returns the record being indexed.
attr_reader :record
 
# Initializes an Indexer.
#
# record - The record being indexed.
def initialize(record)
@record = record.decorate
end
 
# Public: Saves the record to the ElasticSearch index.
def save
connection.save(indexed_record, record.id) if save?
end
 
# Public: A guard to check if the record should be saved to ElasticSearch
# or not
#
# Returns true.
def save?
true
end
 
# Public: Returns a String with the type that should be used in the
# ElasticSearch index.
def type
self.class.name.demodulize.underscore.gsub('_indexer','')
end
 
private
# Internal: Returns a Connection to interact with ElasticSearch.
def connection
ElasticSearch::Connection.new(type: type)
end
 
# Internal: Builds a hash with the indexed attributes of the record.
#
# Returns a Hash.
def indexed_record
indexed_record = indexed_attributes.inject({}) do |hash, attribute|
hash.update(attribute => record.send(attribute))
end
 
indexed_record.delete_if { |k, v| v.blank? }
end
end
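
The attribute-collecting step in `indexed_record` can be tried without Rails. The following is a minimal sketch, not the gist's code: `SketchUser` and `build_indexed_record` are hypothetical names, and the nil-or-empty check only approximates ActiveSupport's `blank?`.

```ruby
# Hypothetical stand-in for a decorated record; not part of the gist.
SketchUser = Struct.new(:name, :description, :tags)

# Collects the listed attributes into a Hash and drops empty values.
# Assumption: "blank" is approximated as nil or empty, since ActiveSupport's
# blank? is not available in plain Ruby.
def build_indexed_record(record, attributes)
  document = attributes.each_with_object({}) do |attribute, hash|
    hash[attribute] = record.public_send(attribute)
  end

  document.reject do |_key, value|
    value.nil? || (value.respond_to?(:empty?) && value.empty?)
  end
end

user = SketchUser.new('Joan', '', [])
document = build_indexed_record(user, %w[name description tags])
puts document.inspect
```

Only `name` survives here, because the empty description and the empty tag list are dropped before the document is sent to the index.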
elastic_search_observer.rb
Ruby
# Public: An observer to synchronize the changes to ElasticSearch.
class ElasticSearchObserver < ActiveRecord::Observer
observe :user, :device, :recording
 
# Public: Listens for after_save callbacks for the observed models and
# stores them in ElasticSearch.
#
# Returns nothing.
def after_save(record)
serializer_class(record).new(record).save
end
 
private
# Internal: Builds the class that will take care of saving the record to
# ElasticSearch.
#
# Returns a Class.
def serializer_class(record)
"#{record.class.name}Indexer".constantize
end
end
filtered_query.rb
Ruby
module ElasticSearch
# Internal: Builds a filtered ElasticSearch query.
class FilteredQuery
attr_reader :query, :filters, :join_mode
 
# Initializes a FilteredQuery.
#
# query - A String or Hash with the query to perform in ElasticSearch.
# filters - An Array of filters to be applied to the search.
# join_mode - An optional String or Symbol to specify how to join filters.
# Defaults to 'and'.
def initialize(query, filters, join_mode = nil)
@query = query.dup
@filters = filters.dup
@join_mode = join_mode || :and
end
 
# Public: Builds a hash with the query and filters.
#
# Returns a Hash.
def to_hash
{filtered: {query: query, filter: {join_mode => filters}}}
end
end
end
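
To see what `join_mode` changes, here is a small standalone sketch of the hash that `FilteredQuery#to_hash` produces. `filtered_query_hash` is a hypothetical helper that mirrors the shape shown above so the example runs without the gist's files, and the `term` filter is only sample data.

```ruby
# Hypothetical helper mirroring the shape built by FilteredQuery#to_hash.
def filtered_query_hash(query, filters, join_mode = :and)
  { filtered: { query: query, filter: { join_mode => filters } } }
end

query   = { match_all: {} }
filters = [{ term: { mobile: true } }]

# Default join mode: every filter must match.
and_query = filtered_query_hash(query, filters)

# Explicit join mode: the same filters grouped under :or instead.
or_query = filtered_query_hash(query, filters, :or)

puts and_query.inspect
puts or_query.inspect
```

The only difference between the two results is the key under `filter`, which is why the class can accept either a String or a Symbol there.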
query_builder.rb
Ruby
require_relative 'wrapped_query'
require_relative 'filtered_query'
 
module ElasticSearch
# Public: Helps build a query hash for ElasticSearch.
class QueryBuilder
# Public: Gets/sets the search query.
attr_accessor :query
 
# Public: An array of filters to be applied to the search.
attr_accessor :filters
 
# Public: An array of wrappers to be applied to the search.
attr_accessor :wrappers
 
# Public: Initializes a QueryBuilder.
def initialize(args = {})
@filters = args.fetch(:filters, [])
@wrappers = args.fetch(:wrappers, [])
@query = args.fetch(:query, {match_all: {}})
end
 
# Public: Builds the query for ElasticSearch.
#
# Returns a Hash.
def to_elastic_search
query_hash = query
query_hash = FilteredQuery.new(query_hash, filters) if filters.any?
query_hash = WrappedQuery.new(query_hash, wrappers) if wrappers.any?
 
{query: query_hash.to_hash}
end
end
end
user_indexer.rb
Ruby
require_relative 'elastic_search_indexer'
# Public: Handles the indexing of users to ElasticSearch, so they can be
# searched later in the app.
#
# Example:
#
# UserIndexer.new(user).save
#
class UserIndexer
include ElasticSearchIndexer
 
# Public: A guard to only save complete users to ElasticSearch.
#
# Returns true if the User is complete.
# Returns false if the User is not complete.
def save?
record.complete
end
 
# Internal: Returns an Array with the names of the attributes that should be
# indexed in ElasticSearch.
def indexed_attributes
['name', 'description', 'username', 'tags', 'location', 'mobile', 'updated_at']
end
end
user_search_service.rb
Ruby
# Primary Actor: A logged-in user.
# Goal: User finds other users on the system.
#
# Examples
#
# barcelona = ['41.4695761,2.069525800000065', '41.320004,2.228009899999961']
# search = UserSearch.new(user).in_area(*barcelona).search('Joan')
#
# search.each {|user| puts user.name}
#
# search.users
#
class UserSearch
extend Forwardable
include Enumerable
 
# Public: Delegate each to users to implement Enumerable.
def_delegators :users, :each
 
# Public: Returns the User that is searching.
attr_reader :current_user
 
# Public: Returns an Integer to use as the limit of users to return.
attr_reader :size
 
# Public: Initializes a UserSearch.
#
# current_user - The User that is searching.
def initialize(current_user)
@current_user = current_user
@size = 50
end
 
# Public: Changes the search scope to only search for the favorite users of
# the current user.
#
# Returns a UserSearch.
def favorites
@scope = current_user.favorites
self
end
 
# Public: Searches users that have any text matching the keyword.
#
# keyword - The String keyword to filter users.
#
# Returns a UserSearch.
def search(keyword)
query.query = {query_string: {query: keyword}}
self
end
 
# Public: Searches users inside a geographic area.
#
# top_left - A String with the top-left limit of the area. The String
# format must be "latitude,longitude".
# bottom_right - A String with the bottom-right limit of the area. The String
# format must be "latitude,longitude".
#
# Returns a UserSearch.
def in_area(top_left, bottom_right)
query.filters.push({geo_bounding_box: {location: {top_left: top_left, bottom_right: bottom_right} }})
self
end
 
# Public: Filters the users to only allow mobile users. A valid mobile user
# has an associated device and has updated her position within the last two
# hours.
#
# Returns a UserSearch.
def mobile
query.filters.push({range: {position_updated_at: {gte: 2.hours.ago}}})
query.filters.push({bool: {must: {term: {mobile: true}}}})
self
end
 
# Public: Filters users to only allow active ones and boosts the users by the
# last activity time.
#
# Returns a UserSearch.
def live
limit = (30.minutes.ago.to_f * 1000).to_i
 
query.filters.push({range: {updated_at: {gte: limit}}})
query.wrappers.push({
"custom_score" => {
"params"=> {
"live_limit" => limit
},
"script"=> "_score * (abs(doc['updated_at'].date.getMillis() - live_limit) / 1000000.0)"
}
})
self
end
 
# Public: Limits the number of results to return.
#
# Returns a UserSearch.
def limit(limit)
@size = limit
self
end
 
# Public: Returns the collection of users that match the search.
#
# Returns an Array of Users, in the order returned by ElasticSearch.
def users
@users ||= scope.where(id: user_ids).sort_by { |user| user_ids.index(user.id.to_s) }
end
 
# Internal: Returns the ids of the users that match the search.
#
# Returns an Array of ids.
def user_ids
@user_ids ||= perform.hits
end
 
# Internal: Performs the search to ElasticSearch.
#
# Returns an ElasticSearch::Api::Hits.
def perform
elastic_search.search(query.to_elastic_search, {ids_only: true, size: size})
end
 
private
# Internal: A set of default filters to apply to every ElasticSearch search.
#
# Returns an Array.
def default_filters
[{bool: {must_not: {term: {_id: current_user.id}}}}]
end
 
# Internal: The query to add filters, wrappers and queries to send to
# ElasticSearch.
#
# Returns an ElasticSearch::QueryBuilder.
def query
@query ||= ElasticSearch::QueryBuilder.new(filters: default_filters)
end
 
# Internal: The scope to start searching for users in ActiveRecord after
# finding the ids in ElasticSearch.
#
# Returns an ActiveRecord::Relation.
def scope
@scope ||= User.scoped
end
 
# Internal: The client to connect to ElasticSearch.
#
# Returns an ElasticSearch::Client.
def elastic_search
@client ||= ElasticSearch::Connection.new(type: 'user').client
end
end
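
`UserSearch#users` loads the records from the database and then puts them back in the order of the ids returned by ElasticSearch, since a `where(id: ...)` lookup does not preserve that order. The reordering step on its own can be sketched in plain Ruby; `SketchRecord` and `in_hit_order` are hypothetical names, and the lookup Hash is a small variation on the gist's `Array#index` call, not the author's code.

```ruby
# Hypothetical stand-in for an ActiveRecord model; not part of the gist.
SketchRecord = Struct.new(:id, :name)

# Sorts records to match the order of the ids returned by the search.
# Assumption: hit ids are Strings, as implied by user.id.to_s in the gist.
def in_hit_order(records, hit_ids)
  # Map each id to its position once, instead of scanning the Array per record.
  position = hit_ids.each_with_index.to_h
  records.sort_by { |record| position.fetch(record.id.to_s) }
end

hit_ids = %w[42 7 19]
records = [
  SketchRecord.new(7, 'Anna'),
  SketchRecord.new(19, 'Marc'),
  SketchRecord.new(42, 'Joan')
]

ordered = in_hit_order(records, hit_ids)
puts ordered.map(&:name).inspect
```

Using `fetch` makes a record whose id is missing from the hits fail loudly rather than sort unpredictably.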
wrapped_query.rb
Ruby
module ElasticSearch
# Internal: Builds a wrapped ElasticSearch query.
class WrappedQuery
attr_reader :query, :wrappers
 
# Initializes a WrappedQuery.
#
# query - A String or Hash with the query to perform in ElasticSearch.
# wrappers - An Array of wrappers to be applied to the search.
def initialize(query, wrappers)
@query = query.dup
@wrappers = wrappers.dup
end
 
# Public: Builds a hash with the wrapped query. The first wrapper in the
# array will be the first applied.
#
# Returns a Hash.
def to_hash
wrappers.inject(deepest_wrapped_query) do |wrapped_query, wrapper|
wrap_query(wrapper, wrapped_query)
end
end
 
private
# Internal: Returns the deepest wrapper to apply the query to.
#
# Returns a Hash.
def deepest_wrapped_query
@deepest_wrapped_query ||= wrap_query(wrappers.shift, query)
end
 
# Internal: Wraps a query in a wrapper.
#
# wrapper - The wrapper to wrap the query.
# wrapped_query - The query that will be wrapped.
#
# Returns a Hash.
def wrap_query(wrapper, wrapped_query)
wrapper_name = wrapper.keys.first
wrapper[wrapper_name].update(query: wrapped_query.to_hash)
wrapper
end
end
end
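
The nesting that `WrappedQuery#to_hash` builds is easier to see on a concrete value. The sketch below is a standalone approximation: `wrap_all` is a hypothetical helper, `second_wrapper` is a made-up wrapper name used only for illustration, and `merge` is used in place of the gist's `update` so the input hashes are left untouched.

```ruby
# Hypothetical helper: wraps a query in each wrapper, first wrapper innermost.
def wrap_all(query, wrappers)
  wrappers.inject(query) do |wrapped, wrapper|
    name, options = wrapper.first
    # merge returns a new Hash, so the caller's wrapper is not modified.
    { name => options.merge(query: wrapped) }
  end
end

query = { match_all: {} }
wrappers = [
  { custom_score: { script: '_score * 2' } },
  { second_wrapper: { boost: 1.5 } } # made-up wrapper, for illustration only
]

wrapped = wrap_all(query, wrappers)
puts wrapped.inspect
```

The first wrapper in the array ends up closest to the original query, and each later wrapper encloses the result of the previous one.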
