Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created February 13, 2022 00:24
Show Gist options
  • Save djspiewak/847532614a166af9f811e972d6f2c36f to your computer and use it in GitHub Desktop.
Save djspiewak/847532614a166af9f811e972d6f2c36f to your computer and use it in GitHub Desktop.
final class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {
// doesn't interrupt taking in progress
def close: F[Unit] = closed.set(true)
def isClosed: F[Boolean] = closed.get
def send(a: A): F[Option[Unit]] =
closed.get.ifM(q.offer(a).map(Some(_)), F.pure(None))
def take: Stream[F, A] = {
val takeN: F[Chunk[A]] =
q.tryTakeN(None) flatMap {
case None =>
closed.get.ifM(q.take.map(Chunk.singleton(_)), F.pure(Chunk.empty[A]))
case Some(as) =>
F.pure(Chunk.seq(as))
}
// you can do this more efficiently, just proves a point
Stream.eval(takeN).repeat.takeWhile(!_.isEmpty).flatMap(Stream.chunk(_))
}
}
object Channel {
def apply[F[_]: Concurrent, A](bound: Int): F[Channel[F, A]] =
(Queue.bounded[F, A](bound), Concurrent[F].ref(false)).mapN(new Channel(_, _))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment