Skip to content

Instantly share code, notes, and snippets.

@unktomi
Last active August 29, 2015 14:07
Show Gist options
  • Save unktomi/b46c9497f89460e6512e to your computer and use it in GitHub Desktop.
Save unktomi/b46c9497f89460e6512e to your computer and use it in GitHub Desktop.
package main.scala.test
import scala.collection.mutable
// Push Mode
// as in Rx
/**
 * Push-mode sink for a stream of optional values.
 *
 * Contravariant in `A`: an observer that accepts a wider type can stand in
 * for one that accepts a narrower type.
 */
trait Observer[-A] { downstream =>
  /** Receives the next element of the stream, wrapped in an `Option`. */
  def onNext(x: Option[A]): Unit

  /**
   * Adapts this observer to accept `B` values by converting each one with
   * `f` before handing it to this observer. `None` is passed through as `None`.
   */
  def contramap[B](f: B=>A): Observer[B] = new Observer[B] {
    def onNext(y: Option[B]): Unit = {
      val adapted: Option[A] = y.map(f)
      downstream.onNext(adapted)
    }
  }
}
/**
 * Push-mode source of values: observers register with `subscribe` and get
 * back a function that cancels the registration.
 */
trait Observable[A] {
  /** Registers `x` and returns the function to call to undo the registration. */
  def subscribe(x: Observer[A]): Unit => Unit

  /** Observable whose values are this one's values transformed by `f`. */
  def map[B](f: A=>B): Observable[B] = new MappedObservable[A, B](this, f)

  /** Observable built from the inner observables that `f` produces for each value. */
  def flatMap[B](f: A=>Observable[B]): Observable[B] = new FlatMappedObservable[A, B](this, f)

  // Cached pull-mode view; stays null until `observed` is first called.
  var ob: Observed[A] = null

  /**
   * Conversion to pull mode: on first use creates a `Mutable` cell, subscribes
   * it to this observable and caches it; later calls return the cached cell.
   */
  def observed: Observed[A] = Option(ob).getOrElse {
    val cell = new Mutable[A]
    subscribe(cell)
    ob = cell
    cell
  }
}
/**
 * Observable that relays the values of `self` after applying `f` to each one.
 *
 * @param self the upstream observable
 * @param f    transformation applied to every upstream value
 */
class MappedObservable[A, B](val self: Observable[A], val f: A=>B) extends Observable[B] {
  /**
   * Subscribes an adapter to the upstream observable; the adapter maps each
   * incoming option with `f` and forwards it to `y`. The upstream's cancel
   * function is returned unchanged.
   */
  override def subscribe(y: Observer[B]): Unit => Unit =
    self.subscribe(new Observer[A] {
      override def onNext(x: Option[A]): Unit = {
        val mapped: Option[B] = x.map(f)
        y.onNext(mapped)
      }
    })
}
/**
 * Observable that turns every value of `self` into an inner observable with
 * `f` and relays the inner observables' values one inner at a time, in the
 * order the outer values arrived (concatenation).
 *
 * Within this class `None` is treated as the end-of-stream marker of the
 * stream it arrives on. The downstream observer receives `None` only once
 * the outer stream has ended AND every inner observable has ended.
 *
 * @param self the outer observable
 * @param f    produces the inner observable for each outer value
 */
class FlatMappedObservable[A, B](val self: Observable[A], val f: A=>Observable[B]) extends Observable[B] {
  /**
   * Subscribes `y` to the concatenation of the inner observables.
   *
   * @return a function that cancels the outer subscription, the active inner
   *         subscription (if any) and drops inner observables still waiting
   */
  override def subscribe(y: Observer[B]): Unit => Unit = {
    // Inner observables waiting for the active one to finish.
    val pending = new mutable.ListBuffer[Observable[B]]()
    // Cancel function of the inner subscription currently being relayed.
    var cancelInner: Option[Unit => Unit] = None
    // True while an inner observable is subscribed and has not ended yet.
    var innerActive = false
    // True once the outer stream has delivered its end marker.
    var outerDone = false
    // Bumped for every inner subscription so events from stale ones are ignored.
    var generation = 0

    def stopInner(): Unit = {
      cancelInner.foreach(cancel => cancel(()))
      cancelInner = None
    }

    // Releases the finished inner and starts the next queued one; when nothing
    // is queued the stream goes idle, and ends if the outer has ended too.
    def advance(): Unit = {
      stopInner()
      if (pending.isEmpty) {
        innerActive = false
        if (outerDone) y.onNext(None)
      } else {
        generation += 1
        val mine = generation
        innerActive = true
        val cancel = pending.remove(0).subscribe(new Observer[B] {
          private def isCurrent: Boolean = mine == generation && innerActive
          override def onNext(x: Option[B]): Unit = x match {
            case Some(_) => if (isCurrent) y.onNext(x)
            case None => if (isCurrent) advance()
          }
        })
        // If the inner ended synchronously inside subscribe, `advance` already
        // moved on; release the stale subscription instead of recording it.
        if (mine == generation && innerActive) cancelInner = Some(cancel) else cancel(())
      }
    }

    val cancelOuter = self.subscribe(new Observer[A] {
      override def onNext(x: Option[A]): Unit = x match {
        case Some(v) =>
          pending += f(v)
          // Start immediately when idle; otherwise it waits its turn in the queue.
          if (!innerActive) advance()
        case None =>
          if (!outerDone) {
            outerDone = true
            // An active inner keeps running; `advance` signals the end later.
            if (!innerActive) y.onNext(None)
          }
      }
    })

    (_: Unit) => {
      cancelOuter(())
      pending.clear()
      generation += 1 // silences events from the inner being cancelled
      innerActive = false
      stopInner()
    }
  }
}
// as in Rx
/**
 * An observer that is also an observable: every value pushed into `onNext`
 * is forwarded to all currently registered observers.
 *
 * Observers are held in a weak-keyed map, so registration alone does not keep
 * an observer reachable.
 */
class Subject[A] extends Observer[A] with Observable[A] {
  val subscribers: mutable.WeakHashMap[Observer[A], Unit] = new mutable.WeakHashMap[Observer[A], Unit]

  /**
   * Forwards `x` to every registered observer.
   *
   * Iterates over a snapshot of the keys because an observer may subscribe or
   * unsubscribe from inside its own `onNext`, and mutating the map while
   * walking its live key view is not safe. An observer that was removed
   * earlier in the same dispatch is skipped.
   */
  override def onNext(x: Option[A]): Unit = {
    val snapshot = subscribers.keySet.toList
    for (y <- snapshot if subscribers.contains(y)) {
      y.onNext(x)
    }
  }

  /**
   * Registers `x` and records this subject in `Observed.roots` under `x`.
   *
   * @return a function that unsubscribes `x` again
   */
  override def subscribe(x: Observer[A]): Unit => Unit = {
    subscribers.put(x, ())
    Observed.roots.getOrElseUpdate(x, new mutable.HashSet[Observable[_]]).add(this)
    (_: Unit) => unsubscribe(x)
  }

  /** Removes `x` from the subscribers and this subject from `x`'s entry in `Observed.roots`. */
  def unsubscribe(x: Observer[A]): Unit = {
    subscribers.remove(x)
    Observed.roots.get(x).foreach(_.remove(this))
  }
}
// Lazy pull mode
// A lazily evaluated, memoized value that can have dependents.
// `extract()` computes the value via `apply()` at most once while the node is
// `valid`; `invalidate()` marks it stale, propagates to dependents and, when a
// push-mode `subject` exists, pushes the new value if it differs from the old one.
abstract class Observed[A] {
// Computes the current value. Called by `extract()` when the memo is stale.
def apply(): A
// Last value stored by `extract()` (the `map` combinator also writes it directly).
var memo: Option[A] = None
// True when `memo` is considered current; set by `extract()`, cleared by `invalidate()`.
var valid: Boolean = false
// Dependents created by map/flatMap/coflatMap, held as weak keys; null until the first one is added.
var listeners: mutable.WeakHashMap[Observed[_], Unit] = null
// Push-mode view; null until `observable` is first called.
var subject: Subject[A] = null
// Registers `x` as a dependent of this node (creating the map on first use) and returns `x`.
private def addListener[B](x: Observed[B]): Observed[B] = {
if (listeners == null) listeners = new mutable.WeakHashMap[Observed[_], Unit]()
listeners.put(x, ())
x
}
// Marks this node stale. Does nothing when the node is already not valid,
// so a node that has never been extracted does not propagate or notify.
def invalidate(): Unit = {
if (valid) {
valid = false
onInvalidate()
}
}
// Propagation step: dependents are invalidated first, then observers are notified.
def onInvalidate(): Unit = {
invalidateDependents()
notifyObservers()
}
// Returns Some(current value) when there was no previous memo or the freshly
// extracted value differs from it; None when the value is unchanged.
// `memo` must be read before `extract()`, because `extract()` may overwrite it.
def changed: Option[A] = {
val prev: Option[A] = memo
val current: A = extract()
prev match {
case None => Some(current)
case Some(y) => if (y != current) Some(current) else None
}
}
// Pushes the new value to `subject` when one exists and `changed` reports a difference.
def notifyObservers(): Unit = {
if (subject != null) {
changed match {
case None => ()
case Some(x) => subject.onNext(Some(x))
}
}
}
// Invalidates every registered dependent. The invalidation thunks are collected
// first and run afterwards, so `listeners` is not being traversed while they run.
def invalidateDependents(): Unit = {
if (listeners != null) {
val nots = for { x <- listeners.keySet } yield ()=>x.invalidate()
for (j <- nots) j()
}
}
// conversion to push mode
// Creates the Subject on first call and returns the same one afterwards.
def observable: Observable[A] = {
if (subject == null) subject = new Subject[A]
subject
}
// Functor
// Dependent node holding f(value of this node). Its `apply` reads this node
// through `self()` (i.e. `apply()`, not the memoized `extract()`) and only
// re-runs `f` when that input differs from the last input it saw.
def map[B](f: A=>B): Observed[B] = {
val self = this
val r = new Observed[B] {
// Input seen by the most recent run of `f`.
var lastInput: Option[A] = None
def apply(): B = {
val x = self()
val y = Some(x)
if (lastInput != y) {
lastInput = y
memo = Some(f(x))
}
memo.get
}
}
addListener(r)
}
// Monad (from Comonad)
// Dependent node whose `apply` extracts this node, feeds the value to `f` and
// extracts the resulting node; `f` is invoked again on every `apply`.
def flatMap[B](f: A=>Observed[B]): Observed[B] = {
val self = this
val r = new Observed[B] {
def apply(): B = f(self.extract()).extract()
}
addListener(r)
}
// Comonad
// Memoized read: recomputes through `apply()` only when the node is not valid,
// then marks it valid and returns the memo.
def extract(): A = {
if (!valid) {
memo = Some(apply())
valid = true
}
memo.get
}
// Dependent node whose value is `f` applied to this whole node rather than to its value.
def coflatMap[B](f: Observed[A]=>B): Observed[B] = {
val self = this
val r = new Observed[B] {
def apply(): B = f(self)
}
addListener(r)
}
}
/**
 * A settable pull-mode cell that is also a push-mode observer: values pushed
 * into `onNext` (or written with `set`) become the cell's current value.
 */
class Mutable[A] extends Observed[A] with Observer[A] {
  /** The stored value; fails on `memo.get` when nothing has been stored yet. */
  override def apply(): A = memo.get

  /** Same as `apply()`. */
  def get: A = apply()

  /** Stores `x` by pushing `Some(x)` through `onNext`. */
  def set(x: A): Unit = onNext(Some(x))

  /** For a cell, the value reported to observers is simply the stored memo. */
  override def changed: Option[A] = memo

  /**
   * Marks the cell valid and, when `x` differs from the stored memo, stores it
   * and runs the invalidation step (dependents first, then observers).
   */
  def onNext(x: Option[A]): Unit = {
    valid = true
    val differs = x != memo
    if (differs) {
      memo = x
      onInvalidate()
    }
  }
}
/** Companion holding the observer-to-observables registry, small factories and a demo. */
object Observed {
  // Weak-keyed registry: for each observer, the observables it is subscribed to.
  // Maintained by Subject.subscribe / Subject.unsubscribe.
  val roots = new mutable.WeakHashMap[Observer[_], mutable.Set[Observable[_]]]()

  /** Creates an empty mutable cell. */
  def ref[A]: Mutable[A] = new Mutable[A]

  /** Observer that prints `prefix` followed by each received option. */
  def Println(prefix: String): Observer[Any] = new Observer[Any] {
    def onNext(x: Option[Any]): Unit = println(prefix+x)
    override def toString: String = "Println "+ prefix
  }

  /** Demo: derives values from a reactive Int and reads them in both push and pull mode. */
  def main(argv: Array[String]): Unit = {
    // Held in vals (not defs) so each printer has a reference besides the
    // weak-keyed subscriber and roots maps it is registered in.
    val printX = Println("push x = ")
    val print2x = Println("push: x + x = ")
    val printOne = Println("push: 2x / 2x = ")
    val print2xMinusOne = Println("push: 2x - 1 = ")
    // create a reactive Int
    val x = ref[Int]
    val ob = x.observable // convert pull to push
    ob.subscribe(printX)
    val twox = for {
      i <- x
      j <- x
    } yield i + j
    val one = for (i <- twox) yield i / i
    twox.observable.subscribe(print2x)
    one.observable.subscribe(printOne)
    val twoxMinusOne = for { i <- twox.observable } yield i - 1
    twoxMinusOne.subscribe(print2xMinusOne)
    println("x=100...")
    val p = twoxMinusOne.observed // convert push to pull
    x.set(100)
    println("pull x + x = "+twox.extract())
    println("pull one = "+one.extract())
    // p only holds a value after twox has pushed one; extracting from an
    // empty cell would fail on memo.get, so the pull is skipped until then.
    if (p.memo.isDefined) println("pull 2x - 1 = "+p.extract())
    println("x=99...")
    x.set(99)
    println("pull x + x = "+twox.extract())
    println("pull one = "+one.extract())
    if (p.memo.isDefined) println("pull 2x - 1 = "+p.extract())
    /*
    Output recorded by the original author (the relative order of the
    "push:" lines depends on subscriber iteration order):

    x=100...
    push x = Some(100)
    pull x + x = 200
    pull one = 1
    x=99...
    push: x + x = Some(198)
    push: 2x - 1 = Some(197)
    push x = Some(99)
    pull x + x = 198
    pull one = 1
    pull 2x - 1 = 197
    */
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment