Forked from calvinlfer/Workshop.scala
Created October 17, 2021 18:20
Day 1 and 2 ZIO async and concurrent workshop
package net.degoes.zio
import zio._
import java.text.NumberFormat
import java.nio.charset.StandardCharsets
import zio.blocking.Blocking
import zio.console.Console
object ZIOTypes {
type ??? = Nothing
// ZIO[R, E, A]
* Provide definitions for the ZIO type aliases below.
type Task[+A] = ZIO[Any, Throwable, A]
type UIO[+A] = ZIO[Any, Nothing, A]
type RIO[-R, +A] = ZIO[R, Throwable, A]
type IO[+E, +A] = ZIO[Any, E, A]
type URIO[-R, +A] = ZIO[R, Nothing, A]
object HelloWorld extends App {
import zio.console._
* Implement a simple "Hello World!" program using the effect returned by `putStrLn`.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Hello world!").as(0)
object PrintSequence extends App {
import zio.console._
* Using `*>` (`zipRight`), compose a sequence of `putStrLn` effects to
* produce an effect that prints three lines of text to the console.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Hello") *> putStrLn("ZIO") *> putStrLn("World!") as 0
object ErrorRecovery extends App {
val StdInputFailed = 1
import zio.console._
val failed =
putStrLn("About to fail...") *>"Uh oh!") *>
putStrLn("This will NEVER be printed!")
* Using `ZIO#orElse` or `ZIO#fold`, have the `run` function compose the
* preceding `failed` effect into the effect that `run` returns.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
failed.orDieWith(s => new Error(s)) as 0
//failed.catchAll(error => putStrLn(error).as(1)) as 0
//(failed as 0) orElse ZIO.succeed(1)
//failed.fold(success = _ => 1, failure = _ => 0)
object Looping extends App {
import zio.console._
* Implement a `repeat` combinator using `flatMap` and recursion.
def repeat[R, E, A](n: Int)(effect: ZIO[R, E, A]): ZIO[R, E, A] =
if (n <= 1) effect
else effect *> repeat(n - 1)(effect)
// else effect.flatMap(_ => repeat(n - 1)(effect))
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
repeat(3)(putStrLn("All work and no play makes Jack a dull boy")) as 0
object EffectConversion extends App {
* Using ZIO.effect, convert the side-effecting of `println` into a pure
* functional effect.
def myPrintLn(line: String): Task[Unit] =
// converts exceptions into
def run(args: List[String]) =
myPrintLn("foo").fold(_ => 1, _ => 0)
object ErrorNarrowing extends App {
implicit class Unimplemented[A](v: A) {
def ? = ???
* Using `ZIO#refineToOrDie`, narrow the error type of the following
* effect to IOException.
val myReadLine: IO[IOException, String] =
// convert Exception to IOException and put it on the typed channel,
// if any other errors then die (unrecoverable error)
def myPrintLn(line: String): UIO[Unit] = UIO(println(line))
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
_ <- myPrintLn("What is your name?")
name <- myReadLine
_ <- myPrintLn(s"Good to meet you, ${name}")
} yield 0) orElse ZIO.succeed(1)
object PromptName extends App {
val StdInputFailed = 1
import zio.console._
* Using `ZIO#flatMap`, implement a simple program that asks the user for
* their name (using `getStrLn`), and then prints it out to the user (using `putStrLn`).
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("What is your name?") *>
.flatMap(name => putStrLn(s"Hello $name"))
.fold(_ => 1, _ => 0)
object NumberGuesser extends App {
import zio.console._
import zio.random._
def analyzeAnswer(random: Int, guess: String) =
if (random.toString == guess.trim) putStrLn("You guessed correctly!")
else putStrLn(s"You did not guess correctly. The answer was ${random}")
* Choose a random number (using `nextInt`), and then ask the user to guess
* the number, feeding their response to `analyzeAnswer`, above.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Pick a random number") *>
(nextInt(3) <*> getStrLn)
.flatMap { case (random, guess) => analyzeAnswer(random + 1, guess) }
.fold(_ => 1, _ => 0)
object AlarmApp extends App {
import zio.console._
import zio.duration._
import java.util.concurrent.TimeUnit
* Create an effect that will get a `Duration` from the user, by prompting
* the user to enter a decimal number of seconds. Use `refineOrDie` to
* narrow the error type as necessary.
lazy val getAlarmDuration: ZIO[Console, IOException, Duration] = {
def parseDuration(input: String): IO[NumberFormatException, Duration] =
def fallback(input: String): ZIO[Console, IOException, Duration] =
putStrLn(s"You entered $input which is not valid for seconds, try again!") *> getAlarmDuration
for {
_ <- putStrLn("Please enter the number of seconds to sleep: ")
input <- getStrLn
duration <- parseDuration(input) orElse fallback(input)
} yield duration
* Create a program that asks the user for a number of seconds to sleep,
* sleeps the specified number of seconds using ZIO.sleep(d), and then
* prints out a wakeup alarm message, like "Time to wakeup!!!".
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(getAlarmDuration.flatMap(ZIO.sleep) *> putStrLn("Time to wakeup!!!"))
.fold(_ => 1, _ => 0)
object Cat extends App {
import zio.console._
import zio.blocking._
* Implement a function to read a file on the blocking thread pool, storing
* the result into a string.
def readFile(file: String): ZIO[Blocking, IOException, String] =
blocking {
val open: Task[Source] = ZIO.effect(Source.fromFile(file))
val close = (s: Source) => ZIO.effect(s.close()).orDie
// bracket ensures that cleanup will be done if the resource was opened (even in the face of interruption)
open.bracket(close(_)) { source => ZIO.effect(source.mkString) }
* Implement a version of the command-line utility "cat", which dumps the
* contents of the specified file to standard output.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
if (args.isEmpty) putStrLn("Supply a file as an argument") as 1
else readFile(args.head).flatMap(putStrLn).fold(_ => 2, _ => 0)
class ZioSource private (private val source: Source) extends AnyVal {
import zio.blocking._
def close: ZIO[Blocking, IOException, Unit] = execute(_.close())
def execute[T](f: Source => T): ZIO[Blocking, IOException, T] =
object ZioSource {
import zio.blocking._
def apply(file: String): ZIO[Blocking, IOException, ZioSource] =
effectBlocking(new ZioSource(Source.fromFile(file))).refineToOrDie[IOException]
def resource(file: String): ZManaged[Blocking, IOException, ZioSource] =
object ReadFilesCleanup extends App {
import zio.console._
// this is not very composable
def readFiles(file1: String, file2: String): ZIO[Console with Blocking, IOException, Unit] =
ZioSource(file1).bracket(_.close.orDie) { source1 =>
ZioSource(file2).bracket(_.close.orDie) { source2 =>
(source1.execute(_.mkString) <*> source2.execute(_.mkString))
.flatMap { case (l, r) => putStrLn(l) *> putStrLn(r) }
def readFileBetter(file1: String, file2: String) =
(ZioSource.resource(file1) <*> ZioSource.resource(file2))
.use { case (l, r) =>
(l.execute(_.mkString) <*> r.execute(_.mkString))
.flatMap { case (l, r) => putStrLn(l) *> putStrLn(r) }
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = ???
object SourceManaged extends App {
import zio.console._
import zio.blocking._
import zio.duration._
final class ZioSource private (private val source: Source) {
def execute[T](f: Source => T): ZIO[Blocking, IOException, T] =
object ZioSource {
* Use the `ZManaged.make` constructor to make a managed data type that
* will automatically acquire and release the resource when it is used.
def make(file: String): ZManaged[Blocking, IOException, ZioSource] = {
// An effect that acquires the resource:
val open = effectBlocking(new ZioSource(Source.fromFile(file))).refineToOrDie[IOException]
// A function that, when given the resource, returns an effect that
// releases the resource:
val close: ZioSource => ZIO[Blocking, Nothing, Unit] =
* Implement a function to read a file on the blocking thread pool, storing
* the result into a string.
def readFiles(files: List[String]): ZIO[Blocking with Console, IOException, Unit] =
* Implement a function that prints out all files specified on the
* command-line. Only print out contents from these files if they
* can all be opened simultaneously. Otherwise, don't print out
* anything except an error message.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
object CatIncremental extends App {
import zio.console._
import zio.blocking._
import{IOException, InputStream, FileInputStream}
* Implement a `blockingIO` combinator to use in subsequent exercises.
def blockingIO[A](a: => A): ZIO[Blocking, IOException, A] =
* Implement all missing methods of `FileHandle`. Be sure to do all work on
* the blocking thread pool.
final case class FileHandle private (private val is: InputStream) {
final private def close: ZIO[Blocking, IOException, Unit] = blockingIO(is.close())
final def read: ZIO[Blocking, IOException, Option[Chunk[Byte]]] =
blockingIO {
val array = Array.ofDim[Byte](1024)
val bytesRead =
if (bytesRead == -1) None
else Some(Chunk.fromArray(array.take(bytesRead)))
object FileHandle {
final def open(file: String): ZManaged[Blocking, IOException, FileHandle] = {
val acquire = blockingIO {
val inputStream = new FileInputStream(new
val release = (f: FileHandle) => f.close.orDie
def cat(fh: FileHandle): ZIO[Blocking with Console, IOException, Unit] = {
case None => ZIO.unit
case Some(chunk) => putStr(new String(chunk.toArray, StandardCharsets.UTF_8)) *> cat(fh)
* Implement an incremental version of the `cat` utility, using `ZIO#bracket`
* or `ZManaged` to ensure the file is closed in the event of error or
* interruption.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
args match {
case file :: Nil =>
( as 0) orElse ZIO.succeed(1)
case _ => putStrLn("Usage: cat <file>") as 2
object AlarmAppImproved extends App {
import zio.console._
import zio.duration._
import java.util.concurrent.TimeUnit
lazy val getAlarmDuration: ZIO[Console, IOException, Duration] = {
def parseDuration(input: String): IO[NumberFormatException, Duration] =
ZIO.effect(Duration((input.toDouble * 1000.0).toLong, TimeUnit.MILLISECONDS))
val fallback = putStrLn("You didn't enter the number of seconds!") *> getAlarmDuration
for {
_ <- putStrLn("Please enter the number of seconds to sleep: ")
input <- getStrLn
duration <- parseDuration(input) orElse fallback
} yield duration
* Create a program that asks the user for a number of seconds to sleep,
* sleeps the specified number of seconds using ZIO.sleep(d), concurrently
* prints a dot every second that the alarm is sleeping for, and then
* prints out a wakeup alarm message, like "Time to wakeup!!!".
def run2(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
sleepFor <- getAlarmDuration
sleepFiber <- ZIO.sleep(sleepFor).fork
printFiber <- (putStrLn(".") *> ZIO.sleep(1.second)).forever.fork
_ <- sleepFiber.join
_ <- printFiber.interrupt
_ <- putStrLn("Time to wake up!")
} yield 0).fold(_ => 1, _ => 0)
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
sleepFor <- getAlarmDuration
sleep = ZIO.sleep(sleepFor)
print = (putStrLn(".") *> ZIO.sleep(1.second)).forever
_ <- sleep race print
_ <- putStrLn("Time to wake up!")
} yield 0).fold(_ => 1, _ => 0)
object ComputePi extends App {
import zio.random._
import zio.console._
import zio.clock._
import zio.duration._
import zio.stm._
* Some state to keep track of all points inside a circle,
* and total number of points.
final case class PiState(
inside: Ref[Long],
total: Ref[Long]
* A function to estimate pi.
def estimatePi(inside: Long, total: Long): Double =
(inside.toDouble / total.toDouble) * 4.0
* A helper function that determines if a point lies in
* a circle of 1 radius.
def insideCircle(x: Double, y: Double): Boolean =
Math.sqrt(x * x + y * y) <= 1.0
* An effect that computes a random (x, y) point.
val randomPoint: ZIO[Random, Nothing, (Double, Double)] =
nextDouble zip nextDouble
* Build a multi-fiber program that estimates the value of `pi`. Print out
* ongoing estimates continuously until the estimation is complete.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
def printEstimation(inside: Ref[Long], total: Ref[Long]): URIO[Console, Unit] =
(inside.get zip total.get).flatMap { case (in, tot) => putStrLn(estimatePi(in, tot).toString) }
def updateEstimate(inside: Ref[Long], total: Ref[Long]): URIO[Random, Unit] =
randomPoint.flatMap { case (x, y) =>
val updateTotal = total.update(_ + 1)
val updateInside = inside.update(_ + 1)
if (insideCircle(x, y)) updateInside zipPar updateTotal
else updateTotal
} *> updateEstimate(inside, total)
def keepEstimating(accuracyRequired: Double, inside: Ref[Long], total: Ref[Long]): URIO[Console, Unit] =
(inside.get zip total.get).flatMap { case (in, tot) =>
val estimate = estimatePi(in, tot)
val actual = math.abs(math.Pi - estimate)
if (actual <= accuracyRequired) putStrLn(s"Ended with $estimate")
else keepEstimating(accuracyRequired, inside, total)
(Ref.make(1L) zip Ref.make(1L)).flatMap { case (in, tot) =>
(printEstimation(in, tot) *> ZIO.sleep(100.millis)).forever race
ZIO.foreachPar_(1 to 10)(_ => updateEstimate(in, tot)) race
keepEstimating(accuracyRequired = 0.00001, in, tot)
} as 0
object ComputePiJohn extends App {
import zio.random._
import zio.console._
import zio.clock._
import zio.duration._
import zio.stm._
* Some state to keep track of all points inside a circle,
* and total number of points.
final case class PiState(
inside: Ref[Long],
total: Ref[Long]
* A function to estimate pi.
def estimatePi(inside: Long, total: Long): Double =
(inside.toDouble / total.toDouble) * 4.0
* A helper function that determines if a point lies in
* a circle of 1 radius.
def insideCircle(x: Double, y: Double): Boolean =
Math.sqrt(x * x + y * y) <= 1.0
* An effect that computes a random (x, y) point.
val randomPoint: ZIO[Random, Nothing, (Double, Double)] =
nextDouble zip nextDouble
* Build a multi-fiber program that estimates the value of `pi`. Print out
* ongoing estimates continuously until the estimation is complete.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
def insideDelta(point: (Double, Double)): Int =
(if (insideCircle(point._1, point._2)) 1 else 0)
def makeEstimator(piState: PiState) = {
import piState.{ inside, total }
for {
point <- randomPoint
_ <- total.update(_ + 1) *> inside.update(_ + insideDelta(point))
} yield ()
def makeWorker(piState: PiState) = makeEstimator(piState).forever
def makeStatusReporter(piState: PiState) =
(for {
total <-
inside <- piState.inside.get
_ <- putStrLn(estimatePi(inside, total).toString)
_ <- ZIO.sleep(1.second)
} yield ()).forever
for {
_ <- putStrLn("Enter any input to exit...")
piState <- (Ref.make(0L) zipWith Ref.make(0L))(PiState(_, _))
singleWorker = makeWorker(piState)
workers = List.fill(4)(singleWorker)
compositeWorker: URIO[Random, Fiber[Nothing, List[Nothing]]] = ZIO.forkAll(workers)
reporter = makeStatusReporter(piState)
fiber <- (compositeWorker zipWith reporter.fork)(_ zip _)
_ <- getStrLn.orDie *> fiber.interrupt
} yield 0
object StmSwap extends App {
import zio.console._
import zio.stm._
* Demonstrate the following code does not reliably swap two values in the
* presence of concurrency.
def exampleRef = {
def swap[A](ref1: Ref[A], ref2: Ref[A]): UIO[Unit] =
for {
v1 <- ref1.get
v2 <- ref2.get
_ <- ref2.set(v1)
_ <- ref1.set(v2)
} yield ()
for {
ref1 <- Ref.make(100)
ref2 <- Ref.make(0)
fiber1 <- swap(ref1, ref2).repeat(Schedule.recurs(100)).fork
fiber2 <- swap(ref2, ref1).repeat(Schedule.recurs(100)).fork
_ <- (fiber1 zip fiber2).join
value <- (ref1.get zipWith ref2.get)(_ + _)
} yield value
* Using `STM`, implement a safe version of the swap function.
def exampleStm = {
def swap[A](ref1: TRef[A], ref2: TRef[A]): UIO[Unit] =
(for {
one <- ref1.get
two <- ref2.get
_ <- ref2.set(two)
_ <- ref1.set(one)
} yield ()).commit
for {
ref1 <- TRef.make(100).commit
ref2 <- TRef.make(0).commit
fiber1 <- swap(ref1, ref2).repeat(Schedule.recurs(100)).fork
fiber2 <- swap(ref2, ref1).repeat(Schedule.recurs(100)).fork
_ <- (fiber1 zip fiber2).join
value <- (ref1.get zipWith ref2.get)(_ + _).commit
} yield value
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = as 0
object StmLock extends App {
import zio.console._
import zio.stm._
* Using STM, implement a simple binary lock by implementing the creation,
* acquisition, and release methods.
class Lock private (tref: TRef[Boolean]) {
def acquire: UIO[Unit] = (tref.get.filter(_ == false) *> tref.set(true)).commit
def release: UIO[Unit] = tref.set(false).commit
object Lock {
def make: UIO[Lock] = TRef.make(false).map(new Lock(_)).commit
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
lock <- Lock.make
fiber1 <- lock.acquire
.bracket_(lock.release)(putStrLn("Bob : I have the lock!"))
fiber2 <- lock.acquire
.bracket_(lock.release)(putStrLn("Sarah: I have the lock!"))
_ <- (fiber1 zip fiber2).join
} yield 0) as 1
object StmQueue extends App {
import zio.console._
import zio.stm._
import scala.collection.immutable.{ Queue => ScalaQueue }
* Using STM, implement a async queue with double back-pressuring.
class Queue[A] private (capacity: Int, queue: TRef[ScalaQueue[A]]) {
def take: UIO[A] =
(for {
q <- queue.get.filter(_.nonEmpty)
(head, rest) = q.dequeue
_ <- queue.set(rest)
} yield head).commit
def offer(a: A): UIO[Unit] =
(for {
q <- queue.get
_ <- STM.check(q.length <= capacity)
_ <- queue.set(q.enqueue(a))
} yield ()).commit
object Queue {
def make[A]: UIO[Queue[A]] = TRef.make(ScalaQueue.empty[A]).map(q => new Queue(capacity = 10, q)).commit
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
for {
queue <- Queue.make[Int]
_ <- ZIO.foreach(0 to 100)(i => queue.offer(i)).fork
_ <- ZIO.foreach(0 to 100)(_ => queue.take.flatMap(i => putStrLn(s"Got: ${i}")))
} yield 0
object StmLunchTime extends App {
import zio.console._
import zio.stm._
* Using STM, implement the missing methods of Attendee.
final case class Attendee(state: TRef[Attendee.State]) {
import Attendee.State._
def isStarving: STM[Nothing, Boolean] = ???
def feed: STM[Nothing, Unit] = ???
object Attendee {
sealed trait State
object State {
case object Starving extends State
case object Full extends State
* Using STM, implement the missing methods of Table.
final case class Table(seats: TArray[Boolean]) {
def findEmptySeat: STM[Nothing, Option[Int]] =
.fold[(Int, Option[Int])]((0, None)) {
case ((index, z @ Some(_)), _) => (index + 1, z)
case ((index, None), taken) =>
(index + 1, if (taken) None else Some(index))
def takeSeat(index: Int): STM[Nothing, Unit] = ???
def vacateSeat(index: Int): STM[Nothing, Unit] = ???
* Using STM, implement a method that feeds a single attendee.
def feedAttendee(t: Table, a: Attendee): STM[Nothing, Unit] =
for {
index <- t.findEmptySeat.collect { case Some(index) => index }
_ <- t.takeSeat(index) *> a.feed *> t.vacateSeat(index)
} yield ()
* Using STM, implement a method that feeds only the starving attendees.
def feedStarving(table: Table, list: List[Attendee]): UIO[Unit] =
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val Attendees = 100
val TableSize = 5
for {
attendees <- ZIO.foreach(0 to Attendees)(
i =>
table <- TArray
_ <- feedStarving(table, attendees)
} yield 0
object StmPriorityQueue extends App {
import zio.console._
import zio.stm._
import zio.duration._
* Using STM, design a priority queue, where smaller integers are assumed
* to have higher priority than greater integers.
class PriorityQueue[A] private (
minLevel: TRef[Int],
map: TMap[Int, TQueue[A]]
) {
def offer(a: A, priority: Int): STM[Nothing, Unit] = for {
min <- minLevel.get
_ <- if (priority < min) minLevel.set(priority)
else STM.unit
optQ <- map.get(priority)
q <-
_ <- q.offer(a)
_ <- map.put(priority, q)
} yield ()
def cleanup(min: Int): STM[Nothing, Unit] =
map.delete(min) *> findAndSetNextMinimum
def findAndSetNextMinimum: STM[Nothing, Unit] =
def take: STM[Nothing, A] = {
for {
min <- minLevel.get
q <- map.get(min).collect { case Some(tq) => tq }
a <- q.take
size <- q.size
_ <- if (size == 0) cleanup(min)
else STM.unit
} yield a
object PriorityQueue {
def make[A]: STM[Nothing, PriorityQueue[A]] = for {
minLevel <- TRef.make[Int](Int.MaxValue)
map <- TMap.empty[Int, TQueue[A]]
} yield new PriorityQueue(minLevel, map)
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
_ <- putStrLn("Enter any key to exit...")
queue <- PriorityQueue.make[String].commit
lowPriority = ZIO.foreach(0 to 100) { i =>
ZIO.sleep(1.millis) *> queue
.offer(s"Offer: ${i} with priority 3", 3)
highPriority = ZIO.foreach(0 to 100) { i =>
ZIO.sleep(2.millis) *> queue
.offer(s"Offer: ${i} with priority 0", 0)
_ <- ZIO.forkAll(List(lowPriority, highPriority)) *> queue.take.commit
.fork *>
} yield 0).fold(_ => 1, _ => 0)
object StmReentrantLock extends App {
import zio.console._
import zio.stm._
private final case class WriteLock(
writeCount: Int,
readCount: Int,
fiberId: FiberId
private final class ReadLock private (readers: Map[Fiber.Id, Int]) {
def total: Int = readers.values.sum
def noOtherHolder(fiberId: FiberId): Boolean =
readers.size == 0 || (readers.size == 1 && readers.contains(fiberId))
def readLocks(fiberId: FiberId): Int =
def adjust(fiberId: FiberId, adjust: Int): ReadLock = {
val total = readLocks(fiberId)
val newTotal = total + adjust
new ReadLock(
readers =
if (newTotal == 0) readers - fiberId
else readers.updated(fiberId, newTotal)
private object ReadLock {
val empty: ReadLock = new ReadLock(Map())
def apply(fiberId: Fiber.Id, count: Int): ReadLock =
if (count <= 0) empty else new ReadLock(Map(fiberId -> count))
* Using STM, implement a reentrant read/write lock.
class ReentrantReadWriteLock(data: TRef[Either[ReadLock, WriteLock]]) {
def writeLocks: UIO[Int] = => 0, _.writeCount)).commit
def writeLocked: UIO[Boolean] = > 0)
def readLocks: UIO[Int] =, _.readCount)).commit
def readLocked: UIO[Boolean] = > 0)
val read: Managed[Nothing, Int] = ???
val write: Managed[Nothing, Int] = ???
object ReentrantReadWriteLock {
def make: UIO[ReentrantReadWriteLock] =
.make[Either[ReadLock, WriteLock]](Left(ReadLock.empty))
.map(tref => new ReentrantReadWriteLock(tref))
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = ???
object StmDiningPhilosophers extends App {
import zio.console._
import zio.stm._
sealed trait Fork
val Fork = new Fork {}
final case class Placement(left: TRef[Option[Fork]], right: TRef[Option[Fork]])
final case class Roundtable(seats: Vector[Placement])
* Using STM, implement the logic of a philosopher to take not one fork, but
* both forks when they are both available.
def takeForks(left: TRef[Option[Fork]], right: TRef[Option[Fork]]): STM[Nothing, (Fork, Fork)] =
for {
l <- left.get.collect { case Some(f) => f }
r <- right.get.collect { case Some(f) => f }
} yield (l, r)
def putForks(left: TRef[Option[Fork]], right: TRef[Option[Fork]])(tuple: (Fork, Fork)) = {
val (leftFork, rightFork) = tuple
right.set(Some(rightFork)) *> left.set(Some(leftFork))
def setupTable(size: Int): ZIO[Any, Nothing, Roundtable] = {
val makeFork = TRef.make[Option[Fork]](Some(Fork))
(for {
allForks0 <- STM.foreach(0 to size) { i => makeFork }
allForks = allForks0 ++ List(allForks0(0))
placements = (allForks zip allForks.drop(1)).map { case (l, r) => Placement(l, r) }
} yield Roundtable(placements.toVector)).commit
def eat(philosopher: Int, roundtable: Roundtable): ZIO[Console, Nothing, Unit] = {
val placement = roundtable.seats(philosopher)
val left = placement.left
val right = placement.right
for {
forks <- takeForks(left, right).commit
_ <- putStrLn(s"Philosopher ${philosopher} eating...")
_ <- putForks(left, right)(forks).commit
_ <- putStrLn(s"Philosopher ${philosopher} is done eating")
} yield ()
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val count = 10
def eaters(table: Roundtable): Iterable[ZIO[Console, Nothing, Unit]] =
(0 to count).map { index => eat(index, table) }
for {
table <- setupTable(count)
fiber <- ZIO.forkAll(eaters(table))
_ <- fiber.join
_ <- putStrLn("All philosophers have eaten!")
} yield 0
object Interview extends App {
import zio.console._
val questions =
"Where where you born?" ::
"What color are your eyes?" ::
"What is your favorite movie?" ::
"What is your favorite number?" :: Nil
* Using `ZIO.foreach`, iterate over all of the `questions`, and for each
* question, print out the question, and read the answer from the console
* using `getStrLn`, collecting all of the answers into a list.
* Print out the answers when done.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
ZIO.foreach(questions) { q =>
putStrLn(q) *> getStrLn
.flatMap(answers => putStrLn(s"These are all the answers you put in ${answers.mkString(",")}"))
.fold(_ => 1, _ => 0)
object SimpleActor extends App {
import zio.console._
import zio.stm._
sealed trait Command
case object ReadTemperature extends Command
final case class AdjustTemperature(value: Double) extends Command
type TemperatureActor = Command => Task[Double]
* Using ZIO Queue and Promise, implement the logic necessary to create an
* actor as a function from `Command` to `Task[Double]`.
def makeActor(initialTemperature: Double): UIO[TemperatureActor] = {
type Bundle = (Command, Promise[Nothing, Double])
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val temperatures = (0 to 100).map(_.toDouble)
(for {
actor <- makeActor(0)
_ <- ZIO.foreachPar(temperatures) { temp => actor(AdjustTemperature(temp)) }
temp <- actor(ReadTemperature)
_ <- putStrLn(s"Final temperature is ${temp}")
} yield 0) orElse ZIO.succeed(1)
object ParallelFib extends App {
import zio.console._
def fib(n: Int): UIO[BigInt] =
if (n <= 1) UIO(n)
else UIO.unit.flatMap { _ =>
(fib(n - 1) zipWithPar fib(n - 2))(_ + _)
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
for {
_ <- putStrLn("What number of the fibonacci sequence should we calculate?")
n <- getStrLn.orDie.flatMap(input => ZIO(input.toInt)).eventually
f <- fib(n)
_ <- putStrLn(s"fib(${n}) = ${f}")
} yield 0
object Sharding extends App {
* Create N workers reading from a Queue, if one of them fails, then wait
* for the other ones to process their current item, but terminate all the
* workers.
* Return the first error, or never return, if there is no error.
def shard[R, E, A](
queue: Queue[A],
n: Int,
worker: A => ZIO[R, E, Unit]
): ZIO[R, Nothing, E] = {
val qWorker = queue.take.flatMap(worker).forever
val qWorkers = List.fill(n)(qWorker)
val program =
for {
fiber <- ZIO.forkAll(qWorkers)
list <- fiber.join
nothing <- list.headOption.fold(ZIO.dieMessage(s"You supplied n=$n"))(identity)
} yield nothing
def run(args: List[String]) = ???
object CustomEnvironment extends App {
import zio.console._
type MyFx = Logging with Files
trait Logging {
val logging: Logging.Service
object Logging {
trait Service {
def log(line: String): UIO[Unit]
def log(line: String) = ZIO.accessM[Logging](_.logging.log(line))
trait Files {
val files: Files.Service
object Files {
trait Service {
def read(file: String): IO[IOException, String]
def read(file: String) = ZIO.accessM[Files](
val effect: ZIO[Logging with Files, IOException, Unit] =
(for {
file <-"build.sbt")
_ <- Logging.log(file)
} yield ())
def run(args: List[String]) = {
effect.provide {
new Logging with Files {
override val logging: Logging.Service = new Logging.Service {
override def log(line: String): UIO[Unit] = UIO(println(line))
override val files: Files.Service = new Files.Service {
override def read(file: String): IO[IOException, String] =
}.fold(_ => 1, _ => 0)
object Hangman extends App {
import Dictionary.Dictionary
import zio.console._
import zio.random._
* Implement an effect that gets a single, lower-case character from
* the user.
lazy val getChoice: ZIO[Console, IOException, Char] = {
case h :: Nil if h.isLetterOrDigit => ZIO.succeed(h)
case _ => putStrLn("You did not enter a single valid character") *> getChoice
* Implement an effect that prompts the user for their name, and
* returns it.
lazy val getName: ZIO[Console, IOException, String] = putStrLn("What's your name?") *> getStrLn
* Implement an effect that chooses a random word from the dictionary.
* The dictionary is `Dictionary.Dictionary`.
lazy val chooseWord: ZIO[Random, Nothing, String] =
* Implement the main game loop, which gets choices from the user until
* the game is won or lost.
def gameLoop(oldState: State): ZIO[Console, IOException, Unit] =
for {
c <- getChoice
newState = oldState.addChar(c)
result = analyzeNewInput(oldState, newState, c)
_ <- renderState(newState)
_ <- result match {
case GuessResult.Incorrect => putStrLn(s"Wrong answer ${}, try again!") *> gameLoop(newState)
case GuessResult.Unchanged => putStrLn("You already guessed that, try again!") *> gameLoop(newState)
case GuessResult.Correct => putStrLn("Correct! keep going") *> gameLoop(newState)
case GuessResult.Lost => putStrLn("Sorry! you lost")
case GuessResult.Won => putStrLn("Yay, you win!")
} yield ()
def renderState(state: State): ZIO[Console, Nothing, Unit] = {
* f n c t o
* - - - - - - -
* Guesses: a, z, y, x
val word =
.map(c => if (state.guesses.contains(c)) s" $c " else " ")
val line = List.fill(state.word.length)(" - ").mkString("")
val guesses = " Guesses: " + state.guesses.mkString(", ")
val text = word + "\n" + line + "\n\n" + guesses + "\n"
final case class State(name: String, guesses: Set[Char], word: String) {
final def failures: Int = (guesses -- word.toSet).size
final def playerLost: Boolean = failures > 10
final def playerWon: Boolean = (word.toSet -- guesses).isEmpty
final def addChar(char: Char): State = copy(guesses = guesses + char)
sealed trait GuessResult
object GuessResult {
case object Won extends GuessResult
case object Lost extends GuessResult
case object Correct extends GuessResult
case object Incorrect extends GuessResult
case object Unchanged extends GuessResult
def analyzeNewInput(oldState: State, newState: State, char: Char): GuessResult =
if (oldState.guesses.contains(char)) GuessResult.Unchanged
else if (newState.playerWon) GuessResult.Won
else if (newState.playerLost) GuessResult.Lost
else if (oldState.word.contains(char)) GuessResult.Correct
else GuessResult.Incorrect
* Implement hangman using `Dictionary.Dictionary` for the words,
* and the above helper functions.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
name <- getName
word <- chooseWord
state = State(name, Set(), word)
_ <- renderState(state)
_ <- gameLoop(state)
} yield 0) orElse ZIO.succeed(1)
* Implement a game of tic tac toe using ZIO, then develop unit tests to
* demonstrate its correctness and testability.
object TicTacToe extends App {
import zio.console._
sealed trait Mark {
final def renderChar: Char = this match {
case Mark.X => 'X'
case Mark.O => 'O'
final def render: String = renderChar.toString
object Mark {
case object X extends Mark
case object O extends Mark
final case class Board private (value: Vector[Vector[Option[Mark]]]) {
* Retrieves the mark at the specified row/col.
final def get(row: Int, col: Int): Option[Mark] =
* Places a mark on the board at the specified row/col.
final def place(row: Int, col: Int, mark: Mark): Option[Board] =
if (row >= 0 && col >= 0 && row < 3 && col < 3)
copy(value = value.updated(row, value(row).updated(col, Some(mark))))
else None
* Renders the board to a string.
def render: String =
.map(" ")(_.render)).mkString(" ", " | ", " "))
* Returns which mark won the game, if any.
final def won: Option[Mark] =
if (wonBy(Mark.X)) Some(Mark.X)
else if (wonBy(Mark.O)) Some(Mark.O)
else None
private final def wonBy(mark: Mark): Boolean =
wonBy(0, 0, 1, 1, mark) ||
wonBy(0, 2, 1, -1, mark) ||
wonBy(0, 0, 0, 1, mark) ||
wonBy(1, 0, 0, 1, mark) ||
wonBy(2, 0, 0, 1, mark) ||
wonBy(0, 0, 1, 0, mark) ||
wonBy(0, 1, 1, 0, mark) ||
wonBy(0, 2, 1, 0, mark)
private final def wonBy(
row0: Int,
col0: Int,
rowInc: Int,
colInc: Int,
mark: Mark
): Boolean =
extractLine(row0, col0, rowInc, colInc).collect { case Some(v) => v }.toList == List
private final def extractLine(
row0: Int,
col0: Int,
rowInc: Int,
colInc: Int
): Iterable[Option[Mark]] =
for {
row <- (row0 to (row0 + rowInc * 2))
col <- (col0 to (col0 + colInc * 2))
} yield value(row)(col)
object Board {
final val empty = new Board(Vector.fill(3)(Vector.fill(3)(None)))
def fromChars(
first: Iterable[Char],
second: Iterable[Char],
third: Iterable[Char]
): Option[Board] =
if (first.size != 3 || second.size != 3 || third.size != 3) None
else {
def toMark(char: Char): Option[Mark] =
if (char.toLower == 'x') Some(Mark.X)
else if (char.toLower == 'o') Some(Mark.O)
else None
new Board(
val TestBoard = Board
List(' ', 'O', 'X'),
List('O', 'X', 'O'),
List('X', ' ', ' ')
* The entry point to the game will be here.
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn(TestBoard) as 0
