Last active
December 30, 2015 11:59
pchlupacek/7826399
StateTopic
/**
 * Topic that wraps a state processor to produce
 *  - a discrete stream of the states created by publishing `A`
 *  - a continuous stream of the states created by publishing `A`
 *  - a subscription to states and updates (S, A)
 */
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {

  val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
    collect[(A or (S, A)), A]({ case a: A => a }) |>
      stateProcessor |>
      collect[(S, A), (A or (S, A))]({ case tpl@(s: S, a: A) => tpl.asInstanceOf[(A or (S, A))] })
  )

  /** sink that publishes to topic **/
  def publish: TSink[A] = topic.publish.contramap((a: A) => a.asInstanceOf[(A or (S, A))])

  /** publish one `A` to this state topic **/
  def publishOne(a: A): Task[Unit] = topic.publishOne(a.asInstanceOf[(A or (S, A))])

  /** subscription to state and updates **/
  def subscribe: TSource[(S, A)] = topic.subscribe(last, bufferAll).collect[(S, A)] {
    case tpl: (S, A) => tpl
  }

  /**
   * Continuous wye that first reads from L.
   * Then, whenever L has nothing available, it reads from R,
   * echoing the last value that was received from L.
   * Halts as soon as either side halts.
   */
  private def echoLeft[T]: Wye[T, Any, T] = {
    def go(s: T): Wye[T, Any, T] = receiveBoth({
      case ReceiveL(s2) => emit(s2) fby go(s2)
      case ReceiveR(_)  => emit(s) fby go(s)
      case HaltL(rsn)   => Halt(rsn)
      case HaltR(rsn)   => Halt(rsn)
    })
    awaitL[T].flatMap(s => emit(s) fby go(s))
  }

  /** Immutable signal of state from this topic **/
  def signal: Signal[S] = new Signal[S] {
    def changed: stream.Process[Task, Boolean] =
      discrete.map(_ => true).wye(Process.constant(()))(echoLeft)

    def discrete: stream.Process[Task, S] =
      topic.subscribe(last, last).collect[S] {
        case tpl: (S, A) => tpl._1
      }

    def continuous: stream.Process[Task, S] =
      discrete.wye(Process.constant(()))(echoLeft)

    def changes: stream.Process[Task, Unit] =
      discrete.map(_ => ())
  }

  /**
   * Creates a topic that connects to this topic and transforms
   * (S, A) to a new (SB, B).
   */
  def connect[SB, B](p: Process1[(S, A), (SB, B)]): StateTopic[SB, B] = ???

  /**
   * Creates a topic that takes messages from this topic and joins them
   * with the other topic through the provided wye to form a new StateTopic[SC, C].
   */
  def wye[SB, SC, B, C](wye: Wye[(S, A), (SB, B), (SC, C)])(other: StateTopic[SB, B]): StateTopic[SC, C] = ???
}
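
To make the role of `stateProcessor` concrete, here is a minimal sketch that models it with plain Scala collections rather than scalaz-stream. The names `StateProcessorSketch`, `runningTotal` and `states` are hypothetical, as is the choice of `A = Int` (a delta) and `S = Int` (a running total). It only illustrates what a `Process1[A, (S, A)]` might emit for a finite run of published values.

```scala
object StateProcessorSketch {
  // Hypothetical domain: each published A is an Int delta,
  // and the state S is the running total of all deltas so far.
  // Every update is paired with the state it produced.
  def runningTotal(updates: List[Int]): List[(Int, Int)] =
    updates
      .scanLeft((0, 0)) { case ((total, _), delta) => (total + delta, delta) }
      .tail // drop the seed, which corresponds to "nothing published yet"

  // Roughly what a subscriber interested only in states would observe.
  def states(updates: List[Int]): List[Int] =
    runningTotal(updates).map(_._1)
}
```

Publishing `1, 2, 3` in this model yields the pairs `(1, 1), (3, 2), (6, 3)`, and the state-only view sees `1, 3, 6`.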
The idea is to have a combinator on topic, something like `toStateTopic`.
Also, please note that the topic implementation here uses our extensions to topic (journal, reconcile, buffer).
This may make it easy to build a graph of complex stream flows that depend on each other.
Seems reasonable. Would it make sense to call this `JournaledTopic`? Also, should it be `Process1[A,(S,A)]` or `Process1[A, S \/ A]` (aka `Writer1[S,A,A]`)? Do you always want to have to emit a new state?
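
The difference between the two shapes can be sketched with plain collections, using the standard library's `Either` as a stand-in for `\/`. Everything here is an assumption made for illustration: the object name `EmitStyles`, the functions `paired` and `written`, and the domain (updates are `Int`s, the state is the running maximum). It is not the scalaz-stream API.

```scala
object EmitStyles {
  // Style 1, modelling Process1[A, (S, A)]:
  // every update is paired with a state, even when the state did not change.
  def paired(updates: List[Int]): List[(Int, Int)] =
    updates
      .scanLeft((Int.MinValue, 0)) { case ((max, _), a) => (math.max(max, a), a) }
      .tail // drop the seed

  // Style 2, modelling Process1[A, S \/ A]:
  // Left(s) is written only when the state actually changes,
  // Right(a) echoes every update.
  def written(updates: List[Int]): List[Either[Int, Int]] = {
    val (_, out) =
      updates.foldLeft((Option.empty[Int], Vector.empty[Either[Int, Int]])) {
        case ((max, acc), a) =>
          if (max.forall(a > _)) (Some(a), acc :+ Left(a) :+ Right(a))
          else (max, acc :+ Right(a))
      }
    out.toList
  }
}
```

For the input `3, 1, 5`, the paired style reports a state for all three updates, while the writer style only writes a state for `3` and `5`, the two updates that raise the maximum.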
I will think about this. I like the idea with writer...
Ah sorry, this is from our framework. Disregard `or`; it can be replaced by `\/` (it is actually a union type). `TSource[A]` is `Source[Task, A]` and `TSink[A]` is `Sink[Task, A]`.
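
If `or` is replaced by a tagged disjunction, the `asInstanceOf` casts and the type patterns on `A` and `(S, A)` can give way to ordinary pattern matching. A minimal sketch, assuming the standard library's `Either` in place of `\/` and plain lists in place of streams (the names `TaggedMessages`, `Msg`, `updates` and `stateUpdates` are hypothetical):

```scala
object TaggedMessages {
  // A message on the topic is either a published update (Left)
  // or a (state, update) pair produced by the state processor (Right).
  type Msg[S, A] = Either[A, (S, A)]

  // Counterpart of the first `collect` in the journal: keep published updates only.
  def updates[S, A](msgs: List[Msg[S, A]]): List[A] =
    msgs.collect { case Left(a) => a }

  // Counterpart of `subscribe`: keep (state, update) pairs only.
  def stateUpdates[S, A](msgs: List[Msg[S, A]]): List[(S, A)] =
    msgs.collect { case Right(sa) => sa }
}
```

Because the tag is checked at runtime, this version does not depend on matching against the erased type parameters `S` and `A`.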