Skip to content

Instantly share code, notes, and snippets.

@gotar
Created August 13, 2012 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gotar/3340194 to your computer and use it in GitHub Desktop.
Save gotar/3340194 to your computer and use it in GitHub Desktop.
ParallelSearch
module API
class ParallelSearch
API_FETCHING_TIMEOUT = 1 # seconds
def initialize(config, search_apis)
@config = config
@search_apis = search_apis
end
# Returns results hash of {#ExternalSearchApi => [item, ...], ...}.
def call(keyword, limit)
search_all_in_parallel(keyword, limit).get_at_least_one_non_empty
end
private
# Runs all api searches in parallel via Array#pmap. Returns ResultsSet
# instance.
def search_all_in_parallel(keyword, limit)
ResultsSet.new(@search_apis.pmap {|api| [api, search_api(api, keyword, limit)]}.to_h)
end
# Returns Result instance. Notifies airbrake in case of errors.
def search_api(api, keyword, limit)
ReadyResult.new(find_with_cache(api, keyword, limit))
rescue Cache::TimedOut => e
FutureResult.new(e.future)
rescue => e
Airbrake.notify(e)
ReadyResult.new([])
end
# Returns an array of items or raises Cache::TimedOut exception.
def find_with_cache(api, keyword, limit)
cache(api, keyword, limit) do
api.find(keyword, limit)
end
end
def cache(api, keyword, limit, &block)
Cache.call(unique_api_search_key(api, keyword, limit),
:timeout => API_FETCHING_TIMEOUT,
&block)
end
def unique_api_search_key(api, keyword, limit)
["items_query", api.find_key(keyword, limit)].join(':')
end
class Result
# Only access #value if #ready? is true.
attr_reader :value
def ready?
true
end
end
class FutureResult < Result
# Initialize with an instance of Celluloid::Future that will return
# a Cache::Entry instance.
# Notifies airbrake in case of errors.
def initialize(future)
Celluloid::Future.new do
begin
@value = future.value.value
rescue => e
Airbrake.notify(e)
@value = []
end
end
end
def ready?
!@value.nil?
end
end
class ReadyResult < Result
# Initialize with an Array.
def initialize(value)
@value = value
end
end
class ResultsSet
def initialize(apis_with_results)
@apis_with_results = apis_with_results
end
# Returns Hash of {#ExternalSearchApi => [item, ...], ...} of the
# APIs that finished processing. Tries hard to return at least one
# element. Will return empty hash only when all results from the input
# finished and returned nothing.
def get_at_least_one_non_empty
while !at_least_one_non_empty? && !all_finished?
sleep(0.05)
end
results2values(ready_non_empty)
end
private
def at_least_one_non_empty?
!ready_non_empty.empty?
end
def all_finished?
@apis_with_results.all? {|api, result| result.ready? }
end
def ready_non_empty
@apis_with_results.select {|api, result| result.ready? && !result.value.empty?}
end
# Maps {#ExternalSearchApi => Result} hash to {#ExternalSearchApi => [item, ...]} hash.
def results2values(h)
h.map {|api, result| [api, result.value]}.to_h
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment