Created
October 8, 2014 08:11
-
-
Save aldonline/865d2331fb02cdfdd31c to your computer and use it in GitHub Desktop.
scala-js version of radioactive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package radioactivejs | |
import org.scalajs.dom | |
import _root_.util.StackVal | |
import org.scalajs.dom.{Event, HTMLElement} | |
import scala.collection.mutable | |
import scala.util.Try | |
import scala.scalajs.js | |
/* | |
I built this in an afternoon to help me rethink the API ( scala makes for a MUCH better modeling environment than JS ) | |
Don't use it. The real deal will be an API on top of the Javascript implementation. | |
I just keep it for reference. | |
*/ | |
object types { | |
type Handler = () => Unit | |
type Expr[T] = () => T | |
type Combinator[T] = Expr[T] => Expr[T] | |
type Stopper = () => Unit | |
type Callback[T] = Try[T] => Unit | |
} | |
import types._ | |
trait State { } | |
sealed trait NotifierState {} | |
sealed trait MonitorState {} | |
sealed trait LoopState {} | |
sealed trait IteratorState {} | |
class StatefulObject[ T <: State ]( current_state: T ) { | |
private var _state = current_state | |
def state: T = _state | |
protected def set_state( s: T ): Unit ={ | |
if ( _state != s ){ | |
_state = s | |
emit_change() | |
} | |
} | |
private val handlers = new mutable.HashSet[()=>Unit]() | |
def once_change( handler: () => Unit ){ } | |
def off_change( handler: () => Unit ){ } | |
def emit_change(): Unit ={ | |
handlers.foreach( _() ) | |
// remove handlers that triggered once | |
} | |
} | |
object util { | |
def delay( time: Double, f: () => Unit ): Int = dom.setTimeout( f, time ) | |
def setTimeout( f: () => Unit, time: Double ): Int = dom.setTimeout( f, time ) | |
def clearTimeout( id: Int ): Unit= dom.clearTimeout( id ) | |
def clearInterval( id: Int ){ dom.clearInterval( id ) } | |
def nextTick( f: () => Unit ){ delay( 1, f ) } | |
} | |
class Notifier( private [radioactivejs] val parent_monitor: Monitor ) | |
extends StatefulObject[NotifierState]( NotifierState.Active ) with ( () => Unit ) { | |
def a_sibling_notifier_has_fired(): Unit = set_state( NotifierState.IrrelevantToConsumer ) | |
def apply(): Unit = if ( state == NotifierState.Active ){ | |
parent_monitor.notifier_trigger() // call directly. don't use events | |
set_state( NotifierState.Fired ) | |
} | |
} | |
object NotifierState { | |
case object Active extends NotifierState | |
case object CancelledByPublisher extends NotifierState | |
case object IrrelevantToConsumer extends NotifierState | |
case object Fired extends NotifierState | |
} | |
private class Monitor extends StatefulObject[MonitorState]( MonitorState.Active ) { | |
private lazy val notifiers = new mutable.ArrayBuffer[Notifier] | |
def allocate_notifier(): Notifier = { | |
val n = new Notifier( this ) | |
notifiers += n | |
n | |
} | |
/** | |
* Called by one of our child notifiers | |
*/ | |
def notifier_trigger(): Unit = { | |
if ( fired ) return | |
fired = true | |
// turn off other notifiers | |
notifiers.foreach( _.a_sibling_notifier_has_fired() ) | |
emit("change") | |
} | |
/** | |
* creates a notifier and bubbles reactivity up the stack | |
*/ | |
def bubble(): Unit = ReactiveEval.notifier foreach { n => once("change", n ) } | |
def cancel(): Unit = { | |
} | |
} | |
object MonitorState { | |
case object Active extends MonitorState | |
case object Fired extends MonitorState | |
case object Irrelevant extends MonitorState | |
case object CancelledByConsumer extends MonitorState | |
} | |
private object Monitor { | |
def join( monitors: Traversable[Monitor] ): Unit = { | |
if ( monitors.isEmpty ) return; | |
ReactiveEval.notifier().foreach( n => { | |
var len = monitors.size | |
def cb = () => { len = len - 1 ; if ( len == 0) n() } | |
monitors.foreach(m => m.once( "change", cb )) | |
}) | |
} | |
} | |
private class MonitorCollector { | |
private val ms = new mutable.HashSet[Monitor] | |
def push( m: Monitor ){ ms += m } | |
def join(){ Monitor.join( ms ) } | |
def empty() = ms.isEmpty | |
def reset(){ ms.clear() } | |
} | |
// TODO: visibility? | |
class WaitSignal extends Exception {} | |
object WaitSignal { | |
def check( t: Try[_] ): Boolean = ( t.isFailure && t.failed.get.isInstanceOf[WaitSignal] ) | |
} | |
private class StopSignal extends Exception {} | |
private object StopSignal { | |
def check( t: Try[_] ): Boolean = ( t.isFailure && t.failed.get.isInstanceOf[StopSignal] ) | |
} | |
private class MonitorListenerProxy( handler: () => Unit ){ | |
private var _m: Option[Monitor] = None | |
def set( m: Option[Monitor] ): Unit = { | |
_m.map( _.off( "change", handler ) ) | |
_m = m | |
m.map( _.once( "change", handler ) ) | |
} | |
} | |
object IteratorState { | |
val VALID = 1 | |
val EXPIRED = 2 | |
} | |
private class Iterator[T]( expr: Expr[T]) extends EventEmitter { self => | |
private def emit_change(){ emit("change") } | |
private var last_token: Option[Token[T]] = None | |
private var _expired = true | |
def expired: Boolean = _expired | |
var iteration_count = 0 | |
private val monitor_listener = new MonitorListenerProxy(() => { | |
_expired = true | |
emit_change() | |
}) | |
def refresh(): Boolean = { | |
if ( _expired ){ | |
_expired = false | |
val t = expr2() | |
monitor_listener.set( t.monitor ) | |
last_token = Some( t ) | |
true | |
} else { | |
false | |
} | |
} | |
def current(): Try[T] = { | |
// first time we get it lazily | |
if ( last_token.isEmpty ) refresh() | |
// note that we return a null result if pending | |
if ( waiting() ) nullresult else last_token.get.result | |
} | |
private val nullresult = Try[T]( null.asInstanceOf[T] ) | |
def waiting(): Boolean = WaitSignal.check( last_token.get.result ) | |
def expireable(): Boolean = last_token match { | |
case None => true | |
case Some(t) => t.monitor.isDefined | |
} | |
def close(): Unit = { | |
last_token.map( _.monitor.map( _.cancel() )) | |
monitor_listener.set( None ) | |
service_caches.clear() | |
} | |
private lazy val expr2 = | |
addOurselvesToTheStack( | |
invalidateServiceCachesOnCompleteResults( | |
markPartialResults( | |
attachReactiveMonitors( | |
updateCounters( | |
tokenize( expr ) | |
) | |
) | |
) | |
) | |
) | |
private def tokenize( expr: Expr[T] ): Stream[T] = () => tap( new Token[T]() )( _.result = Try( expr() ) ) | |
private def attachReactiveMonitors[T]( stream: Stream[T] ): Stream[T] = () => { | |
val r = ReactiveEval.eval( stream ) | |
tap( r.result.get )( _.monitor = r.monitor ) | |
} | |
private def markPartialResults[T]( stream: Stream[T] ) = () => { | |
val prm = new PartialResultMarker | |
tap( prm.run( stream ) )( _.partial = prm.marked ) | |
} | |
private def invalidateServiceCachesOnCompleteResults[T]( stream: Stream[T] ) = () => | |
tap( stream() ){ t => if ( t.partial == false && ! WaitSignal.check( t.result ) ) service_caches.clear() } | |
private def updateCounters[T]( stream: Stream[T] ) = () => | |
tap( stream() ){ t => iteration_count = iteration_count + 1 } | |
private def tap[T]( v: T )( f: T => Unit ): T = { f(v) ; v } | |
private def addOurselvesToTheStack( stream: Stream[T] ) = ( ) => Iterator.stack.run( stream, () => this ) | |
class Token[T] { | |
var partial: Boolean = false | |
var monitor: Option[Monitor] = None | |
var result: Try[T] = null | |
} | |
type Stream[T] = () => Token[T] | |
lazy private val service_caches = new mutable.HashMap[ SyncifiedService[Any,Any], SyncifiedServiceCache[Any,Any] ] | |
} | |
object Iterator { | |
private val stack = new StackVal[Iterator[_]] | |
def service_caches = stack.get.service_caches | |
} | |
class LoopOpts( | |
val debounce: Int = 50, | |
val detached: Boolean = false | |
) | |
object LoopOpts { | |
val default = new LoopOpts( ) | |
} | |
private class Loop[T]( expr: Expr[T], opts: Option[LoopOpts] = None ) extends EventEmitter { self => | |
private def attachToStackCombinator( expr: Expr[T] ): Expr[T] = () => Loop.stack.run( expr, () => self ) | |
private lazy val options = opts.getOrElse( LoopOpts.default ) | |
private val expr2 = attachToStackCombinator( expr ) | |
private val iter = new Iterator( expr2 ) | |
private var loop_timeout: Option[Int] = None | |
private def request_loop(): Unit ={ | |
loop_timeout.foreach( util.clearTimeout _ ) | |
loop_timeout = Some( util.setTimeout( loop _ , options.debounce ) ) | |
} | |
private def loop(): Unit = | |
// every time we start looping we should check if we are still game or not | |
if ( checkForEOLHeuristics ){ | |
iter.refresh() | |
if ( StopSignal.check( iter.current() ) ) stop() else iter.once("change", request_loop _ ) | |
} else { | |
stop() | |
} | |
def iteration_count = iter.iteration_count | |
def stop() { | |
loop_timeout.foreach( util.clearTimeout _ ) | |
iter.close() | |
} | |
private def checkForEOLHeuristics(): Boolean = { | |
if ( parentLoop.isDefined ){ | |
val iterations_we_have_lived = parentLoop.get.iteration_count - parentIterationNumber.get | |
if ( iterations_we_have_lived > Loop.NUM_ITERATIONS_TO_SURVIVE ){ | |
return false | |
} | |
} | |
true | |
} | |
private var parentLoop: Option[Loop[_]] = None | |
private var parentIterationNumber: Option[Int] = None | |
private def attachToParent(): Unit ={ | |
if ( ! options.detached ) { | |
if ( Loop.stack.is_defined_? ) { | |
parentLoop = Some( Loop.stack.get ) | |
parentIterationNumber = Some( parentLoop.get.iteration_count ) | |
} | |
} | |
} | |
attachToParent() | |
request_loop() | |
} | |
private object Loop { | |
private val stack = new StackVal[Loop[_]] | |
private val NUM_ITERATIONS_TO_SURVIVE = 2 | |
} | |
trait CellConstructors { | |
def cell[T]() = new Cell[T] | |
def cell[T]( v:T ) = { | |
val c = new Cell[T] | |
c( v ) | |
c | |
} | |
def apply[T]( ): Cell[T] = cell[T]() | |
def apply( v: String ) = cell( v ) | |
def apply( v: Int ) = cell( v ) | |
def apply( v: Boolean ) = cell( v ) | |
def apply( v: Double ) = cell( v ) | |
} | |
object radioactive extends CellConstructors { | |
def active() = ReactiveEval.active() | |
def notifier() = ReactiveEval.notifier() | |
def wait_(): Unit = throw new WaitSignal | |
def stop(): Unit = throw new StopSignal | |
def fork(): Forker = new Forker | |
def loop[T]( expr: => T ):Unit = new Loop( () => expr ) | |
def loop[T]( opts: LoopOpts, expr: => T ):Unit = new Loop( () => expr, Some(opts) ) | |
def once[T]( expr: => T ):Unit = loop { expr ; stop() } | |
def waiting( expr: () => Any ): Boolean = | |
try { expr() ; false } | |
catch { case e: WaitSignal => { PartialResultMarker.mark() ; true } } | |
def syncify[I, O]( asyncFunction: ( I, Callback[O] ) => Unit, global: Boolean = false ): SyncifiedService[I, O] = new SyncifiedService( asyncFunction, global ) | |
def echo( delay: Double = 1000 ) = new EchoService( delay ) | |
def time( interval: Double = 100 ): Double = { | |
if ( interval > 0 ) notifier.foreach( util.delay( interval, _ ) ) ; | |
new js.Date().getTime | |
} | |
// HTML Utilities | |
// since this will be used | |
// mostly on the browser | |
def elm_loop( e: HTMLElement, f: () => Unit ): Unit = { | |
var stopper = false | |
val addh = ( e: Event ) => { | |
radioactive loop { | |
f() | |
if ( stopper ){ | |
radioactive.stop() | |
stopper = false | |
} | |
}; | |
false | |
} | |
val removeh = ( e: Event ) => { | |
stopper = true | |
} | |
e.addEventListener( "add", addh ) | |
e.addEventListener( "remove", removeh ) | |
} | |
} | |
class SyncifiedService[I, O] ( asyncFunction: ( I, Callback[O] ) => Unit, global: Boolean = false ) extends ( I => O ) { | |
private def cache = if ( global ) instance_scoped_cache else iteration_scoped_cache | |
// these only make sense in with global caches | |
def reset( ) = cache.reset( ) | |
def reset( filter: I => Boolean ): Unit = cache.reset( filter ) | |
override def apply( params: I ): O = cache.get( params ) | |
private def iteration_scoped_cache: SyncifiedServiceCache[I, O] = | |
Iterator.service_caches.getOrElseUpdate( | |
this.asInstanceOf[SyncifiedService[Any,Any]], | |
build_cache.asInstanceOf[SyncifiedServiceCache[Any,Any]] | |
).asInstanceOf[SyncifiedServiceCache[I,O]] | |
private lazy val instance_scoped_cache = build_cache | |
private def build_cache: SyncifiedServiceCache[I,O] = { | |
// create var first because we use a fwd reference below | |
var c: CellBasedCache[I, O] = null | |
c = new CellBasedCache[I, O]( ( params: I ) => asyncFunction( params, t => c.put( params, t) ) ) | |
c | |
} | |
} | |
trait SyncifiedServiceCache[K, V] { | |
def get( k: K ): V | |
def reset(): Unit | |
def reset( filter: K => Boolean ): Unit | |
} | |
package radioactivejs | |
import scala.util.Try | |
class Cell[T] extends ( () => T ) with ( T => Unit ) with ( ( Throwable, T ) => Unit ) { | |
private var value: Option[Try[T]] = None | |
private val notifiers = new NotifierPool | |
def has_notifiers_? : Boolean = notifiers.nonEmpty | |
def apply( ): T = { | |
notifiers.allocate() | |
doget() | |
} | |
private def doget( ):T = { | |
_deferred_eval.map( e => { | |
value = Some( Try( e() ) ) | |
_deferred_eval = None | |
}) | |
value match { | |
case None => (null).asInstanceOf[T] | |
case Some(t) => t.get | |
} | |
} | |
private def update( v: T ): Unit ={ | |
if ( value.isDefined && value.get == v ) return; | |
value = Some( Try(v) ) | |
notifiers.flush() | |
} | |
// will be evaluated next time the cell is accessed | |
// you can use this to set the cell to the value of a computation | |
private var _deferred_eval: Option[() => T] = None | |
def apply( v: () => T ): Unit = { | |
_deferred_eval = Some(v) | |
notifiers.flush() | |
} | |
// TODO: equality tests | |
def apply( v: T ): Unit = { | |
update(v) | |
} | |
def apply( e: Throwable ): Unit = { | |
value = Some( Try( throw e ) ) | |
notifiers.flush() | |
} | |
def apply( t: Try[T] ): Unit = if ( t.isFailure ) apply( t.failed.get ) else apply( t.get ) | |
def apply( e: Throwable, v: T ): Unit = if ( e == null ) apply( v ) else apply( e ) | |
} | |
trait CellOptions[T] { | |
def equals[T]( a:T, b:T ): Boolean | |
} | |
class CellBasedCache[K, V]( fetcher: (K) => Unit ) extends SyncifiedServiceCache[K, V] with (K => V) with ( (K, V) => Unit ) { | |
private lazy val cache = new mutable.HashMap[K,Cell[V]] | |
def apply( k: K ) = get( k ) | |
def apply( k: K, v: V ) = put( k, v ) | |
def get( k: K ): V = | |
(cache.get( k ) match { | |
case Some(c) => c | |
case None => { | |
val c = radioactive.cell[V]() | |
c( new WaitSignal ) | |
cache += ( k -> c ) // store the cell in the cache before calling the fetcher ( in case the fetch is sync ) | |
fetcher( k ) | |
c | |
} | |
})() | |
def put( k: K, v: V ) : Unit = getOrCreateCell( k )( v ) | |
def put( k: K, e: Throwable, v: V ) : Unit = getOrCreateCell( k )( e, v ) | |
def put( k: K, t: Try[V] ) : Unit = getOrCreateCell( k )( t ) | |
private def getOrCreateCell( k: K ): Cell[V] = | |
cache.get( k ) match { | |
case Some(c) => c | |
case None => { | |
val c = radioactive.cell[V]() | |
cache += ( k -> c ) | |
c | |
} | |
} | |
def reset(): Unit = reset( k => true ) | |
def reset( filter: K => Boolean ): Unit = | |
cache.toSeq | |
.filter( x => filter( x._1 ) ) | |
.foreach( { case ( k, c ) => if ( c.has_notifiers_? ) c( new WaitSignal ) else cache.remove( k ) } ) | |
} | |
class EchoService( delay: Double = 1000 ) extends Function1[String, String] { | |
private val cells = new mutable.HashMap[String, Cell[String]] | |
def apply( message: String ): String = { | |
cells.getOrElseUpdate( message, { | |
val c = radioactive.cell[String]( ) | |
c( new WaitSignal ) | |
util.delay( delay, () => c( message ) ) | |
c | |
})() | |
} | |
} | |
class Forker { | |
private var pres = 0 | |
private val monitors = new MonitorCollector | |
def apply[X]( expr: => X ): X = { | |
val res = ReactiveEval.eval( () => expr ) | |
if ( WaitSignal.check( res.result ) ){ | |
pres = pres + 1 | |
res.monitor.foreach( monitors.push _ ) | |
( null ).asInstanceOf[X] | |
} else { | |
res.unbox() | |
} | |
} | |
def join(): Unit = { | |
monitors.join() | |
if ( pres > 0 ) throw new WaitSignal | |
} | |
} | |
class NotifierPool { | |
private val notifiers = new mutable.ArrayBuffer[Notifier]() | |
def allocate(): Unit = { | |
radioactive.notifier().foreach( n => { | |
// TODO: listen to cancel / destroy events | |
notifiers += n | |
} ) | |
} | |
def flush(): Unit ={ | |
val ns = notifiers.toList | |
notifiers.clear() | |
ns.foreach( n => n() ) | |
} | |
def size = notifiers.size | |
def nonEmpty = notifiers.nonEmpty | |
// TODO: cancel/destroy | |
} | |
class PartialResultMarker { | |
private var flag = false | |
def run[T]( expr: Expr[T] ): T = PartialResultMarker.stackval.run( expr, () => this ) | |
def mark(): Unit = { flag = true } | |
def marked: Boolean = flag | |
} | |
object PartialResultMarker { | |
private lazy val stackval = new StackVal[PartialResultMarker]() | |
def mark() = stackval.get.mark() | |
def wrap[T]( expr: Expr[T] ): Expr[T] => PossiblyPartialResult[T] = | |
( expr: Expr[T] ) => { | |
val prm = new PartialResultMarker( ) | |
val res = prm.run( expr ) | |
new PossiblyPartialResult[T]( res, prm.marked ) | |
} | |
def run[T]( expr: Expr[T] ): PossiblyPartialResult[T] = { | |
val prm = new PartialResultMarker( ) | |
val res = prm.run( expr ) | |
new PossiblyPartialResult[T]( res, prm.marked ) | |
} | |
} | |
class PossiblyPartialResult[T]( val result: T, val partial: Boolean ) | |
private class ReactiveEval[T]( val expr: () => T ) { | |
private var _monitor: Option[Monitor] = None | |
private def monitor = _monitor | |
private lazy val lazy_monitor: Monitor = { | |
_monitor = Some( new Monitor(){} ) | |
_monitor.get | |
} | |
private def run( ): ReactiveEvalResult[T] = { | |
// evaluate expression first. it may create a monitor | |
// order is important ( mutable state sucks, I know ) | |
val t = Try( expr() ) | |
// and now compose result | |
return new ReactiveEvalResult[T]( t, monitor ) | |
} | |
private def allocate_notifier(): Notifier = lazy_monitor.allocate_notifier() | |
} | |
private object ReactiveEval { | |
private lazy val stack = new mutable.ArrayStack[ReactiveEval[_]]() | |
def notifier(): Option[Notifier] = stack.lastOption.map( _.allocate_notifier ) | |
def active(): Boolean = stack.nonEmpty | |
def eval[T]( expr: () => T ): ReactiveEvalResult[T] = { | |
val rev = new ReactiveEval[T]( expr ) | |
stack.push(rev) | |
val r = rev.run() | |
stack.pop() | |
r | |
} | |
} | |
private class ReactiveEvalResult[T]( val result: Try[T], val monitor: Option[Monitor] ) { | |
def unbox() = { | |
monitor.map( _.bubble() ) | |
result.get | |
} | |
} | |
object sandbox { | |
def test(): Unit ={ | |
def fetchUserName(id:Int): String = ??? | |
// getting all users from an async service | |
def getUserNames( ids: Seq[Int], cb: ( Seq[String] ) => Unit ): Unit ={ | |
radioactive loop { | |
val fork = radioactive.fork() | |
val names = ( 1 to 10 ).map( id => fork( fetchUserName( id ) ) ) | |
fork.join() | |
cb( names ) | |
radioactive.stop() | |
} | |
} | |
radioactive loop { | |
val fork = radioactive.fork( ) | |
fork { 8 } | |
fork { 77 } | |
fork.join( ) | |
radioactive.stop( ) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment