Created
February 25, 2016 13:30
-
-
Save felipecsl/b4d0a36f7a6056f270d6 to your computer and use it in GitHub Desktop.
Super simplified and naive (but fun) version of RxRuby I wrote during an endless flight to Brazil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A minimal push-based stream in the spirit of RxRuby.
#
# An Observable wraps an "on subscribe" block. Calling #subscribe runs that
# block with the given observer, which is expected to respond to
# +on_next(value)+, +on_completed+ and +on_error(err)+. Everything happens
# synchronously inside the #subscribe call.
class Observable
  # @yieldparam observer [#on_next, #on_completed, #on_error] the subscriber
  def initialize(&on_subscribe)
    @on_subscribe = on_subscribe
  end

  # Emits every element yielded by +arr.each+, then completes.
  #
  # @param arr [#each] the values to emit
  # @return [Observable]
  def self.from(arr)
    new do |observer|
      arr.each { |item| observer.on_next(item) }
      observer.on_completed
    end
  end

  # Emits nothing and signals +err+ through +on_error+ (no completion).
  #
  # @param err [Object] passed to the observer unchanged
  # @return [Observable]
  def self.error(err)
    new { |observer| observer.on_error(err) }
  end

  # Emits the single value +i+, then completes.
  #
  # @return [Observable]
  def self.just(i)
    from([i])
  end

  # Completes immediately without emitting any value.
  #
  # @return [Observable]
  def self.empty
    from([])
  end

  # Builds an Observable from a custom subscription block.
  #
  # @yieldparam observer [#on_next, #on_completed, #on_error] the subscriber
  # @return [Observable]
  def self.create(&on_subscribe)
    new(&on_subscribe)
  end

  # Returns an Observable that emits +block.call(value)+ for every value
  # emitted by this one. Completion and errors are passed through.
  #
  # @yieldparam value [Object] a value emitted by the source
  # @return [Observable]
  def map(&block)
    self.class.new do |downstream|
      lift_action(downstream) { |value| downstream.on_next(block.call(value)) }
    end
  end

  # Returns an Observable that, for every value emitted by this one,
  # subscribes to the Observable returned by +block+ and forwards its values
  # downstream. Inner completions are ignored; downstream completes when the
  # source does.
  #
  # Inner errors are re-raised rather than forwarded directly. The raise
  # unwinds out of the source's subscription block, so emission stops, and the
  # rescue in #lift then delivers the error downstream. As a consequence a
  # String error payload arrives downstream as the RuntimeError that +raise+
  # builds from it, and an error that is not a StandardError propagates to the
  # caller of #subscribe.
  #
  # @yieldparam value [Object] a value emitted by the source
  # @yieldreturn [#subscribe] the inner stream for that value
  # @return [Observable]
  def flat_map(&block)
    self.class.new do |downstream|
      inner_observer = ObserverWrapper.new(
        on_next: ->(value) { downstream.on_next(value) },
        on_completed: -> {},
        on_error: ->(err) { raise err }
      )
      lift_action(downstream) do |value|
        block.call(value).subscribe(inner_observer)
      end
    end
  end

  # Runs the subscription block with +observer+.
  #
  # @param observer [#on_next, #on_completed, #on_error]
  # @return [Object] whatever the subscription block returns
  def subscribe(observer)
    @on_subscribe.call(observer)
  end

  private

  # Subscribes to this Observable with +on_next+ handling values, while
  # completion and errors are forwarded to +downstream+.
  def lift_action(downstream, &on_next)
    lift(
      on_next: on_next,
      on_completed: -> { downstream.on_completed },
      on_error: ->(err) { downstream.on_error(err) }
    )
  end

  # Subscribes to this Observable with an observer built from +operator+, a
  # table of callables keyed by :on_next, :on_completed and :on_error.
  #
  # Any StandardError raised while the subscription runs (including one raised
  # from inside the callbacks) is handed to the :on_error callable. Only
  # StandardError is rescued so that Interrupt, SystemExit and other
  # non-StandardError exceptions are not swallowed.
  def lift(operator)
    @on_subscribe.call(ObserverWrapper.new(operator))
  rescue StandardError => e
    operator[:on_error].call(e)
  end
end
# Base observer whose callbacks all do nothing and return nil.
# Subclasses override the notifications they care about.
class Observer
  # Called with each emitted value.
  def on_next(data); end

  # Called when the stream finishes.
  def on_completed; end

  # Called with the error when the stream fails.
  def on_error(err); end
end
# Adapts a table of callables to the Observer interface.
#
# The wrapped object is indexed with :on_next, :on_completed and :on_error,
# and each entry is invoked with +call+.
class ObserverWrapper < Observer
  # @param inner [#[]] lookup of callables keyed by notification name
  def initialize(inner)
    @handlers = inner
  end

  def on_next(data)
    dispatch(:on_next, data)
  end

  def on_completed
    dispatch(:on_completed)
  end

  def on_error(err)
    dispatch(:on_error, err)
  end

  private

  # Looks up the callable registered for +event+ and calls it with +args+.
  def dispatch(event, *args)
    @handlers[event].call(*args)
  end
end
# Observer that writes every notification it receives to standard output.
class PrintObserver < Observer
  # Prints the emitted value.
  def on_next(data)
    puts "GOT on_next: #{data}"
  end

  # Prints a completion marker.
  def on_completed
    puts "GOT on_completed"
  end

  # Prints the error (interpolated, so its +to_s+ is shown).
  def on_error(err)
    puts "GOT error: #{err}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment