Skip to content

Instantly share code, notes, and snippets.

@filosganga
Created February 25, 2020 22:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filosganga/738210cf0ddb4d30724184ed9f15ca7b to your computer and use it in GitHub Desktop.
Save filosganga/738210cf0ddb4d30724184ed9f15ca7b to your computer and use it in GitHub Desktop.
Several effectful Queue implementation using cats-effect
import cats.data._
import cats.implicits._
import cats.effect._
import cats.effect.concurrent._
trait Queue[F[_], A] {
def enqueue(a: A): F[Unit]
def dequeue: F[A]
}
object Queue {
def one[F[_]: Concurrent, A]: F[Queue[F, A]] = {
Ref.of[F, Option[Deferred[F, A]]](none[Deferred[F, A]]).map { ref =>
new Queue[F, A] {
val createOrExistingDefer = Deferred[F,A].flatMap { promise =>
ref.modify {
case Some(p) => (None, p)
case None => (Some(promise), promise)
}
}
def enqueue(a: A): F[Unit] = {
createOrExistingDefer.flatMap(_.complete(a))
}
val dequeue: F[A] = {
createOrExistingDefer.flatMap(_.get)
}
}
}
}
def unbounded[F[_]: Concurrent, A]: F[Queue[F, A]] = {
case class ReadsAndWrites(
reads: Chain[Deferred[F, A]],
items: Chain[A]
)
object ReadsAndWrites {
val empty = ReadsAndWrites(Chain.empty, Chain.empty)
}
Ref.of[F, ReadsAndWrites](ReadsAndWrites.empty).map { ref =>
new Queue[F, A] {
def enqueue(a: A): F[Unit] = {
ref.modify { rw =>
rw.reads.uncons match {
case Some((p, rest)) => rw.copy(reads = rest) -> p.complete(a)
case None => rw.copy(items = rw.items.append(a)) -> ().pure[F]
}
}.flatten
}
val dequeue: F[A] = {
Deferred[F, A].flatMap { promise =>
ref.modify { rw =>
rw.items.uncons match {
case Some((item, rest)) => (rw.copy(items = rest) -> item.pure[F])
case None => (rw.copy(reads = rw.reads.append(promise)) -> promise.get)
}
}.flatten
}
}
}
}
}
def bounded[F[_]: Concurrent, A](bound: Int): F[Queue[F, A]] = {
sealed trait Status
case object Idle extends Status
case class Empty(pendingReads: NonEmptyChain[Deferred[F, A]]) extends Status
case class NonEmpty(values: NonEmptyChain[A]) extends Status
Ref.of[F, Status](Idle).flatMap { status =>
Semaphore[F](bound).map { semaphore =>
new Queue[F, A] {
def enqueue(a: A): F[Unit] = {
semaphore.withPermit {
status.modify {
case Idle => NonEmpty(NonEmptyChain.one(a))->().pure[F]
case NonEmpty(values) => NonEmpty(values :+ a)->().pure[F]
case Empty(pendingReads) => {
val nextState = pendingReads.tail.uncons match {
case Some((a, chain)) => Empty(NonEmptyChain.one(a).prependChain(chain))
case None => Idle
}
nextState -> pendingReads.head.complete(a)
}
}.flatten
}
}
val dequeue: F[A] = {
Deferred[F, A].flatMap { promise =>
status.modify {
case Idle => Empty(NonEmptyChain.one(promise))->promise.get
case Empty(pendingReads) => Empty(pendingReads :+ promise)->promise.get
case NonEmpty(values) => {
val nextStatus = values.tail.uncons match {
case Some((a, chain)) => NonEmpty(NonEmptyChain.one(a).prependChain(chain))
case None => Idle
}
nextStatus -> values.head.pure[F]
}
}.flatten
}
}
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment