Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created February 1, 2020 05:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save calvinlfer/09fea316613c6ace32ae79d62b1acc71 to your computer and use it in GitHub Desktop.
Save calvinlfer/09fea316613c6ace32ae79d62b1acc71 to your computer and use it in GitHub Desktop.
Day 1 and 2 ZIO async and concurrent workshop
package net.degoes.zio
import java.io.IOException
import zio._
import java.text.NumberFormat
import java.nio.charset.StandardCharsets
import zio.blocking.Blocking
import zio.console.Console
object ZIOTypes {
type ??? = Nothing
// ZIO[R, E, A]
/**
* EXERCISE
*
* Provide definitions for the ZIO type aliases below.
*/
type Task[+A] = ZIO[Any, Throwable, A]
type UIO[+A] = ZIO[Any, Nothing, A]
type RIO[-R, +A] = ZIO[R, Throwable, A]
type IO[+E, +A] = ZIO[Any, E, A]
type URIO[-R, +A] = ZIO[R, Nothing, A]
}
object HelloWorld extends App {
import zio.console._
/**
* EXERCISE
*
* Implement a simple "Hello World!" program using the effect returned by `putStrLn`.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Hello world!").as(0)
}
object PrintSequence extends App {
import zio.console._
/**
* EXERCISE
*
* Using `*>` (`zipRight`), compose a sequence of `putStrLn` effects to
* produce an effect that prints three lines of text to the console.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Hello") *> putStrLn("ZIO") *> putStrLn("World!") as 0
}
object ErrorRecovery extends App {
val StdInputFailed = 1
import zio.console._
val failed =
putStrLn("About to fail...") *>
ZIO.fail("Uh oh!") *>
putStrLn("This will NEVER be printed!")
/**
* EXERCISE
*
* Using `ZIO#orElse` or `ZIO#fold`, have the `run` function compose the
* preceding `failed` effect into the effect that `run` returns.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
failed.orDieWith(s => new Error(s)) as 0
//failed.catchAll(error => putStrLn(error).as(1)) as 0
//(failed as 0) orElse ZIO.succeed(1)
//failed.fold(success = _ => 1, failure = _ => 0)
}
object Looping extends App {
import zio.console._
/**
* EXERCISE
*
* Implement a `repeat` combinator using `flatMap` and recursion.
*/
def repeat[R, E, A](n: Int)(effect: ZIO[R, E, A]): ZIO[R, E, A] =
if (n <= 1) effect
else effect *> repeat(n - 1)(effect)
// else effect.flatMap(_ => repeat(n - 1)(effect))
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
repeat(3)(putStrLn("All work and no play makes Jack a dull boy")) as 0
}
object EffectConversion extends App {
/**
* EXERCISE
*
* Using ZIO.effect, convert the side-effecting of `println` into a pure
* functional effect.
*/
def myPrintLn(line: String): Task[Unit] =
// converts exceptions into ZIO.fail
ZIO.effect(println(line))
def run(args: List[String]) =
myPrintLn("foo").fold(_ => 1, _ => 0)
}
object ErrorNarrowing extends App {
import java.io.IOException
import scala.io.StdIn.readLine
implicit class Unimplemented[A](v: A) {
def ? = ???
}
/**
* EXERCISE
*
* Using `ZIO#refineToOrDie`, narrow the error type of the following
* effect to IOException.
*/
val myReadLine: IO[IOException, String] =
// convert Exception to IOException and put it on the typed channel,
// if any other errors then die (unrecoverable error)
ZIO.effect(readLine()).refineToOrDie[IOException]
def myPrintLn(line: String): UIO[Unit] = UIO(println(line))
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
_ <- myPrintLn("What is your name?")
name <- myReadLine
_ <- myPrintLn(s"Good to meet you, ${name}")
} yield 0) orElse ZIO.succeed(1)
}
object PromptName extends App {
val StdInputFailed = 1
import zio.console._
/**
* EXERCISE
*
* Using `ZIO#flatMap`, implement a simple program that asks the user for
* their name (using `getStrLn`), and then prints it out to the user (using `putStrLn`).
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("What is your name?") *>
getStrLn
.flatMap(name => putStrLn(s"Hello $name"))
.fold(_ => 1, _ => 0)
}
object NumberGuesser extends App {
import zio.console._
import zio.random._
def analyzeAnswer(random: Int, guess: String) =
if (random.toString == guess.trim) putStrLn("You guessed correctly!")
else putStrLn(s"You did not guess correctly. The answer was ${random}")
/**
* EXERCISE
*
* Choose a random number (using `nextInt`), and then ask the user to guess
* the number, feeding their response to `analyzeAnswer`, above.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn("Pick a random number") *>
(nextInt(3) <*> getStrLn)
.flatMap { case (random, guess) => analyzeAnswer(random + 1, guess) }
.fold(_ => 1, _ => 0)
}
object AlarmApp extends App {
import zio.console._
import zio.duration._
import java.io.IOException
import java.util.concurrent.TimeUnit
/**
* EXERCISE
*
* Create an effect that will get a `Duration` from the user, by prompting
* the user to enter a decimal number of seconds. Use `refineOrDie` to
* narrow the error type as necessary.
*/
lazy val getAlarmDuration: ZIO[Console, IOException, Duration] = {
def parseDuration(input: String): IO[NumberFormatException, Duration] =
ZIO.effect(input.toInt)
.map(_.seconds)
.refineToOrDie[NumberFormatException]
def fallback(input: String): ZIO[Console, IOException, Duration] =
putStrLn(s"You entered $input which is not valid for seconds, try again!") *> getAlarmDuration
for {
_ <- putStrLn("Please enter the number of seconds to sleep: ")
input <- getStrLn
duration <- parseDuration(input) orElse fallback(input)
} yield duration
}
/**
* EXERCISE
*
* Create a program that asks the user for a number of seconds to sleep,
* sleeps the specified number of seconds using ZIO.sleep(d), and then
* prints out a wakeup alarm message, like "Time to wakeup!!!".
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(getAlarmDuration.flatMap(ZIO.sleep) *> putStrLn("Time to wakeup!!!"))
.fold(_ => 1, _ => 0)
}
object Cat extends App {
import zio.console._
import zio.blocking._
import java.io.IOException
/**
* EXERCISE
*
* Implement a function to read a file on the blocking thread pool, storing
* the result into a string.
*/
def readFile(file: String): ZIO[Blocking, IOException, String] =
blocking {
ZIO.effect()
import scala.io.Source
val open: Task[Source] = ZIO.effect(Source.fromFile(file))
val close = (s: Source) => ZIO.effect(s.close()).orDie
// bracket ensures that cleanup will be done if the resource was opened (even in the face of interruption)
open.bracket(close(_)) { source => ZIO.effect(source.mkString) }
.refineToOrDie[IOException]
}
/**
* EXERCISE
*
* Implement a version of the command-line utility "cat", which dumps the
* contents of the specified file to standard output.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
if (args.isEmpty) putStrLn("Supply a file as an argument") as 1
else readFile(args.head).flatMap(putStrLn).fold(_ => 2, _ => 0)
}
import scala.io.Source
class ZioSource private (private val source: Source) extends AnyVal {
import zio.blocking._
import java.io.IOException
def close: ZIO[Blocking, IOException, Unit] = execute(_.close())
def execute[T](f: Source => T): ZIO[Blocking, IOException, T] =
effectBlocking(f(source)).refineToOrDie[IOException]
}
object ZioSource {
import zio.blocking._
import java.io.IOException
def apply(file: String): ZIO[Blocking, IOException, ZioSource] =
effectBlocking(new ZioSource(Source.fromFile(file))).refineToOrDie[IOException]
def resource(file: String): ZManaged[Blocking, IOException, ZioSource] =
ZManaged.make(ZioSource(file))(_.close.orDie)
}
object ReadFilesCleanup extends App {
import zio.console._
// this is not very composable
def readFiles(file1: String, file2: String): ZIO[Console with Blocking, IOException, Unit] =
ZioSource(file1).bracket(_.close.orDie) { source1 =>
ZioSource(file2).bracket(_.close.orDie) { source2 =>
(source1.execute(_.mkString) <*> source2.execute(_.mkString))
.flatMap { case (l, r) => putStrLn(l) *> putStrLn(r) }
}
}
def readFileBetter(file1: String, file2: String) =
(ZioSource.resource(file1) <*> ZioSource.resource(file2))
.use { case (l, r) =>
(l.execute(_.mkString) <*> r.execute(_.mkString))
.flatMap { case (l, r) => putStrLn(l) *> putStrLn(r) }
}
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = ???
}
object SourceManaged extends App {
import zio.console._
import zio.blocking._
import zio.duration._
import java.io.IOException
import scala.io.Source
final class ZioSource private (private val source: Source) {
def execute[T](f: Source => T): ZIO[Blocking, IOException, T] =
effectBlocking(f(source)).refineToOrDie[IOException]
}
object ZioSource {
/**
* EXERCISE
*
* Use the `ZManaged.make` constructor to make a managed data type that
* will automatically acquire and release the resource when it is used.
*/
def make(file: String): ZManaged[Blocking, IOException, ZioSource] = {
// An effect that acquires the resource:
val open = effectBlocking(new ZioSource(Source.fromFile(file))).refineToOrDie[IOException]
// A function that, when given the resource, returns an effect that
// releases the resource:
val close: ZioSource => ZIO[Blocking, Nothing, Unit] =
_.execute(_.close()).orDie
???
}
}
/**
* EXERCISE
*
* Implement a function to read a file on the blocking thread pool, storing
* the result into a string.
*/
def readFiles(files: List[String]): ZIO[Blocking with Console, IOException, Unit] =
???
/**
* EXERCISE
*
* Implement a function that prints out all files specified on the
* command-line. Only print out contents from these files if they
* can all be opened simultaneously. Otherwise, don't print out
* anything except an error message.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
???
}
object CatIncremental extends App {
import zio.console._
import zio.blocking._
import java.io.{IOException, InputStream, FileInputStream}
/**
* BONUS EXERCISE
*
* Implement a `blockingIO` combinator to use in subsequent exercises.
*/
def blockingIO[A](a: => A): ZIO[Blocking, IOException, A] =
ZIO.accessM(_.blocking.effectBlocking(a).refineToOrDie[IOException])
/**
* EXERCISE
*
* Implement all missing methods of `FileHandle`. Be sure to do all work on
* the blocking thread pool.
*/
final case class FileHandle private (private val is: InputStream) {
final private def close: ZIO[Blocking, IOException, Unit] = blockingIO(is.close())
final def read: ZIO[Blocking, IOException, Option[Chunk[Byte]]] =
blockingIO {
val array = Array.ofDim[Byte](1024)
val bytesRead = is.read(array)
if (bytesRead == -1) None
else Some(Chunk.fromArray(array.take(bytesRead)))
}
}
object FileHandle {
final def open(file: String): ZManaged[Blocking, IOException, FileHandle] = {
val acquire = blockingIO {
val inputStream = new FileInputStream(new java.io.File(file))
FileHandle(inputStream)
}
val release = (f: FileHandle) => f.close.orDie
ZManaged.make(acquire)(release)
}
}
def cat(fh: FileHandle): ZIO[Blocking with Console, IOException, Unit] =
fh.read.flatMap {
case None => ZIO.unit
case Some(chunk) => putStr(new String(chunk.toArray, StandardCharsets.UTF_8)) *> cat(fh)
}
/**
* EXERCISE
*
* Implement an incremental version of the `cat` utility, using `ZIO#bracket`
* or `ZManaged` to ensure the file is closed in the event of error or
* interruption.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
args match {
case file :: Nil =>
(FileHandle.open(file).use(cat) as 0) orElse ZIO.succeed(1)
case _ => putStrLn("Usage: cat <file>") as 2
}
}
object AlarmAppImproved extends App {
import zio.console._
import zio.duration._
import java.io.IOException
import java.util.concurrent.TimeUnit
lazy val getAlarmDuration: ZIO[Console, IOException, Duration] = {
def parseDuration(input: String): IO[NumberFormatException, Duration] =
ZIO.effect(Duration((input.toDouble * 1000.0).toLong, TimeUnit.MILLISECONDS))
.refineToOrDie[NumberFormatException]
val fallback = putStrLn("You didn't enter the number of seconds!") *> getAlarmDuration
for {
_ <- putStrLn("Please enter the number of seconds to sleep: ")
input <- getStrLn
duration <- parseDuration(input) orElse fallback
} yield duration
}
/**
* EXERCISE
*
* Create a program that asks the user for a number of seconds to sleep,
* sleeps the specified number of seconds using ZIO.sleep(d), concurrently
* prints a dot every second that the alarm is sleeping for, and then
* prints out a wakeup alarm message, like "Time to wakeup!!!".
*/
def run2(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
sleepFor <- getAlarmDuration
sleepFiber <- ZIO.sleep(sleepFor).fork
printFiber <- (putStrLn(".") *> ZIO.sleep(1.second)).forever.fork
_ <- sleepFiber.join
_ <- printFiber.interrupt
_ <- putStrLn("Time to wake up!")
} yield 0).fold(_ => 1, _ => 0)
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
sleepFor <- getAlarmDuration
sleep = ZIO.sleep(sleepFor)
print = (putStrLn(".") *> ZIO.sleep(1.second)).forever
_ <- sleep race print
_ <- putStrLn("Time to wake up!")
} yield 0).fold(_ => 1, _ => 0)
}
object ComputePi extends App {
import zio.random._
import zio.console._
import zio.clock._
import zio.duration._
import zio.stm._
/**
* Some state to keep track of all points inside a circle,
* and total number of points.
*/
final case class PiState(
inside: Ref[Long],
total: Ref[Long]
)
/**
* A function to estimate pi.
*/
def estimatePi(inside: Long, total: Long): Double =
(inside.toDouble / total.toDouble) * 4.0
/**
* A helper function that determines if a point lies in
* a circle of 1 radius.
*/
def insideCircle(x: Double, y: Double): Boolean =
Math.sqrt(x * x + y * y) <= 1.0
/**
* An effect that computes a random (x, y) point.
*/
val randomPoint: ZIO[Random, Nothing, (Double, Double)] =
nextDouble zip nextDouble
/**
* EXERCISE
*
* Build a multi-fiber program that estimates the value of `pi`. Print out
* ongoing estimates continuously until the estimation is complete.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
def printEstimation(inside: Ref[Long], total: Ref[Long]): URIO[Console, Unit] =
(inside.get zip total.get).flatMap { case (in, tot) => putStrLn(estimatePi(in, tot).toString) }
def updateEstimate(inside: Ref[Long], total: Ref[Long]): URIO[Random, Unit] =
randomPoint.flatMap { case (x, y) =>
val updateTotal = total.update(_ + 1)
val updateInside = inside.update(_ + 1)
if (insideCircle(x, y)) updateInside zipPar updateTotal
else updateTotal
} *> updateEstimate(inside, total)
def keepEstimating(accuracyRequired: Double, inside: Ref[Long], total: Ref[Long]): URIO[Console, Unit] =
(inside.get zip total.get).flatMap { case (in, tot) =>
val estimate = estimatePi(in, tot)
val actual = math.abs(math.Pi - estimate)
if (actual <= accuracyRequired) putStrLn(s"Ended with $estimate")
else keepEstimating(accuracyRequired, inside, total)
}
(Ref.make(1L) zip Ref.make(1L)).flatMap { case (in, tot) =>
(printEstimation(in, tot) *> ZIO.sleep(100.millis)).forever race
ZIO.foreachPar_(1 to 10)(_ => updateEstimate(in, tot)) race
keepEstimating(accuracyRequired = 0.00001, in, tot)
} as 0
}
}
object ComputePiJohn extends App {
import zio.random._
import zio.console._
import zio.clock._
import zio.duration._
import zio.stm._
/**
* Some state to keep track of all points inside a circle,
* and total number of points.
*/
final case class PiState(
inside: Ref[Long],
total: Ref[Long]
)
/**
* A function to estimate pi.
*/
def estimatePi(inside: Long, total: Long): Double =
(inside.toDouble / total.toDouble) * 4.0
/**
* A helper function that determines if a point lies in
* a circle of 1 radius.
*/
def insideCircle(x: Double, y: Double): Boolean =
Math.sqrt(x * x + y * y) <= 1.0
/**
* An effect that computes a random (x, y) point.
*/
val randomPoint: ZIO[Random, Nothing, (Double, Double)] =
nextDouble zip nextDouble
/**
* EXERCISE
*
* Build a multi-fiber program that estimates the value of `pi`. Print out
* ongoing estimates continuously until the estimation is complete.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
def insideDelta(point: (Double, Double)): Int =
(if (insideCircle(point._1, point._2)) 1 else 0)
def makeEstimator(piState: PiState) = {
import piState.{ inside, total }
for {
point <- randomPoint
_ <- total.update(_ + 1) *> inside.update(_ + insideDelta(point))
} yield ()
}
def makeWorker(piState: PiState) = makeEstimator(piState).forever
def makeStatusReporter(piState: PiState) =
(for {
total <- piState.total.get
inside <- piState.inside.get
_ <- putStrLn(estimatePi(inside, total).toString)
_ <- ZIO.sleep(1.second)
} yield ()).forever
for {
_ <- putStrLn("Enter any input to exit...")
piState <- (Ref.make(0L) zipWith Ref.make(0L))(PiState(_, _))
singleWorker = makeWorker(piState)
workers = List.fill(4)(singleWorker)
compositeWorker: URIO[Random, Fiber[Nothing, List[Nothing]]] = ZIO.forkAll(workers)
reporter = makeStatusReporter(piState)
fiber <- (compositeWorker zipWith reporter.fork)(_ zip _)
_ <- getStrLn.orDie *> fiber.interrupt
} yield 0
}
}
object StmSwap extends App {
import zio.console._
import zio.stm._
/**
* EXERCISE
*
* Demonstrate the following code does not reliably swap two values in the
* presence of concurrency.
*/
def exampleRef = {
def swap[A](ref1: Ref[A], ref2: Ref[A]): UIO[Unit] =
for {
v1 <- ref1.get
v2 <- ref2.get
_ <- ref2.set(v1)
_ <- ref1.set(v2)
} yield ()
for {
ref1 <- Ref.make(100)
ref2 <- Ref.make(0)
fiber1 <- swap(ref1, ref2).repeat(Schedule.recurs(100)).fork
fiber2 <- swap(ref2, ref1).repeat(Schedule.recurs(100)).fork
_ <- (fiber1 zip fiber2).join
value <- (ref1.get zipWith ref2.get)(_ + _)
} yield value
}
/**
* EXERCISE
*
* Using `STM`, implement a safe version of the swap function.
*/
def exampleStm = {
def swap[A](ref1: TRef[A], ref2: TRef[A]): UIO[Unit] =
(for {
one <- ref1.get
two <- ref2.get
_ <- ref2.set(two)
_ <- ref1.set(one)
} yield ()).commit
for {
ref1 <- TRef.make(100).commit
ref2 <- TRef.make(0).commit
fiber1 <- swap(ref1, ref2).repeat(Schedule.recurs(100)).fork
fiber2 <- swap(ref2, ref1).repeat(Schedule.recurs(100)).fork
_ <- (fiber1 zip fiber2).join
value <- (ref1.get zipWith ref2.get)(_ + _).commit
} yield value
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
exampleStm.map(_.toString).flatMap(putStrLn) as 0
}
object StmLock extends App {
import zio.console._
import zio.stm._
/**
* EXERCISE
*
* Using STM, implement a simple binary lock by implementing the creation,
* acquisition, and release methods.
*/
class Lock private (tref: TRef[Boolean]) {
def acquire: UIO[Unit] = (tref.get.filter(_ == false) *> tref.set(true)).commit
def release: UIO[Unit] = tref.set(false).commit
}
object Lock {
def make: UIO[Lock] = TRef.make(false).map(new Lock(_)).commit
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
lock <- Lock.make
fiber1 <- lock.acquire
.bracket_(lock.release)(putStrLn("Bob : I have the lock!"))
.repeat(Schedule.recurs(10))
.fork
fiber2 <- lock.acquire
.bracket_(lock.release)(putStrLn("Sarah: I have the lock!"))
.repeat(Schedule.recurs(10))
.fork
_ <- (fiber1 zip fiber2).join
} yield 0) as 1
}
object StmQueue extends App {
import zio.console._
import zio.stm._
import scala.collection.immutable.{ Queue => ScalaQueue }
/**
* EXERCISE
*
* Using STM, implement a async queue with double back-pressuring.
*/
class Queue[A] private (capacity: Int, queue: TRef[ScalaQueue[A]]) {
def take: UIO[A] =
(for {
q <- queue.get.filter(_.nonEmpty)
(head, rest) = q.dequeue
_ <- queue.set(rest)
} yield head).commit
def offer(a: A): UIO[Unit] =
(for {
q <- queue.get
_ <- STM.check(q.length <= capacity)
_ <- queue.set(q.enqueue(a))
} yield ()).commit
}
object Queue {
def make[A]: UIO[Queue[A]] = TRef.make(ScalaQueue.empty[A]).map(q => new Queue(capacity = 10, q)).commit
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
for {
queue <- Queue.make[Int]
_ <- ZIO.foreach(0 to 100)(i => queue.offer(i)).fork
_ <- ZIO.foreach(0 to 100)(_ => queue.take.flatMap(i => putStrLn(s"Got: ${i}")))
} yield 0
}
object StmLunchTime extends App {
import zio.console._
import zio.stm._
/**
* EXERCISE
*
* Using STM, implement the missing methods of Attendee.
*/
final case class Attendee(state: TRef[Attendee.State]) {
import Attendee.State._
def isStarving: STM[Nothing, Boolean] = ???
def feed: STM[Nothing, Unit] = ???
}
object Attendee {
sealed trait State
object State {
case object Starving extends State
case object Full extends State
}
}
/**
* EXERCISE
*
* Using STM, implement the missing methods of Table.
*/
final case class Table(seats: TArray[Boolean]) {
def findEmptySeat: STM[Nothing, Option[Int]] =
seats
.fold[(Int, Option[Int])]((0, None)) {
case ((index, z @ Some(_)), _) => (index + 1, z)
case ((index, None), taken) =>
(index + 1, if (taken) None else Some(index))
}
.map(_._2)
def takeSeat(index: Int): STM[Nothing, Unit] = ???
def vacateSeat(index: Int): STM[Nothing, Unit] = ???
}
/**
* EXERCISE
*
* Using STM, implement a method that feeds a single attendee.
*/
def feedAttendee(t: Table, a: Attendee): STM[Nothing, Unit] =
for {
index <- t.findEmptySeat.collect { case Some(index) => index }
_ <- t.takeSeat(index) *> a.feed *> t.vacateSeat(index)
} yield ()
/**
* EXERCISE
*
* Using STM, implement a method that feeds only the starving attendees.
*/
def feedStarving(table: Table, list: List[Attendee]): UIO[Unit] =
???
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val Attendees = 100
val TableSize = 5
for {
attendees <- ZIO.foreach(0 to Attendees)(
i =>
TRef
.make[Attendee.State](Attendee.State.Starving)
.map(Attendee(_))
.commit
)
table <- TArray
.fromIterable(List.fill(TableSize)(false))
.map(Table(_))
.commit
_ <- feedStarving(table, attendees)
} yield 0
}
}
object StmPriorityQueue extends App {
import zio.console._
import zio.stm._
import zio.duration._
/**
* EXERCISE
*
* Using STM, design a priority queue, where smaller integers are assumed
* to have higher priority than greater integers.
*/
class PriorityQueue[A] private (
minLevel: TRef[Int],
map: TMap[Int, TQueue[A]]
) {
def offer(a: A, priority: Int): STM[Nothing, Unit] = for {
min <- minLevel.get
_ <- if (priority < min) minLevel.set(priority)
else STM.unit
optQ <- map.get(priority)
q <- optQ.map(STM.succeed).getOrElse(TQueue.make(Int.MaxValue))
_ <- q.offer(a)
_ <- map.put(priority, q)
} yield ()
def cleanup(min: Int): STM[Nothing, Unit] =
map.delete(min) *> findAndSetNextMinimum
def findAndSetNextMinimum: STM[Nothing, Unit] =
map.keys.map(_.sorted.headOption)
.map(_.fold(Int.MaxValue)(identity))
.flatMap(minLevel.set)
def take: STM[Nothing, A] = {
for {
min <- minLevel.get
q <- map.get(min).collect { case Some(tq) => tq }
a <- q.take
size <- q.size
_ <- if (size == 0) cleanup(min)
else STM.unit
} yield a
}
}
object PriorityQueue {
def make[A]: STM[Nothing, PriorityQueue[A]] = for {
minLevel <- TRef.make[Int](Int.MaxValue)
map <- TMap.empty[Int, TQueue[A]]
} yield new PriorityQueue(minLevel, map)
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
_ <- putStrLn("Enter any key to exit...")
queue <- PriorityQueue.make[String].commit
lowPriority = ZIO.foreach(0 to 100) { i =>
ZIO.sleep(1.millis) *> queue
.offer(s"Offer: ${i} with priority 3", 3)
.commit
}
highPriority = ZIO.foreach(0 to 100) { i =>
ZIO.sleep(2.millis) *> queue
.offer(s"Offer: ${i} with priority 0", 0)
.commit
}
_ <- ZIO.forkAll(List(lowPriority, highPriority)) *> queue.take.commit
.flatMap(putStrLn(_))
.forever
.fork *>
getStrLn
} yield 0).fold(_ => 1, _ => 0)
}
object StmReentrantLock extends App {
import zio.console._
import zio.stm._
private final case class WriteLock(
writeCount: Int,
readCount: Int,
fiberId: FiberId
)
private final class ReadLock private (readers: Map[Fiber.Id, Int]) {
def total: Int = readers.values.sum
def noOtherHolder(fiberId: FiberId): Boolean =
readers.size == 0 || (readers.size == 1 && readers.contains(fiberId))
def readLocks(fiberId: FiberId): Int =
readers.get(fiberId).fold(0)(identity)
def adjust(fiberId: FiberId, adjust: Int): ReadLock = {
val total = readLocks(fiberId)
val newTotal = total + adjust
new ReadLock(
readers =
if (newTotal == 0) readers - fiberId
else readers.updated(fiberId, newTotal)
)
}
}
private object ReadLock {
val empty: ReadLock = new ReadLock(Map())
def apply(fiberId: Fiber.Id, count: Int): ReadLock =
if (count <= 0) empty else new ReadLock(Map(fiberId -> count))
}
/**
* EXERCISE
*
* Using STM, implement a reentrant read/write lock.
*/
class ReentrantReadWriteLock(data: TRef[Either[ReadLock, WriteLock]]) {
def writeLocks: UIO[Int] = data.get.map(_.fold(_ => 0, _.writeCount)).commit
def writeLocked: UIO[Boolean] = writeLocks.map(_ > 0)
def readLocks: UIO[Int] = data.get.map(_.fold(_.total, _.readCount)).commit
def readLocked: UIO[Boolean] = readLocks.map(_ > 0)
val read: Managed[Nothing, Int] = ???
val write: Managed[Nothing, Int] = ???
}
object ReentrantReadWriteLock {
def make: UIO[ReentrantReadWriteLock] =
TRef
.make[Either[ReadLock, WriteLock]](Left(ReadLock.empty))
.map(tref => new ReentrantReadWriteLock(tref))
.commit
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = ???
}
object StmDiningPhilosophers extends App {
import zio.console._
import zio.stm._
sealed trait Fork
val Fork = new Fork {}
final case class Placement(left: TRef[Option[Fork]], right: TRef[Option[Fork]])
final case class Roundtable(seats: Vector[Placement])
/**
* EXERCISE
*
* Using STM, implement the logic of a philosopher to take not one fork, but
* both forks when they are both available.
*/
def takeForks(left: TRef[Option[Fork]], right: TRef[Option[Fork]]): STM[Nothing, (Fork, Fork)] =
for {
l <- left.get.collect { case Some(f) => f }
r <- right.get.collect { case Some(f) => f }
} yield (l, r)
def putForks(left: TRef[Option[Fork]], right: TRef[Option[Fork]])(tuple: (Fork, Fork)) = {
val (leftFork, rightFork) = tuple
right.set(Some(rightFork)) *> left.set(Some(leftFork))
}
def setupTable(size: Int): ZIO[Any, Nothing, Roundtable] = {
val makeFork = TRef.make[Option[Fork]](Some(Fork))
(for {
allForks0 <- STM.foreach(0 to size) { i => makeFork }
allForks = allForks0 ++ List(allForks0(0))
placements = (allForks zip allForks.drop(1)).map { case (l, r) => Placement(l, r) }
} yield Roundtable(placements.toVector)).commit
}
def eat(philosopher: Int, roundtable: Roundtable): ZIO[Console, Nothing, Unit] = {
val placement = roundtable.seats(philosopher)
val left = placement.left
val right = placement.right
for {
forks <- takeForks(left, right).commit
_ <- putStrLn(s"Philosopher ${philosopher} eating...")
_ <- putForks(left, right)(forks).commit
_ <- putStrLn(s"Philosopher ${philosopher} is done eating")
} yield ()
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val count = 10
def eaters(table: Roundtable): Iterable[ZIO[Console, Nothing, Unit]] =
(0 to count).map { index => eat(index, table) }
for {
table <- setupTable(count)
fiber <- ZIO.forkAll(eaters(table))
_ <- fiber.join
_ <- putStrLn("All philosophers have eaten!")
} yield 0
}
}
object Interview extends App {
import zio.console._
val questions =
"Where where you born?" ::
"What color are your eyes?" ::
"What is your favorite movie?" ::
"What is your favorite number?" :: Nil
/**
* EXERCISE
*
* Using `ZIO.foreach`, iterate over all of the `questions`, and for each
* question, print out the question, and read the answer from the console
* using `getStrLn`, collecting all of the answers into a list.
*
* Print out the answers when done.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
ZIO.foreach(questions) { q =>
putStrLn(q) *> getStrLn
}
.flatMap(answers => putStrLn(s"These are all the answers you put in ${answers.mkString(",")}"))
.fold(_ => 1, _ => 0)
}
object SimpleActor extends App {
import zio.console._
import zio.stm._
sealed trait Command
case object ReadTemperature extends Command
final case class AdjustTemperature(value: Double) extends Command
type TemperatureActor = Command => Task[Double]
/**
* EXERCISE
*
* Using ZIO Queue and Promise, implement the logic necessary to create an
* actor as a function from `Command` to `Task[Double]`.
*/
def makeActor(initialTemperature: Double): UIO[TemperatureActor] = {
type Bundle = (Command, Promise[Nothing, Double])
???
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
val temperatures = (0 to 100).map(_.toDouble)
(for {
actor <- makeActor(0)
_ <- ZIO.foreachPar(temperatures) { temp => actor(AdjustTemperature(temp)) }
temp <- actor(ReadTemperature)
_ <- putStrLn(s"Final temperature is ${temp}")
} yield 0) orElse ZIO.succeed(1)
}
}
object ParallelFib extends App {
import zio.console._
def fib(n: Int): UIO[BigInt] =
if (n <= 1) UIO(n)
else UIO.unit.flatMap { _ =>
(fib(n - 1) zipWithPar fib(n - 2))(_ + _)
}
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
for {
_ <- putStrLn("What number of the fibonacci sequence should we calculate?")
n <- getStrLn.orDie.flatMap(input => ZIO(input.toInt)).eventually
f <- fib(n)
_ <- putStrLn(s"fib(${n}) = ${f}")
} yield 0
}
object Sharding extends App {
/**
* EXERCISE
*
* Create N workers reading from a Queue, if one of them fails, then wait
* for the other ones to process their current item, but terminate all the
* workers.
*
* Return the first error, or never return, if there is no error.
*/
def shard[R, E, A](
queue: Queue[A],
n: Int,
worker: A => ZIO[R, E, Unit]
): ZIO[R, Nothing, E] = {
val qWorker = queue.take.flatMap(worker).forever
val qWorkers = List.fill(n)(qWorker)
val program =
for {
fiber <- ZIO.forkAll(qWorkers)
list <- fiber.join
nothing <- list.headOption.fold(ZIO.dieMessage(s"You supplied n=$n"))(identity)
} yield nothing
program.flip
}
def run(args: List[String]) = ???
}
object CustomEnvironment extends App {
import zio.console._
import java.io.IOException
type MyFx = Logging with Files
trait Logging {
val logging: Logging.Service
}
object Logging {
trait Service {
def log(line: String): UIO[Unit]
}
def log(line: String) = ZIO.accessM[Logging](_.logging.log(line))
}
trait Files {
val files: Files.Service
}
object Files {
trait Service {
def read(file: String): IO[IOException, String]
}
def read(file: String) = ZIO.accessM[Files](_.files.read(file))
}
val effect: ZIO[Logging with Files, IOException, Unit] =
(for {
file <- Files.read("build.sbt")
_ <- Logging.log(file)
} yield ())
def run(args: List[String]) = {
effect.provide {
new Logging with Files {
override val logging: Logging.Service = new Logging.Service {
override def log(line: String): UIO[Unit] = UIO(println(line))
}
override val files: Files.Service = new Files.Service {
override def read(file: String): IO[IOException, String] =
Task(scala.io.Source.fromFile(file).mkString).refineToOrDie[IOException]
}
}
}.fold(_ => 1, _ => 0)
}
}
object Hangman extends App {
import Dictionary.Dictionary
import zio.console._
import zio.random._
import java.io.IOException
/**
* EXERCISE
*
* Implement an effect that gets a single, lower-case character from
* the user.
*/
lazy val getChoice: ZIO[Console, IOException, Char] =
getStrLn.map(_.trim.toLowerCase().toList).flatMap {
case h :: Nil if h.isLetterOrDigit => ZIO.succeed(h)
case _ => putStrLn("You did not enter a single valid character") *> getChoice
}
/**
* EXERCISE
*
* Implement an effect that prompts the user for their name, and
* returns it.
*/
lazy val getName: ZIO[Console, IOException, String] = putStrLn("What's your name?") *> getStrLn
/**
* EXERCISE
*
* Implement an effect that chooses a random word from the dictionary.
* The dictionary is `Dictionary.Dictionary`.
*/
lazy val chooseWord: ZIO[Random, Nothing, String] =
nextInt(Dictionary.length).map(Dictionary)
/**
* EXERCISE
*
* Implement the main game loop, which gets choices from the user until
* the game is won or lost.
*/
def gameLoop(oldState: State): ZIO[Console, IOException, Unit] =
for {
c <- getChoice
newState = oldState.addChar(c)
result = analyzeNewInput(oldState, newState, c)
_ <- renderState(newState)
_ <- result match {
case GuessResult.Incorrect => putStrLn(s"Wrong answer ${newState.name}, try again!") *> gameLoop(newState)
case GuessResult.Unchanged => putStrLn("You already guessed that, try again!") *> gameLoop(newState)
case GuessResult.Correct => putStrLn("Correct! keep going") *> gameLoop(newState)
case GuessResult.Lost => putStrLn("Sorry! you lost")
case GuessResult.Won => putStrLn("Yay, you win!")
}
} yield ()
def renderState(state: State): ZIO[Console, Nothing, Unit] = {
/**
*
* f n c t o
* - - - - - - -
*
* Guesses: a, z, y, x
*
*/
val word =
state.word.toList
.map(c => if (state.guesses.contains(c)) s" $c " else " ")
.mkString("")
val line = List.fill(state.word.length)(" - ").mkString("")
val guesses = " Guesses: " + state.guesses.mkString(", ")
val text = word + "\n" + line + "\n\n" + guesses + "\n"
putStrLn(text)
}
final case class State(name: String, guesses: Set[Char], word: String) {
final def failures: Int = (guesses -- word.toSet).size
final def playerLost: Boolean = failures > 10
final def playerWon: Boolean = (word.toSet -- guesses).isEmpty
final def addChar(char: Char): State = copy(guesses = guesses + char)
}
sealed trait GuessResult
object GuessResult {
case object Won extends GuessResult
case object Lost extends GuessResult
case object Correct extends GuessResult
case object Incorrect extends GuessResult
case object Unchanged extends GuessResult
}
def analyzeNewInput(oldState: State, newState: State, char: Char): GuessResult =
if (oldState.guesses.contains(char)) GuessResult.Unchanged
else if (newState.playerWon) GuessResult.Won
else if (newState.playerLost) GuessResult.Lost
else if (oldState.word.contains(char)) GuessResult.Correct
else GuessResult.Incorrect
/**
* EXERCISE
*
* Implement hangman using `Dictionary.Dictionary` for the words,
* and the above helper functions.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
(for {
name <- getName
word <- chooseWord
state = State(name, Set(), word)
_ <- renderState(state)
_ <- gameLoop(state)
} yield 0) orElse ZIO.succeed(1)
}
/**
* GRADUATION PROJECT
*
* Implement a game of tic tac toe using ZIO, then develop unit tests to
* demonstrate its correctness and testability.
*/
object TicTacToe extends App {
import zio.console._
sealed trait Mark {
final def renderChar: Char = this match {
case Mark.X => 'X'
case Mark.O => 'O'
}
final def render: String = renderChar.toString
}
object Mark {
case object X extends Mark
case object O extends Mark
}
final case class Board private (value: Vector[Vector[Option[Mark]]]) {
/**
* Retrieves the mark at the specified row/col.
*/
final def get(row: Int, col: Int): Option[Mark] =
value.lift(row).flatMap(_.lift(col)).flatten
/**
* Places a mark on the board at the specified row/col.
*/
final def place(row: Int, col: Int, mark: Mark): Option[Board] =
if (row >= 0 && col >= 0 && row < 3 && col < 3)
Some(
copy(value = value.updated(row, value(row).updated(col, Some(mark))))
)
else None
/**
* Renders the board to a string.
*/
def render: String =
value
.map(_.map(_.fold(" ")(_.render)).mkString(" ", " | ", " "))
.mkString("\n---|---|---\n")
/**
* Returns which mark won the game, if any.
*/
final def won: Option[Mark] =
if (wonBy(Mark.X)) Some(Mark.X)
else if (wonBy(Mark.O)) Some(Mark.O)
else None
private final def wonBy(mark: Mark): Boolean =
wonBy(0, 0, 1, 1, mark) ||
wonBy(0, 2, 1, -1, mark) ||
wonBy(0, 0, 0, 1, mark) ||
wonBy(1, 0, 0, 1, mark) ||
wonBy(2, 0, 0, 1, mark) ||
wonBy(0, 0, 1, 0, mark) ||
wonBy(0, 1, 1, 0, mark) ||
wonBy(0, 2, 1, 0, mark)
private final def wonBy(
row0: Int,
col0: Int,
rowInc: Int,
colInc: Int,
mark: Mark
): Boolean =
extractLine(row0, col0, rowInc, colInc).collect { case Some(v) => v }.toList == List
.fill(3)(mark)
private final def extractLine(
row0: Int,
col0: Int,
rowInc: Int,
colInc: Int
): Iterable[Option[Mark]] =
for {
row <- (row0 to (row0 + rowInc * 2))
col <- (col0 to (col0 + colInc * 2))
} yield value(row)(col)
}
object Board {
final val empty = new Board(Vector.fill(3)(Vector.fill(3)(None)))
def fromChars(
first: Iterable[Char],
second: Iterable[Char],
third: Iterable[Char]
): Option[Board] =
if (first.size != 3 || second.size != 3 || third.size != 3) None
else {
def toMark(char: Char): Option[Mark] =
if (char.toLower == 'x') Some(Mark.X)
else if (char.toLower == 'o') Some(Mark.O)
else None
Some(
new Board(
Vector(
first.map(toMark).toVector,
second.map(toMark).toVector,
third.map(toMark).toVector
)
)
)
}
}
val TestBoard = Board
.fromChars(
List(' ', 'O', 'X'),
List('O', 'X', 'O'),
List('X', ' ', ' ')
)
.get
.render
/**
* The entry point to the game will be here.
*/
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
putStrLn(TestBoard) as 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment