Created
December 13, 2011 00:24
-
-
Save erickt/1469802 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'pp'

# Error raised when an Elasticsearch response reports a problem.
#
# The constructor argument is kept verbatim (a shard summary hash, a whole
# response, or a plain message string) and is pretty-printed into the
# exception message.
class ElasticSearchUserError < StandardError
  # The object handed to the constructor, exposed so rescuers can inspect
  # it without parsing the message.
  attr_reader :failures

  # failures - any object describing what went wrong.
  def initialize(failures)
    super()
    @failures = failures
  end

  # Returns "Failures:\n" followed by the pretty-printed payload (which
  # ends in a newline). Exception#message delegates to this method.
  def to_s
    "Failures:\n#{@failures.pretty_inspect}"
  end
end
require 'pp'

# Elasticsearch query helpers added to CouchRest::ExtendedDocument as class
# methods. Every request goes through `app { elasticsearch }.execute(...)`
# against the index "cmdb_#{RACK_ENV}" and the type returned by
# elasticsearch_type_name.
module CouchRest
  class ExtendedDocument
    class << self
      # Type name used in Elasticsearch requests: the class name, downcased.
      def elasticsearch_type_name
        to_s.downcase
      end

      # Runs +query+ and then keeps fetching scroll pages until one comes
      # back empty.
      #
      # query  - search body passed through to do_query.
      # scroll - scroll keep-alive value sent with every request.
      # params - extra request parameters (the caller's hash is not
      #          modified). :raw selects whole responses instead of prepared
      #          rows. :scroll_id is stripped and otherwise ignored, because
      #          the id returned by each response drives the next fetch.
      #
      # With a block, yields each prepared row (or each non-empty raw
      # response when :raw is set) and returns nil. Without a block, returns
      # the array of rows / raw responses.
      #
      # Raises ElasticSearchUserError when a shard reports a failure or when
      # the number of hits received differs from the reported total.
      def by_query_scroll(query, scroll, params={}, &block)
        params = params.dup
        raw = params.delete(:raw)
        params.delete(:scroll_id)
        scan = params[:search_type].to_s == 'scan'

        rows = []
        total = 0
        scroll_id = nil
        response = nil
        first_request = true

        loop do
          if first_request
            response = do_query(query, params.merge(:raw => true,
                                                    :scroll => scroll))
          else
            response = app { elasticsearch }.execute(:scroll, scroll_id,
                                                     :scroll => scroll)
            raise_if_failures response
          end

          scroll_id = response['_scroll_id']
          hits = response['hits']['hits']
          total += hits.length

          if hits.empty?
            # Per the Elasticsearch scan documentation, a scan search answers
            # its opening request with the total and a scroll id but no
            # documents, so an empty first page only ends the loop when
            # there is nothing to fetch.
            break unless first_request && scan && response['hits']['total'] != 0
          elsif block_given?
            if raw
              yield response
            else
              hits.each { |hit| yield prepare_row(hit) }
            end
          elsif raw
            rows << response
          else
            rows.concat(hits.map { |hit| prepare_row(hit) })
          end

          first_request = false
        end

        expected = response['hits']['total']
        if total != expected
          # Report the response metadata without the (possibly large) hit
          # list, and without modifying a response the caller may hold.
          summary = response.merge(
            'hits' => response['hits'].reject { |key, _value| key == 'hits' })
          $stderr.puts summary.pretty_inspect
          raise ElasticSearchUserError,
                "Only #{total} of #{expected} docs returned!"
        end

        rows unless block_given?
      end

      # Same as by_query_scroll, with :search_type forced to 'scan'.
      def by_query_scan(query, scroll, params={}, &block)
        by_query_scroll(query, scroll, params.merge(:search_type => 'scan'),
                        &block)
      end

      # Entry point for a search. Dispatches to by_query_scan when
      # params[:search_type] is "scan", to by_query_scroll when
      # params[:scroll] is set, and to a single do_query otherwise.
      # The caller's params hash is not modified.
      def by_query(query, params={}, &block)
        params = params.dup

        # Support scrolling
        if params[:search_type] == "scan"
          scroll = params.delete(:scroll)
          by_query_scan(query, scroll, params, &block)
        elsif params[:scroll]
          scroll = params.delete(:scroll)
          by_query_scroll(query, scroll, params, &block)
        else
          do_query(query, params, &block)
        end
      end

      # Searches with a query_string query over the "_all" field using the
      # "keyword" analyzer. A nil or empty query_string sends an empty query
      # body instead.
      def by_query_string(query_string, params={}, &block)
        if query_string.nil? || query_string.empty?
          query = {}
        else
          query = {
            :query => {
              :query_string => {
                :default_field => "_all",
                :analyzer => "keyword",
                :query => query_string }
            }}
        end

        by_query(query, params, &block)
      end

      # Issues a :multi_get request and returns the raw response, or yields
      # each element of the response when a block is given.
      #
      # Raises ElasticSearchUserError when the response reports failed shards.
      def by_multiget(query, params={}, &block)
        # Query elasticsearch for the results. I'm not using rubberband's
        # helper functions as I just want the raw json.
        response = app { elasticsearch }.execute(:multi_get, "cmdb_#{RACK_ENV}",
                                                 elasticsearch_type_name, query,
                                                 params)

        # Fail if any of the shards raised an error.
        if response['_shards'] && response['_shards']['failed'] != 0
          raise ElasticSearchUserError, response['_shards']
        end

        if block_given?
          # NOTE(review): if the response is a Hash this yields [key, value]
          # pairs rather than documents - confirm that is what callers expect.
          response.each { |hit| yield hit }
        else
          response
        end
      end

      private

      # Look up in ElasticSearch for the documents.
      #
      # params[:raw] decides whether or not to include the elasticsearch
      # metadata; it is stripped from a copy of params before the request is
      # sent, so the caller's hash stays intact.
      #
      # With a block: yields the whole response (raw) or each prepared row.
      # Without: returns the whole response (raw) or
      # { "total" => reported total, "rows" => prepared rows }.
      def do_query(query, params={}, &block)
        params = params.dup
        raw = params.delete(:raw)

        # Query elasticsearch for the results. I'm not using rubberband's
        # helper functions as I just want the raw json.
        response = app { elasticsearch }.execute(:search, "cmdb_#{RACK_ENV}",
                                                 elasticsearch_type_name, query,
                                                 params)
        raise_if_failures response

        if block_given?
          if raw
            yield response
          else
            response['hits']['hits'].each { |hit| yield prepare_row(hit) }
          end
        elsif raw
          response
        else
          rows = response['hits']['hits'].map { |hit| prepare_row(hit) }
          { "total" => response['hits']['total'], "rows" => rows }
        end
      end

      # Returns a fresh hash holding the hit's 'fields' if present, else its
      # '_source', else nothing.
      def prepare_row(hit)
        {}.merge(hit['fields'] || hit['_source'] || {})
      end

      # Raises ElasticSearchUserError carrying the response when its
      # '_shards' section reports a non-zero 'failed' count. Before raising,
      # the hit list (if any) is replaced by its length so the error message
      # stays readable. Returns nil otherwise.
      def raise_if_failures(response)
        shards = response['_shards']

        # Fail if any of the shards raised an error.
        return unless shards && shards['failed'] != 0

        # Delete the hits.
        if response['hits'] && response['hits']['hits']
          response['hits']['length'] = response['hits']['hits'].length
          response['hits'].delete('hits')
        end

        raise ElasticSearchUserError, response
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment