Last active
October 18, 2019 06:02
-
-
Save jilen/718d245bd095c9ad79f9ac6893df6623 to your computer and use it in GitHub Desktop.
Functional queue, similar to fs2 Queue, without Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.concurrent._ | |
import cats.effect._ | |
import cats.syntax.all._ | |
import scala.language.higherKinds | |
import scala.concurrent.duration._ | |
trait Queue[F[_], A] { | |
def timedDequeue1(duration: FiniteDuration, timer: Timer[F]): F[Option[A]] | |
def enqueue1(a: A): F[Unit] | |
} | |
final case class State[F[_], A]( | |
queue: Vector[A], | |
deq: Vector[Deferred[F, A]] | |
) | |
object Queue { | |
def empty[F[_]: ConcurrentEffect, A] = of[F, A](Vector.empty) | |
def of[F[_]: ConcurrentEffect, A](as: Vector[A]): F[Queue[F, A]] = { | |
Ref.of[F, State[F, A]](State(as, Vector.empty)).map { ref => | |
new DefaultQueue(ref) | |
} | |
} | |
def unsafe[F[_]: ConcurrentEffect, A](as: Vector[A]): Queue[F, A] = { | |
new DefaultQueue[F, A](Ref.unsafe(State(as, Vector.empty))) | |
} | |
private class DefaultQueue[F[_], A](ref: Ref[F, State[F, A]])(implicit F: ConcurrentEffect[F]) extends Queue[F, A] { | |
def enqueue1(a: A): F[Unit] = { | |
ref.modify { s => | |
if (s.deq.isEmpty) { | |
(s.copy(queue = s.queue :+ a), None) | |
} else { | |
(s.copy(deq = s.deq.tail), Some(s.deq.head)) | |
} | |
}.flatMap { | |
case Some(h) => | |
h.complete(a) | |
case None => | |
F.unit | |
} | |
} | |
def timedDequeue1(duration: FiniteDuration, timer: Timer[F]): F[Option[A]] = { | |
cancellableDequeue1().flatMap { | |
case (Right(v), _) => F.pure(Some(v)) | |
case (Left(defer), cancel) => | |
val timeout = timer.sleep(duration) | |
F.race(timeout, defer.get).flatMap { | |
case Right(v) => F.pure(Some(v)) | |
case Left(_) => cancel.as(None) | |
} | |
} | |
} | |
private def cancellableDequeue1(): F[(Either[Deferred[F, A], A], F[Unit])] = { | |
Deferred[F, A].flatMap { defer => | |
ref.modify { s => | |
if (s.queue.isEmpty) | |
(s.copy(deq = s.deq :+ defer), None) | |
else | |
(s.copy(queue = s.queue.drop(1)), Some(s.queue.take(1).head)) | |
}.map { | |
case Some(h) => | |
(Right(h), F.unit) | |
case None => | |
(Left(defer), ref.modify { s => | |
(s.copy(deq = s.deq.filterNot(_ == defer)), {}) | |
}) | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment