Last active
August 29, 2015 13:56
-
-
Save mattpodwysocki/8995535 to your computer and use it in GitHub Desktop.
Observable.scan for incremental reduce
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Applies an accumulator function over an observable sequence and returns each intermediate result. | |
# The optional seed value is used as the initial accumulator value. | |
# For aggregation behavior with no intermediate results, see Observable.reduce. | |
def scan(*args, &block) | |
has_seed = false | |
seed = nil | |
action = nil | |
# Argument parsing to support: | |
# 1. (seed, Symbol) | |
# 2. (seed, &block) | |
# 3. (Symbol) | |
# 4. (&block) | |
if args.length == 2 && args[1].is_a?(Symbol) | |
seed = args[0] | |
action = args[1].to_proc | |
has_seed = true | |
elsif args.length == 1 && block_given? | |
seed = args[0] | |
has_seed = true | |
action = block | |
elsif args.length == 1 && args[0].is_a?(Symbol) | |
action = args[0].to_proc | |
elsif args.length == 0 && block_given? | |
action = block | |
else | |
raise 'Invalid arguments' | |
end | |
AnonymousObservable.new do |observer| | |
has_accumulation = false | |
accumulation = nil | |
has_value = false | |
new_obs = Observer.configure do |o| | |
o.on_next do |x| | |
begin | |
has_value = true unless has_value | |
if has_accumulation | |
accumulation = action.call(accumulation, x) | |
else | |
accumulation = has_seed ? action.call(seed, x) : x | |
has_accumulation = true | |
end | |
rescue => err | |
observer.on_error err | |
return | |
end | |
observer.on_next accumulation | |
end | |
o.on_error &observer.method(:on_error) | |
o.on_completed do | |
observer.on_next seed if !has_value && has_seed | |
observer.on_completed | |
end | |
end | |
subscribe new_obs | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# With symbol | |
obs = RX::Observable.range(0, 5).scan(:+) | |
obs.subscribe_on_next &method(:puts) | |
# 0 | |
# 1 | |
# 3 | |
# 6 | |
# 10 | |
# With block | |
obs = RX::Observable.range(0, 5).scan {|acc, x| acc + x } | |
obs.subscribe_on_next &method(:puts) | |
# 0 | |
# 1 | |
# 3 | |
# 6 | |
# 10 | |
# With seed and symbol | |
obs = RX::Observable.range(1, 5).scan(1, :*) | |
obs.subscribe_on_next &method(:puts) | |
# 1 | |
# 2 | |
# 6 | |
# 24 | |
# 120 | |
# With seed and block | |
obs = RX::Observable.range(1, 5).scan(1) {|acc, x| acc * x} | |
obs.subscribe_on_next &method(:puts) | |
# 1 | |
# 2 | |
# 6 | |
# 24 | |
# 120 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment