Last active
September 24, 2023 16:08
-
-
Save sshark/e6619eb0c26b94346f2b70cdd9c3988e to your computer and use it in GitHub Desktop.
My answers to the exercises in https://typelevel.org/blog/2020/10/30/concurrency-in-ce3.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.teckhooi.concurrent | |
import cats.effect.{IO, IOApp} | |
import cats.implicits.catsSyntaxFlatMapOps | |
/** Demo application exercising a hand-written `parTraverse` built on `IO.both`. */
object ParTraverseApp extends IOApp.Simple {
  override def run: IO[Unit] = {

    /**
     * Runs `f` on every element of `as` concurrently and collects the results in input order.
     *
     * The effects are combined pairwise with `IO.both`, so no fiber is started or joined by
     * hand. Failure and cancelation handling is therefore whatever `IO.both` provides for
     * each pair, instead of leaving manually started fibers running.
     *
     * @param as the inputs; an empty list yields `IO.pure(Nil)`
     * @param f  the effectful function applied to each input
     * @return the results, positioned like their inputs
     */
    def parTraverse[A, B](as: List[A])(f: A => IO[B]): IO[List[B]] =
      as.foldRight(IO.pure(List.empty[B])) { (a, rest) =>
        // Prepending while folding from the right keeps input order without O(n) appends.
        IO.both(f(a), rest).map { case (b, bs) => b :: bs }
      }

    parTraverse(List(1, 2, 3))(x => IO.pure(x + 1)) >>= (IO.println)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.teckhooi.concurrent | |
import cats.effect.{Deferred, IO, IOApp} | |
import scala.concurrent.duration.* | |
/**
 * Demo driver for [[Queue]].
 *
 * Four producers are started on their own fibers, four elements are taken, and then a fifth
 * `take` is started before the matching `put(10)` happens one second later. Because every
 * producer runs on a separate fiber, the order of the first four values is not fixed here.
 */
object QueueApp extends IOApp.Simple {

  /** Announces the element on stdout and then enqueues it. */
  private def announceAndPut(queue: Queue[Int], n: Int): IO[Unit] =
    IO.println(s"Put $n") *> queue.put(n)

  override def run: IO[Unit] =
    for {
      begin   <- IO.realTime
      q       <- Queue[Int]
      _       <- announceAndPut(q, 1).start
      _       <- announceAndPut(q, 2).start
      _       <- announceAndPut(q, 3).start
      _       <- announceAndPut(q, 4).start
      first   <- q.take
      second  <- q.take
      third   <- q.take
      fourth  <- q.take
      // This take is started before the element exists, so it has to wait for the put below.
      pending <- q.take.start
      _       <- IO.sleep(1.second) *> q.put(10)
      fifth   <- pending.joinWithNever
      _       <- IO.println(s"output: ${List(first, second, third, fourth, fifth)}")
      finish  <- IO.realTime
      _       <- IO.println(s"Total time taken: ${(finish - begin).toMillis}ms")
    } yield ()
}
/** A queue of `A` values whose operations are all expressed as `IO` effects. */
trait Queue[A] {

  // --- insertion ---

  /** Adds `a` to the queue. */
  def put(a: A): IO[Unit]

  /** Attempts to add `a`, reporting whether the element was accepted. */
  def tryPut(a: A): IO[Boolean]

  // --- removal ---

  /** Removes and returns the next element. */
  def take: IO[A]

  /** Removes and returns the next element if there is one, `None` otherwise. */
  def tryTake: IO[Option[A]]

  // --- inspection ---

  /** Returns the next element without removing it, or `None` when there is none. */
  def peek: IO[Option[A]]
}
/**
 * Immutable snapshot of a queue's contents.
 *
 * @param values buffered elements; new ones are appended at the end and taken from the head
 * @param waiter `Deferred`s of takers that found the queue empty, in arrival order
 */
final case class State[A](
    values: List[A],
    waiter: List[Deferred[IO, A]]
)
object Queue {

  /**
   * Creates an empty, unbounded FIFO queue whose whole state lives in one `Ref`.
   *
   * Invariant maintained by every operation: `values` and `waiter` are never both non-empty.
   * An element is buffered only when nobody is waiting, and a taker registers itself only
   * when nothing is buffered.
   *
   * NOTE(review): nothing here removes a registered `Deferred` when a waiting `take` is
   * canceled, so such a waiter would still receive the next element. Confirm whether
   * cancelation safety is required.
   */
  def apply[A]: IO[Queue[A]] =
    IO.ref(State(List.empty[A], List.empty[Deferred[IO, A]])).map { stateRef =>
      new Queue[A] {

        /** Hands `a` to the oldest waiting taker, or buffers it when nobody is waiting. */
        override def put(a: A): IO[Unit] =
          stateRef.flatModify { s =>
            s.waiter match {
              case Nil =>
                val xs = s.values :+ a
                (State(xs, s.waiter), IO.println(s"put($a): $xs"))
              case taker :: rest =>
                // Keep the buffered values untouched: only the waiter list changes here.
                (
                  State(s.values, rest),
                  taker.complete(a) *> IO.println(s"put($a): fulfill deferred value")
                )
            }
          }

        /**
         * Alternative `take` built on `Ref.access`. It allocates a `Deferred` at most once,
         * and only if the queue is found empty.
         *
         * The loop retries whenever the setter reports that the state changed concurrently;
         * a `Deferred` created on an earlier attempt is carried along and reused.
         */
        def take_alt_access: IO[A] = {
          def attempt(pending: Option[Deferred[IO, A]]): IO[A] =
            stateRef.access.flatMap { case (state, setter) =>
              state.values match {
                case head :: tail =>
                  setter(State(tail, state.waiter)).ifM(IO.pure(head), attempt(pending))
                case Nil =>
                  pending
                    .fold(IO.deferred[A])(d => IO.pure(d))
                    .flatMap { d =>
                      setter(State(state.values, state.waiter :+ d))
                        .ifM(d.get, attempt(Some(d)))
                    }
              }
            }

          attempt(None)
        }

        /** Removes the head element, or registers as a waiter and waits for the next `put`. */
        override def take: IO[A] =
          IO.deferred[A].flatMap { deferred =>
            stateRef.flatModify { state =>
              state.values match {
                case head :: tail =>
                  (State(tail, state.waiter), IO.pure(head))
                case Nil =>
                  (State(state.values, state.waiter :+ deferred), deferred.get)
              }
            }
          }

        /**
         * Non-blocking insert. The queue is unbounded, so this always yields `true`.
         *
         * Like `put`, the element goes straight to the oldest waiting taker when there is
         * one; buffering it instead would leave that taker waiting on a non-empty queue.
         */
        override def tryPut(a: A): IO[Boolean] =
          stateRef.flatModify { s =>
            s.waiter match {
              case Nil           => (State(s.values :+ a, s.waiter), IO.pure(true))
              case taker :: rest => (State(s.values, rest), taker.complete(a).as(true))
            }
          }

        /** Removes and returns the head element if present; never waits. */
        override def tryTake: IO[Option[A]] =
          stateRef.modify[Option[A]] { state =>
            state.values match {
              case head :: tail => (State(tail, state.waiter), Some(head))
              case Nil          => (state, None)
            }
          }

        /** Returns the head element without removing it. A plain read, no state update. */
        override def peek: IO[Option[A]] =
          stateRef.get.map(_.values.headOption)
      }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.teckhooi.concurrent | |
import cats.effect.{Deferred, IO, IOApp} | |
import scala.concurrent.duration.* | |
/**
 * Demo of a hand-written counting semaphore.
 *
 * Five tasks compete for three permits. Tasks 4 and 5 are started 100 ms after the first
 * three and so have to wait until a permit is released.
 */
object SemaphoreApp extends IOApp.Simple {
  override def run: IO[Unit] =
    for {
      s      <- Semaphore(3)
      start  <- IO.realTime
      f1     <- (s.acquire *> IO.println("Task 1") *> IO.sleep(1.second) *> s.release).start
      f2     <- (s.acquire *> IO.println("Task 2") *> IO.sleep(2.second) *> s.release).start
      f3     <- (s.acquire *> IO.println("Task 3") *> IO.sleep(3.second) *> s.release).start
      // Give the first three tasks time to grab all permits before starting the rest.
      _      <- IO.sleep(100.millis)
      f4     <- (s.acquire *> IO.println("Task 4") *> IO.sleep(3.second) *> s.release).start
      f5     <- (s.acquire *> IO.println("Task 5") *> s.release).start
      _      <- f1.joinWithNever
      _      <- f4.joinWithNever
      _      <- f5.joinWithNever
      _      <- f2.joinWithNever
      _      <- f3.joinWithNever
      finish <- IO.realTime
      _      <- IO.println(s"Total time taken to complete is ${(finish - start).toMillis}ms")
    } yield ()

  /**
   * Semaphore bookkeeping.
   *
   * @param max    total number of permits
   * @param curr   number of permits currently held
   * @param waiter latches of acquirers that found no free permit, in arrival order
   */
  final case class State(max: Int, curr: Int, waiter: List[Deferred[IO, Unit]])

  /** Minimal counting semaphore: one permit per `acquire`, given back by `release`. */
  trait Semaphore {

    /** Obtains one permit, waiting until one is available. */
    def acquire: IO[Unit]

    /** Gives one permit back, waking the oldest waiter if there is one. */
    def release: IO[Unit]
  }

  object Semaphore {

    /**
     * Creates a semaphore with `permits` permits, all initially free.
     *
     * @param permits the maximum number of simultaneous holders
     */
    def apply(permits: Int): IO[Semaphore] =
      IO.ref(State(permits, 0, List.empty[Deferred[IO, Unit]])).map { stateRef =>
        new Semaphore {

          override def acquire: IO[Unit] =
            IO.deferred[Unit].flatMap { latch =>
              stateRef.flatModify { state =>
                // `>=` rather than `==` so that a non-positive permit count also blocks.
                if (state.curr >= state.max)
                  (state.copy(waiter = state.waiter :+ latch), latch.get)
                else
                  (state.copy(curr = state.curr + 1), IO.unit)
              }
            }

          override def release: IO[Unit] =
            stateRef.flatModify { state =>
              state.waiter match {
                case next :: rest =>
                  // The permit passes straight to the woken waiter, which never increments
                  // `curr` itself, so the count must stay as it is. Decrementing here would
                  // under-count holders and let later acquirers exceed `max`.
                  (state.copy(waiter = rest), next.complete(()).void)
                case Nil =>
                  (state.copy(curr = state.curr - 1), IO.unit)
              }
            }
        }
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment