Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created May 27, 2015 21:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/6e258bf03297c054e169 to your computer and use it in GitHub Desktop.
Save djspiewak/6e258bf03297c054e169 to your computer and use it in GitHub Desktop.
def multicast[A](p1: Process[Task, A], bound: Int = 10): Process[Task, Process[Task, A]] = Process suspend {
val queues = async signalOf Set[async.mutable.Queue[A]]()
def publish(a: A): Task[Unit] = for {
qsList <- queues.discrete filter { s => !s.isEmpty } take 1 runLog
qs = qsList flatMap { _.toList }
_ <- Task gatherUnordered (qs.toList map { _ enqueueOne a })
} yield ()
def unsubscribe(q: async.mutable.Queue[A]): Task[Unit] = for {
qs <- queues.get
qs2 = qs - q
result <- queues compareAndSet {
case Some(`qs`) => Some(qs2)
case opt => opt
}
_ <- if (result == Some(qs2))
q.close
else
unsubscribe(q)
} yield ()
lazy val subscribe: Task[Process[Task, A]] = for {
qs <- queues.get
q = async.boundedQueue[A](bound)
qs2 = qs + q
result <- queues compareAndSet {
case Some(`qs`) => Some(qs2)
case opt => opt
}
p = q.dequeue onComplete (Process eval_ unsubscribe(q))
back <- if (result == Some(qs2))
Task now p
else
subscribe
} yield back
val driver = p1 to (sink lift publish) drain
val source = Process repeatEval subscribe
driver merge source onComplete (Process eval_ (queues set Set())) // unsubscribe everyone, but allow drains
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment