Skip to content

Instantly share code, notes, and snippets.

@lancewalton
Created April 4, 2014 21:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lancewalton/9983866 to your computer and use it in GitHub Desktop.
Save lancewalton/9983866 to your computer and use it in GitHub Desktop.
Example of RX with a feedback loop
import scala.collection.mutable
import rx.lang.scala.{Observable, Subscriber, Subscription}
/**
 * A minimal observable that can be 'told' of new events: callers push values in
 * with [[tell]] and subscribers of [[observable]] receive them.
 *
 * The most recently told value (or `initial`, before the first `tell`) is cached.
 * It is replayed to each new subscriber and used to drop a value that equals the
 * one before it.
 *
 * @param initial value replayed to subscribers before anything has been told, if any
 * @tparam T type of the emitted values
 */
case class SimpleObservable[T](initial: Option[T] = None) {
  // Single monitor guarding `subscribers` and `lastValue`. It is reentrant, so a
  // subscriber may call `tell` or unsubscribe from inside `onNext` on the same thread.
  private val lock = new AnyRef

  // Immutable set swapped under `lock` (replaces the deprecated `mutable.SynchronizedSet`).
  // Iteration always walks a snapshot, so (un)subscribing during delivery cannot
  // modify the collection that is being traversed.
  private var subscribers: Set[Subscriber[T]] = Set.empty
  private var lastValue: Option[T] = initial

  /** Emits the cached value (if any) on subscription, then every value accepted by [[tell]]. */
  val observable: Observable[T] = Observable { (subscriber: Subscriber[T]) =>
    // Register the removal hook first so that an unsubscribe triggered by the
    // replayed value below already has something to run.
    subscriber.add(Subscription(unsubscribe(subscriber)))
    // Registration and replay happen atomically with respect to `tell`, so a
    // concurrent `tell` is seen either through the replay or through delivery, not both.
    lock.synchronized {
      subscribe(subscriber)
      lastValue.foreach(subscriber.onNext)
    }
  }

  /**
   * Publishes `t` to all current subscribers unless it equals the cached value.
   *
   * The comparison uses `Option(t)`, so a `null` argument compares as `None`.
   * Delivery happens while holding the internal lock, which keeps `onNext` calls
   * from concurrent `tell`s serialized.
   *
   * @param t the candidate value
   * @return `true` if the value was stored and delivered, `false` if it was dropped as a duplicate
   */
  def tell(t: T): Boolean = lock.synchronized {
    if (Option(t) != lastValue) {
      lastValue = Some(t)
      subscribers.foreach(_.onNext(t))
      true
    } else false
  }

  private def subscribe(s: Subscriber[T]): Unit = lock.synchronized { subscribers += s }

  private def unsubscribe(s: Subscriber[T]): Unit = lock.synchronized { subscribers -= s }
}
object Observables {

  /**
   * An observable value bundled with the function used to change it.
   *
   * @param result     emits the current value on subscription and each subsequent value
   * @param mapAndTell takes a function, applies it to the current value and publishes the
   *                   outcome on `result`; the Boolean is whatever `SimpleObservable.tell`
   *                   returned for the function itself, i.e. `false` (and nothing happens)
   *                   when the function equals the one passed on the previous call
   * @tparam T type of the observed value
   */
  case class Mapper[T](result: Observable[T], mapAndTell: (T => T) => Boolean)

  /**
   * Constructs a [[Mapper]] whose `result` is an Observable and which provides a function
   * that maps the observable's current value, resulting in a new value for that observable.
   *
   * e.g.
   * {{{
   * val m = mapper[List[Int]](Nil)
   * m.mapAndTell(l => 1 :: l)
   * m.mapAndTell(l => 2 :: l)
   * }}}
   * results in `m.result` initially having the value `Nil`, then `1 :: Nil`, then
   * `2 :: 1 :: Nil`.
   *
   * Each function passed to `mapAndTell` is folded into the running value with `scan`, and
   * the accumulator is then told what its new value is. The functions are deliberately NOT
   * combined with the accumulator's own output via `combineLatest`: that re-applies the last
   * function every time the accumulator emits, so a function such as `l => 1 :: l` would keep
   * feeding its own result back in and never settle.
   *
   * @param initial the starting value of the observable
   * @tparam T type of the observed value
   * @return the observable together with its `mapAndTell` function
   */
  def mapper[T](initial: T): Mapper[T] = {
    val accumulator = SimpleObservable[T](Some(initial))
    val functions = SimpleObservable[T => T]()

    // Running value: seeded with `initial`, advanced once per incoming function.
    val updates = functions.observable
      .scan(initial) { (acc: T, f: T => T) => f(acc) }
      .distinctUntilChanged

    // The seed emitted by `scan` equals the accumulator's initial value, so `tell` drops it.
    // The resulting Subscription is not kept: Mapper exposes no handle to cancel it.
    updates.subscribe { accumulator.tell(_) }

    Mapper(accumulator.observable, functions.tell _)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment