Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created February 19, 2014 20:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/9101290 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/9101290 to your computer and use it in GitHub Desktop.
# Merges the specified observable sequences into one observable sequence by
# invoking the result selector whenever any of the sources produces an element.
#
# Nothing is emitted until every source has produced at least one element.
# From then on, each new element from any source triggers
# `result_selector.call(values)`, where `values` is the Array of the latest
# element of each source in argument order, and the selector's result is
# passed to the downstream observer's `on_next`.
#
# args            - The source sequences. Each must respond to
#                   `synchronize(gate)`, and the object that returns must
#                   respond to `subscribe(observer)`.
# result_selector - Block invoked with the Array of latest values. The same
#                   Array instance is reused and updated in place between
#                   invocations.
#
# Termination, as signalled to the downstream observer:
# * `on_error` when any source signals an error or the selector raises a
#   StandardError.
# * `on_completed` once every source has completed, or when a source produces
#   an element while some source has still never produced one and every
#   *other* source has already completed (no combination can ever be formed).
#
# Returns a new AnonymousObservable; its subscribe block evaluates to a
# CompositeSubscription wrapping one SingleAssignmentSubscription per source.
def combine_latest(*args, &result_selector)
  AnonymousObservable.new do |observer|
    n = args.length
    has_value = Array.new(n, false)
    has_value_all = false
    values = Array.new(n)
    is_done = Array.new(n, false)

    # Called after source `i` has stored its newest element in `values[i]`.
    next_item = lambda do |i|
      has_value[i] = true
      # `has_value_all` latches to true so the all? scan stops being repeated.
      if has_value_all || (has_value_all = has_value.all?)
        begin
          res = result_selector.call(values)
        rescue => e
          observer.on_error e
        else
          # Kept outside the rescued region: errors raised by the downstream
          # observer must not be reported back to it as selector failures.
          observer.on_next(res)
        end
      elsif is_done.each_with_index.all? { |finished, j| j == i || finished }
        # Every source other than `i` has finished while a value is still
        # missing, so no combination can ever be produced.
        observer.on_completed
      end
    end

    # Called when source `i` completes.
    done = lambda do |i|
      is_done[i] = true
      observer.on_completed if is_done.all?
    end

    # One gate is handed to every source's `synchronize` call.
    gate = Monitor.new
    subscriptions = Array.new(n) do |i|
      sas = SingleAssignmentSubscription.new
      sas_obs = Observer.configure do |o|
        o.on_next do |x|
          values[i] = x
          next_item.call i
        end
        o.on_error(&observer.method(:on_error))
        o.on_completed { done.call i }
      end
      sas.subscription = args[i].synchronize(gate).subscribe(sas_obs)
      # The block's value becomes element `i`. `subscriptions` itself is still
      # nil while Array.new is running, so it must not be indexed here.
      sas
    end
    CompositeSubscription.new subscriptions
  end
end
end
private
# Index-aware variant of `select`.
#
# arr   - Collection responding to `each_with_index`.
# block - Predicate called as `block.call(item, index)`; an item is kept when
#         the result is truthy.
#
# Returns a new Array of the kept items in their original order.
def enumerable_select_with_index(arr, &block)
  arr.each_with_index.each_with_object([]) do |(element, position), kept|
    kept << element if block.call(element, position)
  end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment