Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Companion gist for my talk about using ElasticSearch with Ruby.
# Public: A module to be mixed into an indexer class, providing the common
# methods needed to index records in ElasticSearch.
#
# The host class must respond to `indexed_attributes`, returning an Array
# with the names of the record attributes to be indexed.
#
# It's also recommended to override `save?` so that only records matching
# some criteria are indexed.
#
# The type used for the ElasticSearch index is derived from the name of the
# indexer class (e.g. `MyRecordIndexer` => "my_record"). Override `type` to
# change it.
#
# Example:
#
#   class MyRecordIndexer
#     include ElasticSearchIndexer
#
#     def save?
#       record.created_at > 1.month.ago
#     end
#
#     def indexed_attributes
#       ['attribute1', 'attribute2']
#     end
#
#     def type
#       'custom_record_type'
#     end
#   end
#
module ElasticSearchIndexer
  # Public: Returns the decorated record being indexed.
  attr_reader :record

  # Public: Initializes an indexer.
  #
  # record - The record being indexed. It must respond to `decorate`; the
  #          decorated object is what gets stored and indexed.
  def initialize(record)
    @record = record.decorate
  end

  # Public: Saves the record to the ElasticSearch index, using the record id
  # as the document id. Does nothing when `save?` is falsy.
  #
  # Returns the result of the connection's `save`, or nil when skipped.
  def save
    connection.save(indexed_record, record.id) if save?
  end

  # Public: A guard to decide whether the record should be saved to
  # ElasticSearch. Override it in the host class to skip records.
  #
  # Returns true.
  def save?
    true
  end

  # Public: Returns a String with the type to use in the ElasticSearch index:
  # the underscored class name with its trailing "_indexer" removed. Only the
  # suffix is stripped, so "_indexer" elsewhere in the name is preserved.
  def type
    self.class.name.demodulize.underscore.sub(/_indexer\z/, '')
  end

  private

  # Internal: Returns a Connection to interact with ElasticSearch.
  def connection
    ElasticSearch::Connection.new(type: type)
  end

  # Internal: Builds a Hash mapping each indexed attribute name to the
  # record's value for it. Blank values are dropped, but `false` is kept:
  # ActiveSupport treats `false` as blank, which would otherwise silently
  # remove boolean fields (such as `mobile: false`) from the document.
  #
  # Returns a Hash.
  def indexed_record
    attributes = indexed_attributes.each_with_object({}) do |attribute, hash|
      hash[attribute] = record.send(attribute)
    end
    attributes.reject { |_name, value| value != false && value.blank? }
  end
end
# Public: An observer that synchronizes changes to the observed models with
# ElasticSearch.
class ElasticSearchObserver < ActiveRecord::Observer
  observe :user, :device, :recording

  # Public: Listens for the after_save callback of the observed models and
  # stores the saved record in ElasticSearch through its indexer.
  #
  # record - The saved model instance.
  #
  # Returns the result of the indexer's save (not meant to be used).
  def after_save(record)
    indexer = serializer_class(record).new(record)
    indexer.save
  end

  private

  # Internal: Resolves the indexer class responsible for saving the record to
  # ElasticSearch, named "<ModelClassName>Indexer" (e.g. User => UserIndexer).
  #
  # Returns a Class.
  def serializer_class(record)
    indexer_name = record.class.name + 'Indexer'
    indexer_name.constantize
  end
end
module ElasticSearch
  # Internal: Represents an ElasticSearch "filtered" query: a query combined
  # with a list of filters joined by a boolean operator.
  class FilteredQuery
    attr_reader :query, :filters, :join_mode

    # Initializes a FilteredQuery.
    #
    # query     - A String or Hash with the query to perform in ElasticSearch.
    #             A shallow copy is stored.
    # filters   - An Array of filters to apply to the search. A shallow copy
    #             is stored.
    # join_mode - An optional String or Symbol naming how the filters are
    #             joined (default: :and).
    def initialize(query, filters, join_mode = nil)
      @join_mode = join_mode || :and
      @query = query.dup
      @filters = filters.dup
    end

    # Public: Builds the filtered-query Hash, nesting the filters under the
    # join mode key.
    #
    # Returns a Hash.
    def to_hash
      filter_clause = { join_mode => filters }
      { filtered: { query: query, filter: filter_clause } }
    end
  end
end
require_relative 'wrapped_query'
require_relative 'filtered_query'
module ElasticSearch
  # Public: Assembles the query Hash sent to ElasticSearch from a base query,
  # an optional list of filters and an optional list of wrappers.
  class QueryBuilder
    # Public: Gets/sets the base search query.
    attr_accessor :query
    # Public: Gets/sets the Array of filters applied to the search.
    attr_accessor :filters
    # Public: Gets/sets the Array of wrappers applied around the search.
    attr_accessor :wrappers

    # Public: Initializes a QueryBuilder.
    #
    # args - A Hash of options (default: {}):
    #        :filters  - An Array of filters (default: []).
    #        :wrappers - An Array of wrappers (default: []).
    #        :query    - The base query Hash (default: {match_all: {}}).
    def initialize(args = {})
      @query = args.fetch(:query, {match_all: {}})
      @filters = args.fetch(:filters, [])
      @wrappers = args.fetch(:wrappers, [])
    end

    # Public: Builds the query for ElasticSearch. The base query is wrapped in
    # a FilteredQuery when there are filters, and the result in a
    # WrappedQuery when there are wrappers.
    #
    # Returns a Hash.
    def to_elastic_search
      filtered = filters.any? ? FilteredQuery.new(query, filters) : query
      wrapped = wrappers.any? ? WrappedQuery.new(filtered, wrappers) : filtered
      {query: wrapped.to_hash}
    end
  end
end
require_relative 'elastic_search_indexer'
# Public: Indexes users in ElasticSearch so they can be searched later in
# the app.
#
# Example:
#
#   UserIndexer.new(user).save
#
class UserIndexer
  include ElasticSearchIndexer

  # Public: A guard that only lets complete users be saved to ElasticSearch.
  #
  # Returns the record's `complete` value (truthy when the User is complete).
  def save?
    record.complete
  end

  # Internal: Returns an Array with the names of the User attributes that are
  # indexed in ElasticSearch.
  def indexed_attributes
    %w[name description username tags location mobile updated_at]
  end
end
# Primary Actor: A logged-in user.
# Goal: The user finds other users on the system.
#
# Examples
#
#   top_left     = '41.4695761,2.069525800000065'
#   bottom_right = '41.320004,2.228009899999961'
#   search = UserSearch.new(user).in_area(top_left, bottom_right).search('Joan')
#
#   search.each { |user| puts user.name }
#
#   search.users
#
class UserSearch
  extend Forwardable
  include Enumerable

  # Public: Delegates each to users so the search is Enumerable.
  def_delegators :users, :each

  # Public: Returns the User that is searching.
  attr_reader :current_user

  # Public: Returns an Integer to use as the limit of users to return.
  attr_reader :size

  # Public: Initializes a UserSearch. The limit defaults to 50 users.
  #
  # current_user - The User that is searching; excluded from the results by
  #                the default filters.
  def initialize(current_user)
    @current_user = current_user
    @size = 50
  end

  # Public: Restricts the search scope to the current user's favorite users.
  #
  # Returns the UserSearch.
  def favorites
    @scope = current_user.favorites
    self
  end

  # Public: Searches users that have any text matching the keyword. Replaces
  # any previously set keyword.
  #
  # keyword - The String keyword to filter users.
  #
  # Returns the UserSearch.
  def search(keyword)
    query.query = {query_string: {query: keyword}}
    self
  end

  # Public: Searches users inside a geographic bounding box.
  #
  # top_left     - A String with the top-left corner of the area, formatted as
  #                "latitude,longitude".
  # bottom_right - A String with the bottom-right corner of the area,
  #                formatted as "latitude,longitude".
  #
  # Returns the UserSearch.
  def in_area(top_left, bottom_right)
    query.filters.push({geo_bounding_box: {location: {top_left: top_left, bottom_right: bottom_right}}})
    self
  end

  # Public: Filters the users to only allow mobile users: users flagged as
  # mobile whose position was updated within the last two hours.
  #
  # Returns the UserSearch.
  def mobile
    # NOTE(review): this bound is a Time object, while `live` sends epoch
    # milliseconds; confirm both serialize to what the index mapping expects.
    query.filters.push({range: {position_updated_at: {gte: 2.hours.ago}}})
    query.filters.push({bool: {must: {term: {mobile: true}}}})
    self
  end

  # Public: Filters users to only allow those updated in the last 30 minutes
  # and rescores them with a custom_score script that scales the score by how
  # far `updated_at` is past that cutoff (more recent => higher).
  #
  # Returns the UserSearch.
  def live
    # Cutoff in epoch milliseconds, matching getMillis() in the script below.
    live_limit = (30.minutes.ago.to_f * 1000).to_i
    query.filters.push({range: {updated_at: {gte: live_limit}}})
    query.wrappers.push({
      "custom_score" => {
        "params" => {
          "live_limit" => live_limit
        },
        "script" => "_score * (abs(doc['updated_at'].date.getMillis() - live_limit) / 1000000.0)"
      }
    })
    self
  end

  # Public: Limits the number of results to return.
  #
  # limit - The Integer maximum number of users.
  #
  # Returns the UserSearch.
  def limit(limit)
    @size = limit
    self
  end

  # Public: Returns the users that match the search, in the order
  # ElasticSearch ranked them. The result is memoized.
  #
  # Returns an Array of Users.
  def users
    @users ||= begin
      # Rank lookup keyed by the id as a String, so ordering is a Hash lookup
      # per user (instead of a linear Array#index) and works whether the hits
      # are Strings or Integers. The first occurrence of an id wins.
      rank = {}
      user_ids.each_with_index { |id, position| rank[id.to_s] ||= position }
      scope.where(id: user_ids).sort_by { |user| rank[user.id.to_s] }
    end
  end

  # Internal: Returns the ids of the users that match the search.
  #
  # Returns an Array of ids.
  def user_ids
    @user_ids ||= perform.hits
  end

  # Internal: Performs the search in ElasticSearch, fetching only ids and at
  # most `size` hits.
  #
  # Returns an ElasticSearch::Api::Hits.
  def perform
    elastic_search.search(query.to_elastic_search, {ids_only: true, size: size})
  end

  private

  # Internal: A set of default filters applied to every ElasticSearch search;
  # excludes the current user from the results.
  #
  # Returns an Array.
  def default_filters
    [{bool: {must_not: {term: {_id: current_user.id}}}}]
  end

  # Internal: The query to add filters, wrappers and queries to before it is
  # sent to ElasticSearch.
  #
  # Returns an ElasticSearch::QueryBuilder.
  def query
    @query ||= ElasticSearch::QueryBuilder.new(filters: default_filters)
  end

  # Internal: The ActiveRecord scope used to load users after their ids have
  # been found in ElasticSearch.
  #
  # Returns an ActiveRecord::Relation.
  def scope
    @scope ||= User.scoped
  end

  # Internal: The client to connect to ElasticSearch.
  #
  # Returns an ElasticSearch::Client.
  def elastic_search
    @client ||= ElasticSearch::Connection.new(type: 'user').client
  end
end
module ElasticSearch
  # Internal: Builds a wrapped ElasticSearch query by nesting the query inside
  # one or more wrapper Hashes (e.g. a custom_score wrapper).
  class WrappedQuery
    attr_reader :query, :wrappers

    # Initializes a WrappedQuery.
    #
    # query    - A Hash, or an object responding to `to_hash` (such as a
    #            FilteredQuery), with the query to perform in ElasticSearch.
    # wrappers - An Array of wrapper Hashes. The query is placed under the
    #            first key of each wrapper, whose value must be a Hash.
    def initialize(query, wrappers)
      @query = query.dup
      @wrappers = wrappers.dup
    end

    # Public: Builds a Hash with the wrapped query. The first wrapper in the
    # array is applied first (innermost) and the last one is the outermost.
    # The wrappers are not mutated, so repeated calls return the same result.
    # With no wrappers, returns the query converted with `to_hash`.
    #
    # Returns a Hash.
    def to_hash
      wrappers.inject(query.to_hash) do |wrapped_query, wrapper|
        wrap_query(wrapper, wrapped_query)
      end
    end

    private

    # Internal: Wraps a query in a wrapper without modifying the wrapper:
    # returns a copy whose first key's options gain a :query entry.
    #
    # wrapper       - The wrapper Hash.
    # wrapped_query - The query (Hash or object responding to `to_hash`) that
    #                 will be wrapped.
    #
    # Returns a new Hash.
    def wrap_query(wrapper, wrapped_query)
      wrapper_name = wrapper.keys.first
      options = wrapper[wrapper_name].merge(query: wrapped_query.to_hash)
      wrapper.merge(wrapper_name => options)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.