Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/8995535 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/8995535 to your computer and use it in GitHub Desktop.
Observable.scan for incremental reduce
# Applies an accumulator function over an observable sequence and emits each
# intermediate result (an incremental reduce).
# For aggregation behavior with no intermediate results, see Observable.reduce.
#
# Supported call forms:
#   scan(seed, :symbol)          - seed plus a Symbol naming the accumulator
#   scan(seed) { |acc, x| ... }  - seed plus an accumulator block
#   scan(:symbol)                - Symbol accumulator, no seed
#   scan { |acc, x| ... }        - block accumulator, no seed
#
# Without a seed, the first source value becomes the initial accumulation and
# is emitted as-is. With a seed, a source that completes without emitting any
# value produces the seed once before completion.
#
# An exception raised by the accumulator is forwarded to the downstream
# observer through on_error; nothing is emitted for that value.
#
# Returns a new AnonymousObservable whose block subscribes to the source via
# `subscribe`.
# Raises RuntimeError ('Invalid arguments') when the arguments match none of
# the forms above.
def scan(*args, &block)
  has_seed = false
  seed = nil

  if args.length == 2 && args[1].is_a?(Symbol)
    seed = args[0]
    has_seed = true
    action = args[1].to_proc
  elsif args.length == 1 && block_given?
    # A single argument accompanied by a block is always the seed,
    # even when that argument happens to be a Symbol.
    seed = args[0]
    has_seed = true
    action = block
  elsif args.length == 1 && args[0].is_a?(Symbol)
    action = args[0].to_proc
  elsif args.empty? && block_given?
    action = block
  else
    raise 'Invalid arguments'
  end

  AnonymousObservable.new do |observer|
    # Starting from the seed (when there is one) lets a single code path
    # handle both the first and the subsequent values.
    accumulation = seed
    has_accumulation = has_seed
    has_value = false

    new_obs = Observer.configure do |o|
      o.on_next do |x|
        has_value = true

        begin
          accumulation = has_accumulation ? action.call(accumulation, x) : x
          has_accumulation = true
        rescue => err
          observer.on_error err
          # `next`, not `return`: this handler runs after #scan has already
          # returned, so `return` here would raise LocalJumpError.
          next
        end

        observer.on_next accumulation
      end

      o.on_error(&observer.method(:on_error))

      o.on_completed do
        # An empty source with a seed still yields the seed exactly once.
        observer.on_next seed if has_seed && !has_value
        observer.on_completed
      end
    end

    subscribe new_obs
  end
end
# Usage examples. Each emitted value is handed to Kernel#puts; the commented
# numbers after every example are the values shown as its output.
print_line = method(:puts)

# Accumulator given as a Symbol, no seed
RX::Observable.range(0, 5).scan(:+).subscribe_on_next(&print_line)
# 0
# 1
# 3
# 6
# 10

# Accumulator given as a block, no seed
RX::Observable.range(0, 5).scan { |sum, value| sum + value }.subscribe_on_next(&print_line)
# 0
# 1
# 3
# 6
# 10

# Seed followed by a Symbol accumulator
RX::Observable.range(1, 5).scan(1, :*).subscribe_on_next(&print_line)
# 1
# 2
# 6
# 24
# 120

# Seed plus a block accumulator
RX::Observable.range(1, 5).scan(1) { |product, value| product * value }.subscribe_on_next(&print_line)
# 1
# 2
# 6
# 24
# 120
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment