Last active
December 30, 2015 11:59
-
-
Save pchlupacek/7826399 to your computer and use it in GitHub Desktop.
StateTopic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Topic that wraps a state processor to produce
 *  - a discrete stream of states that were created by publishing `A`
 *  - a continuous stream of states that were created by publishing `A`
 *  - a subscription to states and updates `(S, A)`
 *
 * @param stateProcessor process that turns each published `A` into an `(S, A)` pair
 * @param strategy       strategy handed to `async.topic` when the underlying topic is created
 * @tparam S type of the state
 * @tparam A type of the published update
 */
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {

  /**
   * Underlying topic whose element type is `A or (S, A)`.
   *
   * Its journal is a pipeline that collects the `A` values, runs them through
   * `stateProcessor`, and re-wraps every resulting `(S, A)` pair as a topic element.
   *
   * NOTE(review): `case a: A` and `(s: S, a: A)` are type patterns on the erased type
   * parameters `S` and `A` (no `ClassTag` is in scope), so the compiler cannot check them
   * at runtime; `case a: A` will presumably also accept `(S, A)` tuples. Confirm this is
   * compatible with how `journal` feeds messages into the pipeline.
   */
  val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
    collect[(A or (S, A)), A]({ case a: A => a }) |>
      stateProcessor |>
      collect[(S, A), (A or (S, A))]({ case tpl @ (s: S, a: A) => tpl.asInstanceOf[(A or (S, A))] })
  )

  /** Sink that publishes every received `A` to the topic. */
  def publish: TSink[A] = topic.publish.contramap((a: A) => a.asInstanceOf[(A or (S, A))])

  /** Publishes one `A` to this state topic. */
  def publishOne(a: A): Task[Unit] = topic.publishOne(a.asInstanceOf[(A or (S, A))])

  /**
   * Subscription to state and updates: only the tuple elements of the topic are passed on.
   *
   * NOTE(review): `case tpl: (S, A)` can only be checked as "is a `Tuple2`" at runtime,
   * because `S` and `A` are erased.
   */
  def subscribe: TSource[(S, A)] = topic.subscribe(last, bufferAll).collect[(S, A)] {
    case tpl: (S, A) => tpl
  }

  /**
   * Continuous wye that first awaits one value from the left side and emits it.
   * Afterwards every value received from the left is emitted and remembered, while every
   * value received from the right (its content is ignored) causes the most recently seen
   * left value to be emitted again.
   *
   * Halts as soon as either side halts, using the reason reported by that side.
   */
  private def echoLeft[T]: Wye[T, Any, T] = {
    // `s` is the last value seen on the left side
    def go(s: T): Wye[T, Any, T] = receiveBoth({
      case ReceiveL(s2) => emit(s2) fby go(s2)
      case ReceiveR(_)  => emit(s) fby go(s)
      case HaltL(rsn)   => Halt(rsn)
      case HaltR(rsn)   => Halt(rsn)
    })
    awaitL[T].flatMap(s => emit(s) fby go(s))
  }

  /** Immutable signal of state from this topic. */
  def signal: Signal[S] = new Signal[S] {

    // NOTE(review): the left side only ever carries `true`, and `echoLeft` re-emits the
    // last left value on every right tick, so this stream never emits `false`.
    // Confirm that this matches the contract expected from `Signal.changed`.
    def changed: stream.Process[Task, Boolean] =
      discrete.map(_ => true).wye(Process.constant(()))(echoLeft)

    // State part (`_1`) of every tuple element delivered by a subscription to the topic.
    def discrete: stream.Process[Task, S] =
      topic.subscribe(last, last).collect[S] {
        case tpl: (S, A) => tpl._1
      }

    // `discrete`, with the latest state echoed whenever the constant right side ticks.
    def continuous: stream.Process[Task, S] =
      discrete.wye(Process.constant(()))(echoLeft)

    // One `Unit` for every element of `discrete`.
    def changes: stream.Process[Task, Unit] =
      discrete.map(_ => ())
  }

  /**
   * Creates a topic that connects to this topic and transforms
   * `(S, A)` to new `(SB, B)`.
   *
   * Not implemented yet: the body is `???`, so calling it throws `NotImplementedError`.
   */
  def connect[SB, B](p: Process1[(S, A), (SB, B)]): StateTopic[SB, B] = ???

  /**
   * Creates a topic that takes messages from this topic and from `other` and joins them
   * with the provided wye to form a new `StateTopic[SC, C]`.
   *
   * Not implemented yet: the body is `???`, so calling it throws `NotImplementedError`.
   */
  def wye[SB, SC, B, C](wye: Wye[(S, A), (SB, B), (SC, C)])(other: StateTopic[SB, B]): StateTopic[SC, C] = ???
}
Seems reasonable. Would it make sense to call this `JournaledTopic`? Also, should it be `Process1[A,(S,A)]` or `Process1[A, S \/ A]` (aka `Writer1[S,A,A]`)? Do you always want to have to emit a new state?
I will think about this. I like the idea with writer...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This may make it easy to graph complex stream flows that have dependencies on each other.