Skip to content

Instantly share code, notes, and snippets.

Last active December 30, 2015 11:59
Show Gist options
  • Save pchlupacek/7826399 to your computer and use it in GitHub Desktop.
Save pchlupacek/7826399 to your computer and use it in GitHub Desktop.
* Topic that wrap state processor to produce
* - discrete stream of states that was created by publishing `A`
* - continuous stream of states that was created by publishing `A`
* - subscription to states and updates (S,A)
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
collect[(A or (S, A)), A]({ case a: A => a }) |>
stateProcessor |>
collect[(S, A), (A or (S, A))]({ case tpl@(s: S, a: A) => tpl.asInstanceOf[(A or (S, A))] })
/** sink that publishes to topic **/
def publish: TSink[A] = topic.publish.contramap((a: A) => a.asInstanceOf[(A or (S, A))])
/** publish one `A` to this state topic **/
def publishOne(a: A): Task[Unit] = topic.publishOne(a.asInstanceOf[(A or (S, A))])
/** subscription to state and updates **/
def subscribe: TSource[(S, A)] = topic.subscribe(last, bufferAll).collect[(S, A)] {
case tpl: (S, A) => tpl
* Continuous wye, that first reads from L,
* Then when L is not available it reads from R echoing any S that was received from `L`
* Will halt once the
private def echoLeft[T]: Wye[T, Any, T] = {
def go(s: T): Wye[T, Any, T] = receiveBoth({
case ReceiveL(s2) => emit(s2) fby go(s2)
case ReceiveR(_) => emit(s) fby go(s)
case HaltL(rsn) => Halt(rsn)
case HaltR(rsn) => Halt(rsn)
awaitL[T].flatMap(s => emit(s) fby go(s))
/** Immutable signal of state form this topic **/
def signal: Signal[S] = new Signal[S] {
def changed: stream.Process[Task, Boolean] = => true).wye(Process.constant(()))(echoLeft)
def discrete: stream.Process[Task, S] =
topic.subscribe(last, last).collect[S] {
case tpl: (S, A) => tpl._1
def continuous: stream.Process[Task, S] =
def changes: stream.Process[Task, Unit] =>())
* creates a topic that connects to this topic, and transforms
* (S,A) to new (SB,B)
def connect[SB,B](p:Process1[(S,A),(SB,B)]) : StateTopic[SB,B] = ???
* creates a topic, that takes messages from this topic and joins them vith provided wye
* to from new StateTopic[SC,C]
def wye[SB,SC,B,C](wye:Wye[(S,A),(SB,B),(SC,C)])(other:StateTopic[SB,B]) : StateTopic[SC,C] = ???
Copy link

What is or, TSource, and TSink?

Copy link

ah sorry, this is from or framework. Disregard or, this can be replaced by / (is actually union type), TSource[A] is Source[Task,A] and TSink is Sink[Task,A]

Copy link

idea is to have on topic combinator like :


Copy link

also please note the topic implementaion here uses our extensions with topic (journal, reconcile, buffer)....

Copy link

This may allow easily to graph complex stream flows that do have dependencies on each other

Copy link

Seems reasonable. Would it make sense to call this JournaledTopic? Also, should it be Process1[A,(S,A)] or Process1[A, S \/ A] (aka Writer1[S,A,A])? Do you always want to have to emit a new state?

Copy link

I will think about this. I like the idea with writer...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment