Skip to content

Instantly share code, notes, and snippets.

@aldonline
Created October 8, 2014 08:11
Show Gist options
  • Save aldonline/865d2331fb02cdfdd31c to your computer and use it in GitHub Desktop.
Save aldonline/865d2331fb02cdfdd31c to your computer and use it in GitHub Desktop.
scala-js version of radioactive
package radioactivejs
import org.scalajs.dom
import _root_.util.StackVal
import org.scalajs.dom.{Event, HTMLElement}
import scala.collection.mutable
import scala.util.Try
import scala.scalajs.js
/*
I built this in an afternoon to help me rethink the API ( scala makes for a MUCH better modeling environment than JS )
Don't use it. The real deal will be an API on top of the Javascript implementation.
I just keep it for reference.
*/
object types {
type Handler = () => Unit
type Expr[T] = () => T
type Combinator[T] = Expr[T] => Expr[T]
type Stopper = () => Unit
type Callback[T] = Try[T] => Unit
}
import types._
trait State { }
sealed trait NotifierState {}
sealed trait MonitorState {}
sealed trait LoopState {}
sealed trait IteratorState {}
class StatefulObject[ T <: State ]( current_state: T ) {
private var _state = current_state
def state: T = _state
protected def set_state( s: T ): Unit ={
if ( _state != s ){
_state = s
emit_change()
}
}
private val handlers = new mutable.HashSet[()=>Unit]()
def once_change( handler: () => Unit ){ }
def off_change( handler: () => Unit ){ }
def emit_change(): Unit ={
handlers.foreach( _() )
// remove handlers that triggered once
}
}
object util {
def delay( time: Double, f: () => Unit ): Int = dom.setTimeout( f, time )
def setTimeout( f: () => Unit, time: Double ): Int = dom.setTimeout( f, time )
def clearTimeout( id: Int ): Unit= dom.clearTimeout( id )
def clearInterval( id: Int ){ dom.clearInterval( id ) }
def nextTick( f: () => Unit ){ delay( 1, f ) }
}
class Notifier( private [radioactivejs] val parent_monitor: Monitor )
extends StatefulObject[NotifierState]( NotifierState.Active ) with ( () => Unit ) {
def a_sibling_notifier_has_fired(): Unit = set_state( NotifierState.IrrelevantToConsumer )
def apply(): Unit = if ( state == NotifierState.Active ){
parent_monitor.notifier_trigger() // call directly. don't use events
set_state( NotifierState.Fired )
}
}
object NotifierState {
case object Active extends NotifierState
case object CancelledByPublisher extends NotifierState
case object IrrelevantToConsumer extends NotifierState
case object Fired extends NotifierState
}
private class Monitor extends StatefulObject[MonitorState]( MonitorState.Active ) {
private lazy val notifiers = new mutable.ArrayBuffer[Notifier]
def allocate_notifier(): Notifier = {
val n = new Notifier( this )
notifiers += n
n
}
/**
* Called by one of our child notifiers
*/
def notifier_trigger(): Unit = {
if ( fired ) return
fired = true
// turn off other notifiers
notifiers.foreach( _.a_sibling_notifier_has_fired() )
emit("change")
}
/**
* creates a notifier and bubbles reactivity up the stack
*/
def bubble(): Unit = ReactiveEval.notifier foreach { n => once("change", n ) }
def cancel(): Unit = {
}
}
object MonitorState {
case object Active extends MonitorState
case object Fired extends MonitorState
case object Irrelevant extends MonitorState
case object CancelledByConsumer extends MonitorState
}
private object Monitor {
def join( monitors: Traversable[Monitor] ): Unit = {
if ( monitors.isEmpty ) return;
ReactiveEval.notifier().foreach( n => {
var len = monitors.size
def cb = () => { len = len - 1 ; if ( len == 0) n() }
monitors.foreach(m => m.once( "change", cb ))
})
}
}
private class MonitorCollector {
private val ms = new mutable.HashSet[Monitor]
def push( m: Monitor ){ ms += m }
def join(){ Monitor.join( ms ) }
def empty() = ms.isEmpty
def reset(){ ms.clear() }
}
// TODO: visibility?
class WaitSignal extends Exception {}
object WaitSignal {
def check( t: Try[_] ): Boolean = ( t.isFailure && t.failed.get.isInstanceOf[WaitSignal] )
}
private class StopSignal extends Exception {}
private object StopSignal {
def check( t: Try[_] ): Boolean = ( t.isFailure && t.failed.get.isInstanceOf[StopSignal] )
}
private class MonitorListenerProxy( handler: () => Unit ){
private var _m: Option[Monitor] = None
def set( m: Option[Monitor] ): Unit = {
_m.map( _.off( "change", handler ) )
_m = m
m.map( _.once( "change", handler ) )
}
}
object IteratorState {
val VALID = 1
val EXPIRED = 2
}
private class Iterator[T]( expr: Expr[T]) extends EventEmitter { self =>
private def emit_change(){ emit("change") }
private var last_token: Option[Token[T]] = None
private var _expired = true
def expired: Boolean = _expired
var iteration_count = 0
private val monitor_listener = new MonitorListenerProxy(() => {
_expired = true
emit_change()
})
def refresh(): Boolean = {
if ( _expired ){
_expired = false
val t = expr2()
monitor_listener.set( t.monitor )
last_token = Some( t )
true
} else {
false
}
}
def current(): Try[T] = {
// first time we get it lazily
if ( last_token.isEmpty ) refresh()
// note that we return a null result if pending
if ( waiting() ) nullresult else last_token.get.result
}
private val nullresult = Try[T]( null.asInstanceOf[T] )
def waiting(): Boolean = WaitSignal.check( last_token.get.result )
def expireable(): Boolean = last_token match {
case None => true
case Some(t) => t.monitor.isDefined
}
def close(): Unit = {
last_token.map( _.monitor.map( _.cancel() ))
monitor_listener.set( None )
service_caches.clear()
}
private lazy val expr2 =
addOurselvesToTheStack(
invalidateServiceCachesOnCompleteResults(
markPartialResults(
attachReactiveMonitors(
updateCounters(
tokenize( expr )
)
)
)
)
)
private def tokenize( expr: Expr[T] ): Stream[T] = () => tap( new Token[T]() )( _.result = Try( expr() ) )
private def attachReactiveMonitors[T]( stream: Stream[T] ): Stream[T] = () => {
val r = ReactiveEval.eval( stream )
tap( r.result.get )( _.monitor = r.monitor )
}
private def markPartialResults[T]( stream: Stream[T] ) = () => {
val prm = new PartialResultMarker
tap( prm.run( stream ) )( _.partial = prm.marked )
}
private def invalidateServiceCachesOnCompleteResults[T]( stream: Stream[T] ) = () =>
tap( stream() ){ t => if ( t.partial == false && ! WaitSignal.check( t.result ) ) service_caches.clear() }
private def updateCounters[T]( stream: Stream[T] ) = () =>
tap( stream() ){ t => iteration_count = iteration_count + 1 }
private def tap[T]( v: T )( f: T => Unit ): T = { f(v) ; v }
private def addOurselvesToTheStack( stream: Stream[T] ) = ( ) => Iterator.stack.run( stream, () => this )
class Token[T] {
var partial: Boolean = false
var monitor: Option[Monitor] = None
var result: Try[T] = null
}
type Stream[T] = () => Token[T]
lazy private val service_caches = new mutable.HashMap[ SyncifiedService[Any,Any], SyncifiedServiceCache[Any,Any] ]
}
object Iterator {
private val stack = new StackVal[Iterator[_]]
def service_caches = stack.get.service_caches
}
class LoopOpts(
val debounce: Int = 50,
val detached: Boolean = false
)
object LoopOpts {
val default = new LoopOpts( )
}
private class Loop[T]( expr: Expr[T], opts: Option[LoopOpts] = None ) extends EventEmitter { self =>
private def attachToStackCombinator( expr: Expr[T] ): Expr[T] = () => Loop.stack.run( expr, () => self )
private lazy val options = opts.getOrElse( LoopOpts.default )
private val expr2 = attachToStackCombinator( expr )
private val iter = new Iterator( expr2 )
private var loop_timeout: Option[Int] = None
private def request_loop(): Unit ={
loop_timeout.foreach( util.clearTimeout _ )
loop_timeout = Some( util.setTimeout( loop _ , options.debounce ) )
}
private def loop(): Unit =
// every time we start looping we should check if we are still game or not
if ( checkForEOLHeuristics ){
iter.refresh()
if ( StopSignal.check( iter.current() ) ) stop() else iter.once("change", request_loop _ )
} else {
stop()
}
def iteration_count = iter.iteration_count
def stop() {
loop_timeout.foreach( util.clearTimeout _ )
iter.close()
}
private def checkForEOLHeuristics(): Boolean = {
if ( parentLoop.isDefined ){
val iterations_we_have_lived = parentLoop.get.iteration_count - parentIterationNumber.get
if ( iterations_we_have_lived > Loop.NUM_ITERATIONS_TO_SURVIVE ){
return false
}
}
true
}
private var parentLoop: Option[Loop[_]] = None
private var parentIterationNumber: Option[Int] = None
private def attachToParent(): Unit ={
if ( ! options.detached ) {
if ( Loop.stack.is_defined_? ) {
parentLoop = Some( Loop.stack.get )
parentIterationNumber = Some( parentLoop.get.iteration_count )
}
}
}
attachToParent()
request_loop()
}
private object Loop {
private val stack = new StackVal[Loop[_]]
private val NUM_ITERATIONS_TO_SURVIVE = 2
}
trait CellConstructors {
def cell[T]() = new Cell[T]
def cell[T]( v:T ) = {
val c = new Cell[T]
c( v )
c
}
def apply[T]( ): Cell[T] = cell[T]()
def apply( v: String ) = cell( v )
def apply( v: Int ) = cell( v )
def apply( v: Boolean ) = cell( v )
def apply( v: Double ) = cell( v )
}
object radioactive extends CellConstructors {
def active() = ReactiveEval.active()
def notifier() = ReactiveEval.notifier()
def wait_(): Unit = throw new WaitSignal
def stop(): Unit = throw new StopSignal
def fork(): Forker = new Forker
def loop[T]( expr: => T ):Unit = new Loop( () => expr )
def loop[T]( opts: LoopOpts, expr: => T ):Unit = new Loop( () => expr, Some(opts) )
def once[T]( expr: => T ):Unit = loop { expr ; stop() }
def waiting( expr: () => Any ): Boolean =
try { expr() ; false }
catch { case e: WaitSignal => { PartialResultMarker.mark() ; true } }
def syncify[I, O]( asyncFunction: ( I, Callback[O] ) => Unit, global: Boolean = false ): SyncifiedService[I, O] = new SyncifiedService( asyncFunction, global )
def echo( delay: Double = 1000 ) = new EchoService( delay )
def time( interval: Double = 100 ): Double = {
if ( interval > 0 ) notifier.foreach( util.delay( interval, _ ) ) ;
new js.Date().getTime
}
// HTML Utilities
// since this will be used
// mostly on the browser
def elm_loop( e: HTMLElement, f: () => Unit ): Unit = {
var stopper = false
val addh = ( e: Event ) => {
radioactive loop {
f()
if ( stopper ){
radioactive.stop()
stopper = false
}
};
false
}
val removeh = ( e: Event ) => {
stopper = true
}
e.addEventListener( "add", addh )
e.addEventListener( "remove", removeh )
}
}
class SyncifiedService[I, O] ( asyncFunction: ( I, Callback[O] ) => Unit, global: Boolean = false ) extends ( I => O ) {
private def cache = if ( global ) instance_scoped_cache else iteration_scoped_cache
// these only make sense in with global caches
def reset( ) = cache.reset( )
def reset( filter: I => Boolean ): Unit = cache.reset( filter )
override def apply( params: I ): O = cache.get( params )
private def iteration_scoped_cache: SyncifiedServiceCache[I, O] =
Iterator.service_caches.getOrElseUpdate(
this.asInstanceOf[SyncifiedService[Any,Any]],
build_cache.asInstanceOf[SyncifiedServiceCache[Any,Any]]
).asInstanceOf[SyncifiedServiceCache[I,O]]
private lazy val instance_scoped_cache = build_cache
private def build_cache: SyncifiedServiceCache[I,O] = {
// create var first because we use a fwd reference below
var c: CellBasedCache[I, O] = null
c = new CellBasedCache[I, O]( ( params: I ) => asyncFunction( params, t => c.put( params, t) ) )
c
}
}
trait SyncifiedServiceCache[K, V] {
def get( k: K ): V
def reset(): Unit
def reset( filter: K => Boolean ): Unit
}
package radioactivejs
import scala.util.Try
class Cell[T] extends ( () => T ) with ( T => Unit ) with ( ( Throwable, T ) => Unit ) {
private var value: Option[Try[T]] = None
private val notifiers = new NotifierPool
def has_notifiers_? : Boolean = notifiers.nonEmpty
def apply( ): T = {
notifiers.allocate()
doget()
}
private def doget( ):T = {
_deferred_eval.map( e => {
value = Some( Try( e() ) )
_deferred_eval = None
})
value match {
case None => (null).asInstanceOf[T]
case Some(t) => t.get
}
}
private def update( v: T ): Unit ={
if ( value.isDefined && value.get == v ) return;
value = Some( Try(v) )
notifiers.flush()
}
// will be evaluated next time the cell is accessed
// you can use this to set the cell to the value of a computation
private var _deferred_eval: Option[() => T] = None
def apply( v: () => T ): Unit = {
_deferred_eval = Some(v)
notifiers.flush()
}
// TODO: equality tests
def apply( v: T ): Unit = {
update(v)
}
def apply( e: Throwable ): Unit = {
value = Some( Try( throw e ) )
notifiers.flush()
}
def apply( t: Try[T] ): Unit = if ( t.isFailure ) apply( t.failed.get ) else apply( t.get )
def apply( e: Throwable, v: T ): Unit = if ( e == null ) apply( v ) else apply( e )
}
trait CellOptions[T] {
def equals[T]( a:T, b:T ): Boolean
}
class CellBasedCache[K, V]( fetcher: (K) => Unit ) extends SyncifiedServiceCache[K, V] with (K => V) with ( (K, V) => Unit ) {
private lazy val cache = new mutable.HashMap[K,Cell[V]]
def apply( k: K ) = get( k )
def apply( k: K, v: V ) = put( k, v )
def get( k: K ): V =
(cache.get( k ) match {
case Some(c) => c
case None => {
val c = radioactive.cell[V]()
c( new WaitSignal )
cache += ( k -> c ) // store the cell in the cache before calling the fetcher ( in case the fetch is sync )
fetcher( k )
c
}
})()
def put( k: K, v: V ) : Unit = getOrCreateCell( k )( v )
def put( k: K, e: Throwable, v: V ) : Unit = getOrCreateCell( k )( e, v )
def put( k: K, t: Try[V] ) : Unit = getOrCreateCell( k )( t )
private def getOrCreateCell( k: K ): Cell[V] =
cache.get( k ) match {
case Some(c) => c
case None => {
val c = radioactive.cell[V]()
cache += ( k -> c )
c
}
}
def reset(): Unit = reset( k => true )
def reset( filter: K => Boolean ): Unit =
cache.toSeq
.filter( x => filter( x._1 ) )
.foreach( { case ( k, c ) => if ( c.has_notifiers_? ) c( new WaitSignal ) else cache.remove( k ) } )
}
class EchoService( delay: Double = 1000 ) extends Function1[String, String] {
private val cells = new mutable.HashMap[String, Cell[String]]
def apply( message: String ): String = {
cells.getOrElseUpdate( message, {
val c = radioactive.cell[String]( )
c( new WaitSignal )
util.delay( delay, () => c( message ) )
c
})()
}
}
class Forker {
private var pres = 0
private val monitors = new MonitorCollector
def apply[X]( expr: => X ): X = {
val res = ReactiveEval.eval( () => expr )
if ( WaitSignal.check( res.result ) ){
pres = pres + 1
res.monitor.foreach( monitors.push _ )
( null ).asInstanceOf[X]
} else {
res.unbox()
}
}
def join(): Unit = {
monitors.join()
if ( pres > 0 ) throw new WaitSignal
}
}
class NotifierPool {
private val notifiers = new mutable.ArrayBuffer[Notifier]()
def allocate(): Unit = {
radioactive.notifier().foreach( n => {
// TODO: listen to cancel / destroy events
notifiers += n
} )
}
def flush(): Unit ={
val ns = notifiers.toList
notifiers.clear()
ns.foreach( n => n() )
}
def size = notifiers.size
def nonEmpty = notifiers.nonEmpty
// TODO: cancel/destroy
}
class PartialResultMarker {
private var flag = false
def run[T]( expr: Expr[T] ): T = PartialResultMarker.stackval.run( expr, () => this )
def mark(): Unit = { flag = true }
def marked: Boolean = flag
}
object PartialResultMarker {
private lazy val stackval = new StackVal[PartialResultMarker]()
def mark() = stackval.get.mark()
def wrap[T]( expr: Expr[T] ): Expr[T] => PossiblyPartialResult[T] =
( expr: Expr[T] ) => {
val prm = new PartialResultMarker( )
val res = prm.run( expr )
new PossiblyPartialResult[T]( res, prm.marked )
}
def run[T]( expr: Expr[T] ): PossiblyPartialResult[T] = {
val prm = new PartialResultMarker( )
val res = prm.run( expr )
new PossiblyPartialResult[T]( res, prm.marked )
}
}
class PossiblyPartialResult[T]( val result: T, val partial: Boolean )
private class ReactiveEval[T]( val expr: () => T ) {
private var _monitor: Option[Monitor] = None
private def monitor = _monitor
private lazy val lazy_monitor: Monitor = {
_monitor = Some( new Monitor(){} )
_monitor.get
}
private def run( ): ReactiveEvalResult[T] = {
// evaluate expression first. it may create a monitor
// order is important ( mutable state sucks, I know )
val t = Try( expr() )
// and now compose result
return new ReactiveEvalResult[T]( t, monitor )
}
private def allocate_notifier(): Notifier = lazy_monitor.allocate_notifier()
}
private object ReactiveEval {
private lazy val stack = new mutable.ArrayStack[ReactiveEval[_]]()
def notifier(): Option[Notifier] = stack.lastOption.map( _.allocate_notifier )
def active(): Boolean = stack.nonEmpty
def eval[T]( expr: () => T ): ReactiveEvalResult[T] = {
val rev = new ReactiveEval[T]( expr )
stack.push(rev)
val r = rev.run()
stack.pop()
r
}
}
private class ReactiveEvalResult[T]( val result: Try[T], val monitor: Option[Monitor] ) {
def unbox() = {
monitor.map( _.bubble() )
result.get
}
}
object sandbox {
def test(): Unit ={
def fetchUserName(id:Int): String = ???
// getting all users from an async service
def getUserNames( ids: Seq[Int], cb: ( Seq[String] ) => Unit ): Unit ={
radioactive loop {
val fork = radioactive.fork()
val names = ( 1 to 10 ).map( id => fork( fetchUserName( id ) ) )
fork.join()
cb( names )
radioactive.stop()
}
}
radioactive loop {
val fork = radioactive.fork( )
fork { 8 }
fork { 77 }
fork.join( )
radioactive.stop( )
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment