Created
May 27, 2015 21:53
-
-
Save djspiewak/6e258bf03297c054e169 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Fans a single source process out to any number of subscribers.
 *
 * The returned outer process emits one inner process per subscription. Each
 * inner process reads from its own bounded queue, and every element produced
 * by `p1` is enqueued onto all queues registered at the time it is published.
 *
 * @param p1    the source process whose elements are broadcast
 * @param bound capacity passed to `async.boundedQueue` for each subscriber
 * @tparam A    element type of the source
 * @return a process of subscriber processes, merged with the (drained) driver
 *         that pumps `p1` into the subscriber queues
 */
def multicast[A](p1: Process[Task, A], bound: Int = 10): Process[Task, Process[Task, A]] = Process.suspend {
  // The set of currently subscribed queues, held in a signal so that `publish`
  // can wait for the first subscriber to show up.
  val queues = async.signalOf(Set[async.mutable.Queue[A]]())

  // Enqueue `a` onto every queue in the first non-empty subscriber set observed
  // on the signal; while the set is empty this waits instead of dropping `a`.
  def publish(a: A): Task[Unit] = for {
    snapshots <- queues.discrete.filter(_.nonEmpty).take(1).runLog
    targets = snapshots.toList.flatMap(_.toList)
    _ <- Task.gatherUnordered(targets.map(_.enqueueOne(a)))
  } yield ()

  // Remove `q` from the subscriber set with a compare-and-set loop, closing the
  // queue once the removal is observed and retrying otherwise.
  def unsubscribe(q: async.mutable.Queue[A]): Task[Unit] = for {
    qs <- queues.get
    qs2 = qs - q
    // Only swap when the signal still holds the set we read; any other value is
    // left untouched.
    result <- queues.compareAndSet {
      case Some(`qs`) => Some(qs2)
      case opt => opt
    }
    // NOTE(review): assumes compareAndSet yields the signal's value after the
    // update, so equality with Some(qs2) means our write won -- confirm.
    _ <- if (result == Some(qs2))
      q.close
    else
      unsubscribe(q)
  } yield ()

  // Register a fresh bounded queue and hand back its dequeue side. `lazy` lets
  // the task refer to itself for the retry branch. Each attempt allocates a new
  // queue; the queue of a lost attempt is simply discarded.
  lazy val subscribe: Task[Process[Task, A]] = for {
    qs <- queues.get
    q = async.boundedQueue[A](bound)
    qs2 = qs + q
    result <- queues.compareAndSet {
      case Some(`qs`) => Some(qs2)
      case opt => opt
    }
    // When the subscriber's process completes, deregister its queue.
    p = q.dequeue.onComplete(Process.eval_(unsubscribe(q)))
    back <- if (result == Some(qs2))
      Task.now(p)
    else
      subscribe
  } yield back

  // Pumps the source into `publish`, emitting nothing itself.
  val driver = p1.to(sink.lift(publish)).drain
  // Emits a new subscriber process each time it is stepped.
  val source = Process.repeatEval(subscribe)

  // NOTE(review): the final step resets the set to empty but does not call
  // `close` on the queues still registered at that point -- confirm whether
  // outstanding subscribers are expected to terminate on their own.
  driver.merge(source).onComplete(Process.eval_(queues.set(Set()))) // unsubscribe everyone, but allow drains
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment