Skip to content

Instantly share code, notes, and snippets.

@erickt
Created December 13, 2011 00:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erickt/1469802 to your computer and use it in GitHub Desktop.
Save erickt/1469802 to your computer and use it in GitHub Desktop.
# PP is used below to render structured failure reports.
require 'pp'

# Raised when an Elasticsearch request reports a failure or returns an
# unexpected result.
#
# The error can be built from a structured failure report (a Hash, Array, ...)
# or from a plain String, as happens with
# `raise ElasticSearchUserError, "some message"`.
class ElasticSearchUserError < StandardError
  # The object this error was created with.
  attr_reader :failures

  # failures - a String message, or any object describing the failure.
  def initialize(failures)
    @failures = failures
  end

  # Returns the error message.
  #
  # A String is returned as-is; pretty-printing it would wrap the message in
  # quotes and escape its contents. Anything else is pretty-printed below a
  # "Failures:" heading.
  def to_s
    return @failures if @failures.is_a?(String)

    "Failures:\n#{PP.pp @failures, ""}"
  end
end
module CouchRest
class ExtendedDocument
class << self
# Returns the lower-cased string form of the receiver. The search and
# multi-get requests in this file pass it as the type argument.
def elasticsearch_type_name
  to_s.downcase
end
# Runs +query+ as a scrolling search and walks every page of results.
#
# The first page comes from do_query; each later page is fetched with a
# :scroll request that passes the scroll id of the previous response.
#
# query  - the search body handed to do_query for the first request.
# scroll - the scroll value sent with the first search and with every
#          follow-up :scroll request.
# params - extra search options. :raw selects whole responses instead of
#          prepared rows; :scroll_id is dropped before searching. The hash
#          passed in is not modified.
#
# With a block: yields each prepared row (or each non-empty response when
# :raw is set) and returns nil.
# Without a block: returns an Array of prepared rows (or of non-empty
# responses when :raw is set).
#
# Raises ElasticSearchUserError when the number of hits seen differs from the
# total reported by the last response.
def by_query_scroll(query, scroll, params={}, &block)
  # Work on a copy: the deletes below must not leak into the caller's hash.
  params = params.dup
  raw = params.delete(:raw)
  # A caller-supplied :scroll_id is never sent with the initial search.
  params.delete(:scroll_id)
  scan = params[:search_type].to_s == 'scan'

  rows = []
  seen = 0
  scroll_id = nil
  response = nil
  first_page = true
  finished = false

  until finished
    if first_page
      response = do_query(query, params.merge(:raw => true,
                                              :scroll => scroll))
    else
      response = app { elasticsearch }.execute(:scroll, scroll_id,
                                               :scroll => scroll)
      raise_if_failures response
    end

    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']
    seen += hits.length

    if hits.empty?
      # An empty page ends the scroll. The one exception is the opening
      # response of a scan search, which carries only the scroll id and the
      # total; the documents start arriving with the first :scroll request.
      finished = !(first_page && scan && response['hits']['total'] != 0)
    elsif block_given?
      if raw
        yield response
      else
        hits.each { |hit| yield prepare_row(hit) }
      end
    elsif raw
      rows << response
    else
      rows.concat(hits.map { |hit| prepare_row hit })
    end

    first_page = false
  end

  expected = response['hits']['total']
  if seen != expected
    raise ElasticSearchUserError,
          "Only #{seen} of #{expected} docs returned!"
  end

  rows unless block_given?
end
# Scrolls through +query+ with the search type forced to 'scan'.
#
# Takes the same arguments and block as by_query_scroll and returns whatever
# it returns. +params+ is merged into a new hash, not modified.
def by_query_scan(query, scroll, params={}, &block)
  scan_params = params.merge(:search_type => 'scan')
  by_query_scroll(query, scroll, scan_params, &block)
end
# Runs +query+, picking the search strategy from +params+:
#
#   params[:search_type] == "scan"  -> by_query_scan, with params[:scroll]
#   params[:scroll] is set          -> by_query_scroll
#   otherwise                       -> do_query
#
# The :scroll entry is taken out of the options and passed as the scroll
# argument. The caller's +params+ hash is left untouched.
#
# Returns whatever the selected method returns; the block is forwarded to it.
def by_query(query, params={}, &block)
  # Copy first: :scroll is removed below, and the methods called from here
  # may strip further keys.
  params = params.dup

  # Support scrolling
  if params[:search_type] == "scan"
    by_query_scan(query, params.delete(:scroll), params, &block)
  elsif params[:scroll]
    by_query_scroll(query, params.delete(:scroll), params, &block)
  else
    do_query(query, params, &block)
  end
end
# Searches with a query_string query built from +query_string+.
#
# query_string - the text to search for. nil or an empty string sends an
#                empty query body ({}) instead of a query_string query.
# params       - options passed through to by_query unchanged.
#
# The query targets the "_all" field with the "keyword" analyzer.
# Returns whatever by_query returns; the block is forwarded to it.
def by_query_string(query_string, params={}, &block)
  query =
    if query_string.nil? || query_string.empty?
      {}
    else
      {
        :query => {
          :query_string => {
            :default_field => "_all",
            :analyzer => "keyword",
            :query => query_string
          }
        }
      }
    end

  by_query(query, params, &block)
end
# Issues a :multi_get request against the "cmdb_<RACK_ENV>" index for this
# class's Elasticsearch type.
#
# query  - the multi-get request body.
# params - extra options passed to the request.
#
# With a block, every element produced by iterating the response is yielded.
# Returns the result of that iteration when a block is given, otherwise the
# response itself.
#
# Raises ElasticSearchUserError (carrying the '_shards' section) when the
# response reports a non-zero number of failed shards.
def by_multiget(query, params={}, &block)
  # Rubberband's helper functions are bypassed on purpose: only the raw json
  # is wanted here.
  client = app { elasticsearch }
  result = client.execute(:multi_get, "cmdb_#{RACK_ENV}",
                          elasticsearch_type_name, query, params)

  # A single failed shard is enough to abort.
  shard_info = result['_shards']
  raise ElasticSearchUserError.new(shard_info) if shard_info && shard_info['failed'] != 0

  return result unless block_given?

  result.each { |entry| yield entry }
end
private
# Runs a single :search request against the "cmdb_<RACK_ENV>" index for this
# class's Elasticsearch type and hands the outcome back in one of four shapes.
#
# query  - the search body.
# params - request options. :raw is removed from this hash before the request
#          is sent; it decides whether the elasticsearch metadata is kept.
#
#   :raw set,   block given -> yields the whole response, returns the block's value
#   :raw set,   no block    -> returns the whole response
#   :raw unset, block given -> yields each prepared row
#   :raw unset, no block    -> returns { "total" => ..., "rows" => [...] }
#
# The response is passed through raise_if_failures before anything is returned.
def do_query(query, params={}, &block)
  keep_metadata = params.delete(:raw)

  # Rubberband's helper functions are bypassed on purpose: only the raw json
  # is wanted here.
  client = app { elasticsearch }
  result = client.execute(:search, "cmdb_#{RACK_ENV}",
                          elasticsearch_type_name, query, params)
  raise_if_failures result

  if keep_metadata
    return block_given? ? yield(result) : result
  end

  docs = result['hits']['hits']
  return docs.each { |doc| yield prepare_row(doc) } if block_given?

  prepared = docs.map { |doc| prepare_row doc }
  { "total" => result['hits']['total'], "rows" => prepared }
end
# Builds a row from a single search hit.
#
# Returns a new Hash holding the hit's 'fields' when present, otherwise its
# '_source', otherwise nothing.
def prepare_row(hit)
  payload = hit['fields'] || hit['_source'] || {}
  {}.merge(payload)
end
# Raises ElasticSearchUserError when +response+ reports failed shards.
#
# response - a response Hash. Its '_shards' entry, when present, is checked
#            for a 'failed' count other than 0.
#
# Before raising, the hit list is removed from the response and its length is
# stored under response['hits']['length'], so the error carries the response
# without the documents themselves.
#
# Returns nil when no shard failure is reported. Responses without a 'hits'
# section are accepted.
def raise_if_failures(response)
  shards = response['_shards']

  # Fail if any of the shards raised an error.
  return unless shards && shards['failed'] != 0

  # Delete the hits, keeping only how many there were.
  hits = response['hits']
  if hits && hits['hits']
    hits['length'] = hits['hits'].length
    hits.delete('hits')
  end

  raise ElasticSearchUserError.new(response)
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment