Skip to content

Instantly share code, notes, and snippets.

@felipecsl felipecsl/rxruby.rb
Created Feb 25, 2016

Embed
What would you like to do?
Super simplified and naive (but fun) version of RxRuby I wrote during an endless flight to Brazil
# A minimal push-based stream, loosely modelled on Rx observables.
#
# An Observable wraps a block (the "on_subscribe" action). Nothing runs until
# #subscribe is called; the block is then invoked with the observer and is
# expected to drive it through on_next / on_completed / on_error.
class Observable
  # @param on_subscribe [Proc] block called with the observer on every #subscribe
  def initialize(&on_subscribe)
    @on_subscribe = on_subscribe
  end

  # Builds an observable that emits every element of +arr+ in iteration order
  # and then signals completion.
  #
  # @param arr [#each] source collection
  # @return [Observable]
  def self.from(arr)
    new do |o|
      arr.each { |i| o.on_next(i) }
      o.on_completed
    end
  end

  # Builds an observable that immediately signals +err+ through on_error and
  # emits nothing else.
  #
  # @param err [Object] value handed to the observer's on_error
  # @return [Observable]
  def self.error(err)
    new { |o| o.on_error(err) }
  end

  # Builds an observable that emits the single value +i+ and then completes.
  #
  # @return [Observable]
  def self.just(i)
    from([i])
  end

  # Builds an observable that completes without emitting anything.
  #
  # @return [Observable]
  def self.empty
    from([])
  end

  # Builds an observable from a custom subscribe block.
  #
  # @yieldparam o the observer passed to #subscribe
  # @return [Observable]
  def self.create(&on_subscribe)
    # The block is forwarded as-is; wrapping it in another block adds nothing.
    new(&on_subscribe)
  end

  # Returns a new observable that emits +block.call(value)+ for every value
  # emitted by this one. Completion and errors are passed through.
  #
  # @yieldparam i a value emitted upstream
  # @return [Observable]
  def map(&block)
    self.class.new do |o|
      lift_action(o) { |i| o.on_next(block.call(i)) }
    end
  end

  # Returns a new observable that, for every upstream value, subscribes to the
  # observable returned by +block+ and forwards the values it emits.
  #
  # Completion of an inner observable is ignored; only upstream completion is
  # forwarded downstream.
  #
  # @yieldparam i a value emitted upstream
  # @yieldreturn [#subscribe] the inner observable for that value
  # @return [Observable]
  def flat_map(&block)
    self.class.new do |o|
      inner_observer = ObserverWrapper.new(
        on_next: ->(i) { o.on_next(i) },
        on_completed: -> {},
        # Raising unwinds the upstream emission in progress; the rescue in
        # #lift then reports the error downstream, provided it is a
        # StandardError.
        on_error: ->(e) { raise e }
      )
      lift_action(o) { |i| block.call(i).subscribe(inner_observer) }
    end
  end

  # Runs the subscribe block against +observer+.
  #
  # @param observer [#on_next, #on_completed, #on_error]
  def subscribe(observer)
    @on_subscribe.call(observer)
  end

  private

  # Subscribes to this observable using +on_next+ for values, while forwarding
  # completion and errors directly to the downstream observer +o+.
  def lift_action(o, &on_next)
    lift(
      on_next: on_next,
      on_completed: -> { o.on_completed },
      on_error: ->(e) { o.on_error(e) }
    )
  end

  # Subscribes to this observable with an ObserverWrapper built from
  # +operator+ (a hash with :on_next, :on_completed and :on_error callables).
  # Any StandardError raised while the subscription runs is routed to
  # operator[:on_error].
  def lift(operator)
    @on_subscribe.call(ObserverWrapper.new(operator))
  rescue StandardError => e
    # Deliberately not `rescue Exception`: Interrupt, SystemExit, NoMemoryError
    # and similar must propagate instead of being reported as stream errors.
    operator[:on_error].call(e)
  end
end
# Base observer whose three callbacks all do nothing and return nil.
# Subclasses override the callbacks they care about.
class Observer
  # Called for each value; no-op here.
  def on_next(data); end

  # Called on completion; no-op here.
  def on_completed; end

  # Called with an error; no-op here.
  def on_error(err); end
end
# Observer that delegates each callback to a callable looked up in +inner+
# under the keys :on_next, :on_completed and :on_error.
class ObserverWrapper < Observer
  # @param inner [#[]] lookup of callables by callback name (the callers
  #   visible in this file pass a Hash)
  def initialize(inner)
    @inner = inner
  end

  # Passes +data+ to the :on_next callable.
  def on_next(data)
    dispatch(:on_next, data)
  end

  # Invokes the :on_completed callable with no arguments.
  def on_completed
    dispatch(:on_completed)
  end

  # Passes +err+ to the :on_error callable.
  def on_error(err)
    dispatch(:on_error, err)
  end

  private

  # Looks up the callable registered under +event+ and calls it with +payload+.
  def dispatch(event, *payload)
    handler = @inner[event]
    handler.call(*payload)
  end
end
# Observer that writes one line per callback using +puts+.
class PrintObserver < Observer
  # Prints the received value.
  def on_next(data)
    emit("GOT on_next: #{data}")
  end

  # Prints a completion notice.
  def on_completed
    emit("GOT on_completed")
  end

  # Prints the received error.
  def on_error(err)
    emit("GOT error: #{err}")
  end

  private

  # Single output point for all callbacks.
  def emit(line)
    puts line
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.