@oriolgual
Created March 22, 2013 11:01
Companion gist for my talk about using ElasticSearch with Ruby.
# Public: A module to be mixed in another class with common methods to index
# records in ElasticSearch.
#
# The host object needs to respond to 'indexed_attributes', which returns an
# array with the names of the attributes to be indexed.
#
# It's also recommended to override the 'save?' method to make sure only
# records that match some specifications are indexed.
#
# The type used for the ElasticSearch index will be extracted from the name of
# the indexer class. If you want to change it override the 'type' method.
#
# Example:
#
# class MyRecordIndexer
# include ElasticSearchIndexer
#
# def save?
# record.created_at > 1.month.ago
# end
#
# def indexed_attributes
# ['attribute1', 'attribute2']
# end
#
# def type
# 'custom_record_type'
# end
# end
#
module ElasticSearchIndexer
# Public: Returns the record being indexed.
attr_reader :record
# Initializes an Indexer.
#
# record - The record being indexed.
def initialize(record)
@record = record.decorate
end
# Public: Saves the record to the ElasticSearch index.
def save
connection.save(indexed_record, record.id) if save?
end
# Public: A guard to check if the record should be saved to ElasticSearch
# or not
#
# Returns true.
def save?
true
end
# Public: Returns a String with the type that should be used in the
# ElasticSearch index.
def type
self.class.name.demodulize.underscore.gsub('_indexer','')
end
private
# Internal: Returns a Connection to interact with ElasticSearch.
def connection
ElasticSearch::Connection.new(type: type)
end
# Internal: Builds a hash with the indexed attributes of the record.
#
# Returns a Hash.
def indexed_record
indexed_record = indexed_attributes.inject({}) do |hash, attribute|
hash.update(attribute => record.send(attribute))
end
indexed_record.delete_if { |k, v| v.blank? }
end
end
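The module above relies on ActiveSupport (`demodulize`, `underscore`, `blank?`). As a rough illustration of what `type` and `indexed_record` compute, here is a minimal plain-Ruby sketch. The helper names `index_type_for` and `indexed_document`, and the `SampleUser` struct, are hypothetical and not part of the gist. It only drops nil and empty values, which is narrower than `blank?`, and it strips the `_indexer` suffix only at the end of the name.

```ruby
# Plain-Ruby approximation of
# self.class.name.demodulize.underscore.gsub('_indexer', '').
def index_type_for(class_name)
  class_name.split('::').last                 # drop the module path
            .gsub(/([a-z\d])([A-Z])/, '\1_\2') # CamelCase -> Camel_Case
            .downcase
            .sub(/_indexer\z/, '')             # strip the trailing suffix only
end

# Collects the named attributes from the record, skipping nil and empty values.
def indexed_document(record, attributes)
  attributes.each_with_object({}) do |attribute, hash|
    value = record.public_send(attribute)
    next if value.nil? || (value.respond_to?(:empty?) && value.empty?)
    hash[attribute] = value
  end
end

# Hypothetical record used only for this illustration.
SampleUser = Struct.new(:name, :description, :tags)

puts index_type_for('Admin::MyRecordIndexer')
p indexed_document(SampleUser.new('Joan', '', ['ruby']), %w[name description tags])
```

The empty description is left out of the document, which mirrors the `delete_if { |k, v| v.blank? }` step.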
# Public: An observer to synchronize the changes to ElasticSearch.
class ElasticSearchObserver < ActiveRecord::Observer
observe :user, :device, :recording
# Public: Listens for after_save callbacks for the observed models and
# stores them in ElasticSearch.
#
# Returns nothing.
def after_save(record)
serializer_class(record).new(record).save
end
private
# Internal: Finds the class that will take care of saving the record to
# ElasticSearch.
#
# Returns a Class.
def serializer_class(record)
"#{record.class.name}Indexer".constantize
end
end
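The observer finds the indexer purely by naming convention: a `Device` record is handled by `DeviceIndexer`. A minimal sketch of that lookup without Rails, assuming top-level classes and using `Object.const_get` in place of ActiveSupport's `constantize`. The `Device` and `DeviceIndexer` classes here are hypothetical stand-ins.

```ruby
# Hypothetical model and indexer, standing in for the observed classes.
class Device; end

class DeviceIndexer
  attr_reader :record

  def initialize(record)
    @record = record
  end

  # Stand-in for the real save, which would write to ElasticSearch.
  def save
    "indexed #{record.class.name}"
  end
end

# Resolves "<ModelName>Indexer" to a class. Raises NameError when no such
# indexer class is defined.
def indexer_class_for(record)
  Object.const_get("#{record.class.name}Indexer")
end

puts indexer_class_for(Device.new).new(Device.new).save
```

A model without a matching indexer raises `NameError`, so every observed model needs its own indexer class.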
module ElasticSearch
# Internal: Builds a filtered ElasticSearch query.
class FilteredQuery
attr_reader :query, :filters, :join_mode
# Initializes a FilteredQuery.
#
# query - A String or Hash with the query to perform in ElasticSearch.
# filters - An Array of filters to be applied to the search.
# join_mode - An optional String or Symbol to specify how to join filters.
# Defaults to 'and'.
def initialize(query, filters, join_mode = nil)
@query = query.dup
@filters = filters.dup
@join_mode = join_mode || :and
end
# Public: Builds a hash with the query and filters.
#
# Returns a Hash.
def to_hash
{filtered: {query: query, filter: {join_mode => filters}}}
end
end
end
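To make the shape of `FilteredQuery#to_hash` concrete, here is a small sketch that builds the same hash with a plain function and prints it as JSON. The helper name `filtered_query_hash` and the sample query and filter are assumptions made for illustration.

```ruby
require 'json'

# Hypothetical helper mirroring FilteredQuery#to_hash as a plain function.
def filtered_query_hash(query, filters, join_mode = :and)
  { filtered: { query: query, filter: { join_mode => filters } } }
end

# Sample inputs, chosen only for this illustration.
KEYWORD_QUERY = { query_string: { query: 'Joan' } }.freeze
MOBILE_FILTER = { term: { mobile: true } }.freeze

puts JSON.pretty_generate(filtered_query_hash(KEYWORD_QUERY, [MOBILE_FILTER]))
```

Passing `:or` as the third argument changes only the key under `filter`, so the same list of filters can be combined either way.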
require_relative 'wrapped_query'
require_relative 'filtered_query'
module ElasticSearch
# Public: Helps building a query hash for ElasticSearch.
class QueryBuilder
# Public: Accessor for the search query.
attr_accessor :query
# Public: An array of filters to be applied to the search.
attr_accessor :filters
# Public: An array of wrappers to be applied to the search.
attr_accessor :wrappers
# Public: Initializes a QueryBuilder.
def initialize(args = {})
@filters = args.fetch(:filters, [])
@wrappers = args.fetch(:wrappers, [])
@query = args.fetch(:query, {match_all: {}})
end
# Public: Builds the query for ElasticSearch.
#
# Returns a Hash.
def to_elastic_search
query_hash = query
query_hash = FilteredQuery.new(query_hash, filters) if filters.any?
query_hash = WrappedQuery.new(query_hash, wrappers) if wrappers.any?
{query: query_hash.to_hash}
end
end
end
require_relative 'elastic_search_indexer'
# Public: Handles the indexing of users to ElasticSearch, so they can be
# searched later in the app.
#
# Example:
#
# UserIndexer.new(user).save
#
class UserIndexer
include ElasticSearchIndexer
# Public: A guard to only save complete users to ElasticSearch.
#
# Returns true if the User is complete.
# Returns false if the User is not complete.
def save?
record.complete
end
# Internal: Returns an Array with the names of the attributes that should be
# indexed in ElasticSearch.
def indexed_attributes
['name', 'description', 'username', 'tags', 'location', 'mobile', 'updated_at']
end
end
# Primary Actor: A logged-in user.
# Goal: User finds other users on the system.
#
# Examples
#
# barcelona = ['41.4695761,2.069525800000065', '41.320004,2.228009899999961']
# search = UserSearch.new(user).in_area(*barcelona).search('Joan')
#
# search.each {|user| puts user.name}
#
# search.users
#
class UserSearch
extend Forwardable
include Enumerable
# Public: Delegate each to users to implement Enumerable.
def_delegators :users, :each
# Public: Returns the User that is searching.
attr_reader :current_user
# Public: Returns an Integer to use as the limit of users to return.
attr_reader :size
# Public: Initializes a UserSearch.
#
# current_user - The User that is searching.
def initialize(current_user)
@current_user = current_user
@size = 50
end
# Public: Changes the search scope to only search for the favorite users of
# the current user.
#
# Returns a UserSearch.
def favorites
@scope = current_user.favorites
self
end
# Public: Searches users that have any text matching the keyword.
#
# keyword - The String keyword to filter users.
#
# Returns a UserSearch.
def search(keyword)
query.query = {query_string: {query: keyword}}
self
end
# Public: Searches users inside a geographic area.
#
# top_left - A String with the top-left limit of the area. The String
# format must be "latitude,longitude".
# bottom_right - A String with the bottom-right limit of the area. The String
# format must be "latitude,longitude".
#
# Returns a UserSearch.
def in_area(top_left, bottom_right)
query.filters.push({geo_bounding_box: {location: {top_left: top_left, bottom_right: bottom_right} }})
self
end
# Public: Filters the users to only allow mobile users. A valid mobile user
# has an associated device and has updated her position within the last two
# hours.
#
# Returns a UserSearch.
def mobile
query.filters.push({range: {position_updated_at: {gte: 2.hours.ago}}})
query.filters.push({bool: {must: {term: {mobile: true}}}})
self
end
# Public: Filters users to only allow active ones and boosts the users by the
# last activity time.
#
# Returns a UserSearch.
def live
limit = (30.minutes.ago.to_f * 1000).to_i
query.filters.push({range: {updated_at: {gte: limit}}})
query.wrappers.push({
"custom_score" => {
"params"=> {
"live_limit" => limit
},
"script"=> "_score * (abs(doc['updated_at'].date.getMillis() - live_limit) / 1000000.0)"
}
})
self
end
# Public: Limits the number of results to return.
#
# Returns a UserSearch.
def limit(limit)
@size = limit
self
end
# Public: Returns the collection of users that match the search, in the order
# ElasticSearch returned them.
#
# Returns an Array of Users.
def users
@users ||= scope.where(id: user_ids).sort_by { |user| user_ids.index(user.id.to_s) }
end
# Internal: Returns the ids of the users that match the search.
#
# Returns an Array of ids.
def user_ids
@user_ids ||= perform.hits
end
# Internal: Performs the search against ElasticSearch.
#
# Returns an ElasticSearch::Api::Hits.
def perform
elastic_search.search(query.to_elastic_search, {ids_only: true, size: size})
end
private
# Internal: A set of default filters to apply to every ElasticSearch search.
#
# Returns an Array.
def default_filters
[{bool: {must_not: {term: {_id: current_user.id}}}}]
end
# Internal: The query to add filters, wrappers and queries to send to
# ElasticSearch.
#
# Returns an ElasticSearch::QueryBuilder.
def query
@query ||= ElasticSearch::QueryBuilder.new(filters: default_filters)
end
# Internal: The scope to start searching for users in ActiveRecord after
# finding the ids in ElasticSearch.
#
# Returns an ActiveRecord::Relation.
def scope
@scope ||= User.scoped
end
# Internal: The client to connect to ElasticSearch.
#
# Returns an ElasticSearch::Client.
def elastic_search
@client ||= ElasticSearch::Connection.new(type: 'user').client
end
end
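`UserSearch#users` loads the matching rows from the database and then re-sorts them, because a `where(id: ...)` lookup does not preserve the relevance order of the ElasticSearch hits. A minimal sketch of that re-ordering step in plain Ruby, assuming the hit ids arrive as strings (as the `user.id.to_s` comparison suggests). The `Row` struct and `order_by_hits` helper are hypothetical. The sketch uses a position lookup hash rather than `Array#index`, which avoids scanning the id list once per row.

```ruby
# Hypothetical stand-in for a User row loaded from the database.
Row = Struct.new(:id, :name)

# Sorts rows so they follow the order of the ids returned by the search.
def order_by_hits(rows, hit_ids)
  position = hit_ids.each_with_index.to_h # "id" => rank in the hit list
  rows.sort_by { |row| position.fetch(row.id.to_s) }
end

rows = [Row.new(1, 'Anna'), Row.new(2, 'Joan'), Row.new(3, 'Marta')]
p order_by_hits(rows, %w[3 1 2]).map(&:name)
```

`fetch` raises `KeyError` for a row whose id is missing from the hit list, which makes a mismatch between the two sources visible instead of silently misplacing the row.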
module ElasticSearch
# Internal: Builds a wrapped ElasticSearch query.
class WrappedQuery
attr_reader :query, :wrappers
# Initializes a WrappedQuery.
#
# query - A String or Hash with the query to perform in ElasticSearch.
# wrappers - An Array of wrappers to be applied to the search.
def initialize(query, wrappers)
@query = query.dup
@wrappers = wrappers.dup
end
# Public: Builds a hash with the wrapped query. The first wrapper in the
# array will be the first applied.
#
# Returns a Hash.
def to_hash
wrappers.inject(deepest_wrapped_query) do |wrapped_query, wrapper|
wrap_query(wrapper, wrapped_query)
end
end
private
# Internal: Returns the deepest wrapper to apply the query to.
#
# Returns a Hash.
def deepest_wrapped_query
@deepest_wrapped_query ||= wrap_query(wrappers.shift, query)
end
# Internal: Wraps a query in a wrapper.
#
# wrapper - The wrapper to wrap the query.
# wrapped_query - The query that will be wrapped.
#
# Returns a Hash.
def wrap_query(wrapper, wrapped_query)
wrapper_name = wrapper.keys.first
wrapper[wrapper_name].update(query: wrapped_query.to_hash)
wrapper
end
end
end
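The nesting order produced by `WrappedQuery#to_hash` can be hard to picture: the first wrapper in the array ends up innermost, directly around the query, and each later wrapper encloses the previous result. Here is a minimal sketch of the same folding as a pure function. The helper name `wrap_all` and the second wrapper (`outer_wrapper`) are hypothetical. Unlike the class above, it builds new hashes rather than updating the wrappers in place.

```ruby
# Folds the wrappers around the query, first wrapper innermost.
def wrap_all(query, wrappers)
  wrappers.inject(query) do |wrapped, wrapper|
    name, options = wrapper.first # each wrapper is a single-key hash
    { name => options.merge(query: wrapped) }
  end
end

base_query = { match_all: {} }
wrappers = [
  { custom_score: { script: '_score * 2' } },
  { outer_wrapper: {} } # hypothetical wrapper name, for illustration only
]

p wrap_all(base_query, wrappers)
```

Because `merge` returns a new hash, the wrappers passed in can be reused for another query afterwards.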