Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created February 17, 2014 19:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/9057464 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/9057464 to your computer and use it in GitHub Desktop.
# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
require 'monitor'
require 'rx/subscriptions/subscription'
require 'rx/subscriptions/composite_subscription'
require 'rx/subscriptions/ref_count_subscription'
require 'rx/subscriptions/single_assignment_subscription'
require 'rx/core/observer'
require 'rx/core/observable'
module RX
module Observable
# Internals for
AMB_STATE_LEFT = 0
AMB_STATE_RIGHT = 1
AMB_STATE_NEITHER = 2
class AmbObserver
attr_accessor(:observer)
def method_missing(m, *args, &block)
@observer.method(m).call(*args)
end
end
# Propagates the observable sequence that reacts first.
def amb(second)
AnonymousObservable.new do |observer|
left_subscription = SingleAssignmentSubscription.new
right_subscription = SingleAssignmentSubscription.new
choice = AMB_STATE_NEITHER
gate = Monitor.new
left = AmbObserver.new
right = AmbObserver.new
handle_left = lambda do |&action|
if choice == AMB_STATE_NEITHER
choice = AMB_STATE_LEFT
right_subscription.unsubscribe
left.observer = observer
end
action.call if choice == AMB_STATE_LEFT
end
handle_right = lambda do |&action|
if choice == AMB_STATE_NEITHER
choice = AMB_STATE_RIGHT
left_subscription.unsubscribe
right.observer = observer
end
action.call if choice == AMB_STATE_RIGHT
end
left_obs = Observer.configure do |o|
o.on_next {|x| handle_left.call { observer.on_next x } }
o.on_error {|err| handle_left.call { observer.on_error err } }
o.on_completed { handle_left.call { observer.on_completed } }
end
right_obs = Observer.configure do |o|
o.on_next {|x| handle_right.call { observer.on_next x } }
o.on_error {|err| handle_right.call { observer.on_error err } }
o.on_completed { handle_right.call { observer.on_completed } }
end
left.observer = Observer.allow_reentrancy(left_obs, gate)
right.observer = Observer.allow_reentrancy(right_obs, gate)
left_subscription.subscription = self.subscribe left
right_subscription.subscription = second.subscribe right
CompositeSubscription.new [left_subscription, right_subscription]
end
end
class << self
# Propagates the observable sequence that reacts first.
def amb(*args)
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment