Skip to content

Instantly share code, notes, and snippets.

@pchlupacek
Last active December 30, 2015 11:59
Show Gist options
  • Save pchlupacek/7826399 to your computer and use it in GitHub Desktop.
Save pchlupacek/7826399 to your computer and use it in GitHub Desktop.
StateTopic
/**
* Topic that wrap state processor to produce
* - discrete stream of states that was created by publishing `A`
* - continuous stream of states that was created by publishing `A`
* - subscription to states and updates (S,A)
*/
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
collect[(A or (S, A)), A]({ case a: A => a }) |>
stateProcessor |>
collect[(S, A), (A or (S, A))]({ case tpl@(s: S, a: A) => tpl.asInstanceOf[(A or (S, A))] })
)
/** sink that publishes to topic **/
def publish: TSink[A] = topic.publish.contramap((a: A) => a.asInstanceOf[(A or (S, A))])
/** publish one `A` to this state topic **/
def publishOne(a: A): Task[Unit] = topic.publishOne(a.asInstanceOf[(A or (S, A))])
/** subscription to state and updates **/
def subscribe: TSource[(S, A)] = topic.subscribe(last, bufferAll).collect[(S, A)] {
case tpl: (S, A) => tpl
}
/**
* Continuous wye, that first reads from L,
* Then when L is not available it reads from R echoing any S that was received from `L`
* Will halt once the
*/
private def echoLeft[T]: Wye[T, Any, T] = {
def go(s: T): Wye[T, Any, T] = receiveBoth({
case ReceiveL(s2) => emit(s2) fby go(s2)
case ReceiveR(_) => emit(s) fby go(s)
case HaltL(rsn) => Halt(rsn)
case HaltR(rsn) => Halt(rsn)
})
awaitL[T].flatMap(s => emit(s) fby go(s))
}
/** Immutable signal of state form this topic **/
def signal: Signal[S] = new Signal[S] {
def changed: stream.Process[Task, Boolean] =
discrete.map(_ => true).wye(Process.constant(()))(echoLeft)
def discrete: stream.Process[Task, S] =
topic.subscribe(last, last).collect[S] {
case tpl: (S, A) => tpl._1
}
def continuous: stream.Process[Task, S] =
discrete.wye(Process.constant(()))(echoLeft)
def changes: stream.Process[Task, Unit] =
discrete.map(_=>())
}
/**
* creates a topic that connects to this topic, and transforms
* (S,A) to new (SB,B)
*/
def connect[SB,B](p:Process1[(S,A),(SB,B)]) : StateTopic[SB,B] = ???
/**
* creates a topic, that takes messages from this topic and joins them vith provided wye
* to from new StateTopic[SC,C]
*
**/
def wye[SB,SC,B,C](wye:Wye[(S,A),(SB,B),(SC,C)])(other:StateTopic[SB,B]) : StateTopic[SC,C] = ???
}
@pchiusano
Copy link

What is or, TSource, and TSink?

@pchlupacek
Copy link
Author

ah sorry, this is from or framework. Disregard or, this can be replaced by / (is actually union type), TSource[A] is Source[Task,A] and TSink is Sink[Task,A]

@pchlupacek
Copy link
Author

idea is to have on topic combinator like :

toStateTopic

@pchlupacek
Copy link
Author

also please note the topic implementaion here uses our extensions with topic (journal, reconcile, buffer)....

@pchlupacek
Copy link
Author

This may allow easily to graph complex stream flows that do have dependencies on each other

@pchiusano
Copy link

Seems reasonable. Would it make sense to call this JournaledTopic? Also, should it be Process1[A,(S,A)] or Process1[A, S \/ A] (aka Writer1[S,A,A])? Do you always want to have to emit a new state?

@pchlupacek
Copy link
Author

I will think about this. I like the idea with writer...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment