Skip to content

Instantly share code, notes, and snippets.

@sshark
Last active September 24, 2023 16:08
Show Gist options
  • Save sshark/e6619eb0c26b94346f2b70cdd9c3988e to your computer and use it in GitHub Desktop.
Save sshark/e6619eb0c26b94346f2b70cdd9c3988e to your computer and use it in GitHub Desktop.
package org.teckhooi.concurrent
import cats.effect.{IO, IOApp}
import cats.implicits.catsSyntaxFlatMapOps
/** Demo application: a hand-rolled parallel traverse for `IO`. */
object ParTraverseApp extends IOApp.Simple {

  /** Increments each element of `List(1, 2, 3)` in parallel and prints the resulting list. */
  override def run: IO[Unit] = {

    /**
      * Runs `f` on every element of `as` in parallel and collects the results in input order.
      *
      * The effects are combined with `IO.both` directly instead of starting fibers by hand and
      * joining them later. Per the cats-effect documentation `IO.both` cancels the other side
      * when one side fails, and cancelling the combined effect cancels both sides, so no
      * background fiber is left running (or joined forever) when an element fails.
      *
      * @param as the inputs to process
      * @param f  the effectful function applied to each input
      * @return the results, in the same order as `as`
      */
    def parTraverse[A, B](as: List[A])(f: A => IO[B]): IO[List[B]] =
      // Fold from the right so each result can be prepended in O(1) while preserving order.
      as.foldRight(IO.pure(List.empty[B])) { (a, rest) =>
        IO.both(f(a), rest).map { case (b, bs) => b :: bs }
      }

    parTraverse(List(1, 2, 3))(x => IO.pure(x + 1)) >>= (IO.println)
  }
}
package org.teckhooi.concurrent
import cats.effect.{Deferred, IO, IOApp}
import scala.concurrent.duration.*
/** Demo application exercising the hand-written `Queue` with concurrent producers. */
object QueueApp extends IOApp.Simple {

  /**
    * Starts four background `put`s, takes four values, then shows that a `take` issued on an
    * empty queue is fulfilled by a `put` that arrives one second later. Prints the collected
    * values and the elapsed wall-clock time.
    */
  override def run: IO[Unit] = {

    // Fires the four producers as independent fibers; none of them is joined.
    def startProducers(queue: Queue[Int]): IO[Unit] =
      (IO.println("Put 1") *> queue.put(1)).start *>
        (IO.println("Put 2") *> queue.put(2)).start *>
        (IO.println("Put 3") *> queue.put(3)).start *>
        (IO.println("Put 4") *> queue.put(4)).start.void

    // Takes four elements one after another, in the order the queue yields them.
    def takeFour(queue: Queue[Int]): IO[List[Int]] =
      for {
        first  <- queue.take
        second <- queue.take
        third  <- queue.take
        fourth <- queue.take
      } yield List(first, second, third, fourth)

    for {
      begin     <- IO.realTime
      queue     <- Queue[Int]
      _         <- startProducers(queue)
      firstFour <- takeFour(queue)
      // The four puts above have been consumed, so this take has to wait for the put below.
      lateTaker <- queue.take.start
      _         <- IO.sleep(1.second)
      _         <- queue.put(10)
      last      <- lateTaker.joinWithNever
      _         <- IO.println(s"output: ${firstFour :+ last}")
      finish    <- IO.realTime
      _         <- IO.println(s"Total time taken: ${(finish - begin).toMillis}ms")
    } yield ()
  }
}
/**
  * A FIFO queue of `A` whose operations are described as `IO` effects.
  *
  * @tparam A the element type
  */
trait Queue[A] {

  /** Enqueues `a`. */
  def put(a: A): IO[Unit]

  /**
    * Enqueues `a` and reports whether it was accepted. The implementation created by
    * `Queue.apply` in this file always reports `true`.
    */
  def tryPut(a: A): IO[Boolean]

  /** Dequeues the oldest element, waiting for a later `put` when the queue is empty. */
  def take: IO[A]

  /** Dequeues the oldest element if there is one; yields `None` on an empty queue. */
  def tryTake: IO[Option[A]]

  /** Yields the oldest element without removing it, or `None` on an empty queue. */
  def peek: IO[Option[A]]
}
/**
  * Immutable snapshot of a queue's state.
  *
  * @param values buffered elements, oldest first (`put` appends, `take` removes the head)
  * @param waiter `Deferred`s of takers that found the queue empty, in arrival order
  * @tparam A the element type
  */
final case class State[A](
    values: List[A],
    waiter: List[Deferred[IO, A]]
)
/** Factory for an unbounded FIFO `Queue` whose whole state lives in a single `Ref`. */
object Queue {

  /**
    * Creates an empty queue.
    *
    * Every operation keeps the following invariant: `waiter` is non-empty only while `values`
    * is empty. A new element is therefore handed straight to the longest-waiting taker when
    * there is one, and buffered otherwise.
    *
    * @tparam A the element type
    * @return an effect that allocates the queue
    */
  def apply[A]: IO[Queue[A]] =
    IO.ref(State(List.empty[A], List.empty[Deferred[IO, A]])).map { stateRef =>
      new Queue[A] {

        /** Hands `a` to the oldest waiting taker, or appends it to the buffer; logs what it did. */
        override def put(a: A): IO[Unit] =
          stateRef.flatModify { s =>
            s.waiter match {
              case Nil =>
                // `:+` on a List is O(n); acceptable for this demo-sized queue.
                val xs = s.values :+ a
                (State(xs, s.waiter), IO.println(s"put($a): $xs"))
              case next :: rest =>
                // Keep `s.values` untouched: resetting it here would silently drop elements.
                (
                  State(s.values, rest),
                  next.complete(a) *> IO.println(s"put($a): fulfill deferred value")
                )
            }
          }

        /**
          * An alternative `take` built on `Ref.access`. It allocates a `Deferred` only when the
          * queue is found empty, and reuses that same `Deferred` if the optimistic update loses
          * a race and has to be retried.
          */
        def take_alt_access: IO[A] = {
          // `pending` carries the Deferred allocated by an earlier, failed attempt (if any).
          def loop(pending: Option[Deferred[IO, A]]): IO[A] =
            stateRef.access.flatMap { (state, setter) =>
              state.values match {
                case head :: tail =>
                  setter(State(tail, state.waiter)).ifM(IO.pure(head), loop(pending))
                case Nil =>
                  pending.fold(IO.deferred[A])(d => IO.pure(d)).flatMap { deferred =>
                    setter(State(state.values, state.waiter :+ deferred))
                      .ifM(deferred.get, loop(Some(deferred)))
                  }
              }
            }
          loop(None)
        }

        /**
          * Removes and yields the oldest element; on an empty queue registers a `Deferred`
          * and waits for a later `put`/`tryPut` to complete it.
          *
          * NOTE(review): per the cats-effect docs `Ref#flatModify` runs the returned effect
          * inside an uncancelable region, so a `take` blocked on an empty queue presumably
          * cannot be cancelled — confirm before relying on cancellation.
          */
        override def take: IO[A] =
          IO.deferred[A].flatMap { deferred =>
            stateRef.flatModify { state =>
              state.values match {
                case Nil          => (State(state.values, state.waiter :+ deferred), deferred.get)
                case head :: tail => (State(tail, state.waiter), IO.pure(head))
              }
            }
          }

        /**
          * Same hand-off-or-buffer logic as `put`, without the logging. The queue is unbounded,
          * so the element is always accepted and the result is always `true`.
          */
        override def tryPut(a: A): IO[Boolean] =
          stateRef.flatModify { s =>
            s.waiter match {
              case Nil          => (State(s.values :+ a, s.waiter), IO.pure(true))
              // A taker is waiting: give it the element instead of stranding it in the buffer.
              case next :: rest => (State(s.values, rest), next.complete(a).as(true))
            }
          }

        /** Removes and yields the oldest element, or yields `None` without waiting. */
        override def tryTake: IO[Option[A]] =
          stateRef.modify { state =>
            state.values match {
              case Nil          => (state, Option.empty[A])
              case head :: tail => (State(tail, state.waiter), Option(head))
            }
          }

        /** Yields the oldest element without removing it; the state is only read. */
        override def peek: IO[Option[A]] =
          stateRef.get.map(_.values.headOption)
      }
    }
}
package org.teckhooi.concurrent
import cats.effect.{Deferred, IO, IOApp}
import scala.concurrent.duration.*
/** Demo application for a hand-written counting semaphore built from `Ref` and `Deferred`. */
object SemaphoreApp extends IOApp.Simple {

  /**
    * Runs five tasks against a semaphore with three permits and prints the total elapsed time.
    * Tasks 4 and 5 are started after the first three hold every permit, so they have to wait
    * for a release.
    */
  override def run: IO[Unit] = for {
    s <- Semaphore(3)
    start <- IO.realTime
    f1 <- (s.acquire *> IO.println("Task 1") *> IO.sleep(1.second) *> s.release).start
    f2 <- (s.acquire *> IO.println("Task 2") *> IO.sleep(2.second) *> s.release).start
    f3 <- (s.acquire *> IO.println("Task 3") *> IO.sleep(3.second) *> s.release).start
    // Give the first three tasks time to grab their permits before starting the rest.
    _ <- IO.sleep(100.millis)
    f4 <- (s.acquire *> IO.println("Task 4") *> IO.sleep(3.second) *> s.release).start
    f5 <- (s.acquire *> IO.println("Task 5") *> s.release).start
    _ <- f1.joinWithNever
    _ <- f4.joinWithNever
    _ <- f5.joinWithNever
    _ <- f2.joinWithNever
    _ <- f3.joinWithNever
    finish <- IO.realTime
    _ <- IO.println(s"Total time taken to complete is ${(finish - start).toMillis}ms")
  } yield ()

  /**
    * Immutable snapshot of the semaphore.
    *
    * @param max    total number of permits
    * @param curr   number of permits currently held
    * @param waiter latches of acquirers waiting for a permit, in arrival order
    */
  final case class State(max: Int, curr: Int, waiter: List[Deferred[IO, Unit]])

  /** A counting semaphore whose operations are described as `IO` effects. */
  trait Semaphore {

    /** Obtains one permit, waiting for a `release` when none is free. */
    def acquire: IO[Unit]

    /** Gives one permit back, waking the longest-waiting acquirer if there is one. */
    def release: IO[Unit]
  }

  object Semaphore {

    /**
      * Creates a semaphore with `permits` permits, none of them held.
      *
      * @param permits the maximum number of permits that may be held at the same time
      */
    def apply(permits: Int): IO[Semaphore] =
      IO.ref(State(permits, 0, List.empty[Deferred[IO, Unit]])).map { stateRef =>
        new Semaphore {

          /**
            * Takes a free permit immediately, or enqueues a latch and waits on it.
            *
            * NOTE(review): per the cats-effect docs `Ref#flatModify` runs the returned effect
            * inside an uncancelable region, so a blocked `acquire` presumably cannot be
            * cancelled — confirm before relying on cancellation.
            */
          override def acquire: IO[Unit] =
            IO.deferred[Unit].flatMap { latch =>
              stateRef.flatModify { state =>
                if (state.max == state.curr)
                  (State(state.max, state.curr, state.waiter :+ latch), latch.get)
                else
                  (State(state.max, state.curr + 1, state.waiter), IO.unit)
              }
            }

          /**
            * Returns a permit. With a waiter present the permit is handed over directly, so
            * `curr` stays unchanged: the woken acquirer never increments it itself, and
            * decrementing here would undercount the permits in use and admit more than `max`
            * concurrent holders. Without waiters the held count is decremented.
            */
          override def release: IO[Unit] =
            stateRef.flatModify { state =>
              state.waiter match {
                case Nil =>
                  (State(state.max, state.curr - 1, state.waiter), IO.unit)
                case next :: rest =>
                  (State(state.max, state.curr, rest), next.complete(()).void)
              }
            }
        }
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment