Pavel Chlupacek pchlupacek

View GitHub Profile
@pchlupacek
pchlupacek / gist:7826399
Last active December 30, 2015 11:59
StateTopic
/**
* Topic that wraps a state processor to produce
* - a discrete stream of states created by publishing `A`
* - a continuous stream of states created by publishing `A`
* - a subscription to states and updates `(S, A)`
*/
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
  val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
    collect[(A or (S, A)), A]({ case a: A => a }) |>