Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created February 13, 2022 00:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/7b7cf1d16c9e7c2d8f77ffef6f0294a5 to your computer and use it in GitHub Desktop.
Save djspiewak/7b7cf1d16c9e7c2d8f77ffef6f0294a5 to your computer and use it in GitHub Desktop.
/**
 * A closable, queue-backed channel: producers [[send]] elements and a consumer reads them back
 * as a `Stream` through [[take]].
 *
 * @param q      queue buffering elements between senders and the consumer
 * @param closed flag that [[close]] sets to `true`
 */
class Channel[F[_], A] private (q: Queue[F, A], closed: Ref[F, Boolean])(implicit F: Monad[F]) {

  /**
   * Marks the channel as closed.
   *
   * This only flips the flag; it doesn't interrupt taking in progress.
   */
  def close: F[Unit] = closed.set(true)

  /** Reads the current value of the closed flag. */
  def isClosed: F[Boolean] = closed.get

  /**
   * Offers `a` to the queue unless the channel has been closed.
   *
   * @param a the element to enqueue
   * @return `Some(())` once the element has been offered, `None` if the channel was closed
   */
  def send(a: A): F[Option[Unit]] =
    closed.get.ifM[Option[Unit]](
      // closed: reject the element
      F.pure(None),
      // open: enqueue it
      q.offer(a).map(Some(_))
    )

  /**
   * Streams the elements sent to this channel, a chunk at a time, and terminates once the
   * channel is closed and the queue has been emptied.
   */
  def take: Stream[F, A] = {
    // Everything currently buffered, without blocking. `tryTakeN` reports "nothing available"
    // as an empty list, which becomes an empty chunk here.
    val drain: F[Chunk[A]] = q.tryTakeN(None).map(Chunk.seq(_))

    val takeN: F[Chunk[A]] =
      drain.flatMap { buffered =>
        if (buffered.nonEmpty) F.pure(buffered)
        else
          closed.get.ifM(
            // closed: drain once more so an element offered between the drain above and this
            // flag read is not dropped; an empty result ends the stream below
            drain,
            // open: wait for the next element
            q.take.map(Chunk.singleton(_))
          )
      }

    // you can do this more efficiently, just proves a point
    Stream.eval(takeN).repeat.takeWhile(_.nonEmpty).flatMap(Stream.chunk(_))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment