Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Last active June 21, 2021 12:09
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save calvinlfer/197d3f4a7aa1365293946da93a24d484 to your computer and use it in GitHub Desktop.
Save calvinlfer/197d3f4a7aa1365293946da93a24d484 to your computer and use it in GitHub Desktop.
Functional Scala: Toronto edition
// Copyright(C) 2018 - John A. De Goes. All rights reserved.
package net.degoes.effects
import scalaz.zio._
import scalaz.zio.console._
import scala.annotation.tailrec
import scala.concurrent.duration._
object notepad {

  /** A console program described as immutable data that, once interpreted, yields an `A`. */
  sealed trait Program[A] { self =>
    import Program._

    /** Sequences this program with `f`, which receives the value this program produces. */
    final def flatMap[B](f: A => Program[B]): Program[B] =
      self match {
        case ReadLine(continue)    => ReadLine(in => continue(in).flatMap(f))
        case WriteLine(text, rest) => WriteLine(text, rest.flatMap(f))
        case Return(thunk)         => f(thunk())
      }

    /** Transforms the result of this program; derived from `flatMap`. */
    final def map[B](f: A => B): Program[B] =
      flatMap(a => point(f(a)))

    /** Runs `self` and then `that`, pairing both results; derived from `flatMap`. */
    final def zip[B](that: Program[B]): Program[(A, B)] =
      self.flatMap(a => that.map(b => (a, b)))
  }

  object Program {
    /** Terminal instruction: holds a deferred value. */
    final case class Return[A](value: () => A) extends Program[A]

    /** Prints `line`, then continues with `next`. */
    final case class WriteLine[A](line: String, next: Program[A]) extends Program[A]

    /** Reads a line from the console and hands it to `inputToNext` to obtain the rest of the program. */
    final case class ReadLine[A](inputToNext: String => Program[A]) extends Program[A]

    // Smart constructors: the small language users write programs in.

    /** Lifts a by-name value into a program without evaluating it. */
    def point[A](a: => A): Program[A] = Return(() => a)

    def writeLine(line: String): Program[Unit] = WriteLine(line, point(()))

    def readLine: Program[String] = ReadLine(in => point(in))
  }

  import Program._

  val helloWorld: Program[Unit] = writeLine("Hello World!")

  val getInput: Program[String] = readLine

  /** Prints a greeting, reads a line, and echoes it back. */
  val sequentialProgram: Program[Unit] =
    helloWorld.flatMap { _ =>
      getInput.flatMap { i =>
        writeLine(s"Hello $i").map(_ => ())
      }
    }
}
object zio_background {
/** A console program as an immutable instruction tree that yields an `A` when interpreted. */
sealed trait Program[A] { self =>
  import Program.{ReadLine, Return, WriteLine}

  /** Sequences both programs and keeps only the right-hand result. */
  final def *>[B](that: Program[B]): Program[B] = flatMap(_ => that)

  /** Sequences both programs and keeps only the left-hand result. */
  final def <*[B](that: Program[B]): Program[A] = flatMap(a => that.map(_ => a))

  /** Transforms the final value; a `Return` stays deferred until its thunk is forced. */
  final def map[B](f: A => B): Program[B] =
    self match {
      case Return(thunk)         => Return(() => f(thunk()))
      case WriteLine(text, rest) => WriteLine(text, rest.map(f))
      case ReadLine(continue)    => ReadLine(in => continue(in).map(f))
    }

  /** Feeds the final value into `f` to obtain the remainder of the program. */
  final def flatMap[B](f: A => Program[B]): Program[B] =
    self match {
      case Return(thunk)         => f(thunk())
      case WriteLine(text, rest) => WriteLine(text, rest.flatMap(f))
      case ReadLine(continue)    => ReadLine(in => continue(in).flatMap(f))
    }
}

object Program {
  final case class ReadLine[A](next: String => Program[A]) extends Program[A]
  final case class WriteLine[A](line: String, next: Program[A]) extends Program[A]
  final case class Return[A](value: () => A) extends Program[A]

  /** Lifts a by-name value into a program without evaluating it. */
  def point[A](a: => A): Program[A] = Return(() => a)

  /** Reads one line of input and returns it. */
  val readLine: Program[String] = ReadLine(in => point[String](in))

  /** Prints `line` and returns `Unit`. */
  def writeLine(line: String): Program[Unit] = WriteLine(line, point(()))
}
import Program.{readLine, writeLine, point}
val yourName1: Program[Unit] = {
  // Last step of the chain: greet the user, then finish with Unit.
  def greet(name: String): Program[Unit] =
    writeLine("Hello, " + name + ", good to meet you!").flatMap(_ => point(()))

  // Ask, read the answer, and hand it to `greet` -- written with explicit flatMaps on purpose.
  writeLine("What is your name?").flatMap(_ => readLine.flatMap(greet))
}
//
// EXERCISE 1
//
// Rewrite `yourName1` to use a for comprehension.
//
/** For-comprehension form of `yourName1`: asks for a name and greets the user with it. */
val yourName2: Program[Unit] =
  for {
    _    <- writeLine("What is your name?")
    name <- readLine
    // Same text as `yourName1` ("Hello, <name>, good to meet you!") so both programs print identical output.
    _    <- writeLine(s"Hello, $name, good to meet you!")
  } yield ()
//
// EXERCISE 2
//
// Rewrite `yourName2` using the helper function `getName`, which shows how
// to create larger programs from smaller programs.
//
/**
 * Same program as `yourName1`, built from the smaller `getName` program.
 * Kept as a `def` because `getName` is a `val` declared later in this object.
 */
def yourName3: Program[Unit] =
  for {
    name <- getName
    // Same text as `yourName1` ("Hello, <name>, good to meet you!") so both programs print identical output.
    _    <- writeLine(s"Hello, $name, good to meet you!")
  } yield ()
/** Prompts for the user's name and returns the line that was read. */
val getName: Program[String] =
  writeLine("What is your name?") *> readLine
//
// EXERCISE 3
//
// Implement the following effectful procedure, which interprets
// `Program[A]` into `A`. You can use this procedure to "run" programs.
//
/**
 * Runs `program` against the real console and returns its final value.
 * This is the only place where the described side effects are actually performed.
 */
@tailrec
def interpret[A](program: Program[A]): A =
  program match {
    case Program.Return(thunk) =>
      thunk()
    case Program.WriteLine(text, rest) =>
      println(text) // side-effect
      interpret(rest)
    case Program.ReadLine(continue) =>
      val typed = scala.io.StdIn.readLine() // side-effect
      interpret(continue(typed))
  }
//
// EXERCISE 4
//
// Implement the following function, which shows how to write a combinator
// that operates on programs.
//
/**
 * Combines a list of programs into one program that runs them left to right
 * and collects their results in the same order (essentially `traverse`).
 *
 * @param programs the programs to run, in order
 * @return a program yielding every result, in input order
 */
def sequence[A](programs: List[Program[A]]): Program[List[A]] = {
  // Results are prepended (O(1)) and reversed once at the end; appending with `:+`
  // would copy the accumulated list on every step. `foldLeft` keeps this stack-safe.
  val collectedInReverse: Program[List[A]] =
    programs.foldLeft(Program.point(List.empty[A])) { (acc, program) =>
      // Order matters: pull the accumulated results first, then run the next program.
      for {
        collected <- acc
        a         <- program
      } yield a :: collected
    }
  collectedInReverse.map(_.reverse)
}
//
// EXERCISE 5
//
// Translate the following procedural program into a purely functional program
// using `Program` and a for comprehension.
//
/** Procedural reference version: asks for an age until an integer is entered, then prints a description. */
def ageExplainer1(): Unit = {
  // Maps an age to its description; thresholds are checked in ascending order.
  def describe(age: Int): String =
    age match {
      case a if a < 12  => "You are a kid"
      case a if a < 20  => "You are a teenager"
      case a if a < 30  => "You are a grownup"
      case a if a < 50  => "You are an adult"
      case a if a < 80  => "You are a mature adult"
      case a if a < 100 => "You are elderly"
      case _            => "You are probably lying."
    }

  println("What is your age?")
  scala.util.Try(scala.io.StdIn.readLine().toInt) match {
    case scala.util.Success(age) =>
      println(describe(age))
    case scala.util.Failure(_) =>
      println("That's not an age, try again")
      ageExplainer1()
  }
}
/** Purely functional counterpart of `ageExplainer1`, expressed as a `Program` value. */
def ageExplainer2: Program[Unit] = {
  // Keeps prompting until the input parses as an Int.
  def askForAge: Program[Int] =
    writeLine("What is your age?").flatMap { _ =>
      readLine.flatMap { raw =>
        scala.util.Try(raw.toInt).toOption match {
          case Some(age) => Program.point(age)
          case None      => writeLine("That's not an age, try again") *> askForAge
        }
      }
    }

  // Picks the description for an age; thresholds are checked in ascending order.
  def printAge(age: Int): Program[Unit] = {
    val message = age match {
      case a if a < 12  => "You are a kid"
      case a if a < 20  => "You are a teenager"
      case a if a < 30  => "You are a grownup"
      case a if a < 50  => "You are an adult"
      case a if a < 80  => "You are a mature adult"
      case a if a < 100 => "You are elderly"
      case _            => "You are probably lying."
    }
    writeLine(message)
  }

  askForAge.flatMap(age => printAge(age).map(_ => ()))
}
// Calvin
/** Demo entry point: runs four small programs in sequence and prints the collected results. */
def main(args: Array[String]): Unit = {
  // Each step is deliberately given a distinct return value so the final list shows the order.
  val steps: List[Program[Int]] = List(
    writeLine("Hello, welcome to Calvin's program!") *> Program.point(1),
    ageExplainer2 *> Program.point(2),
    writeLine("Once again!") *> Program.point(3),
    ageExplainer2 *> Program.point(4)
  )
  val results = interpret(sequence(steps))
  println(results)
}
}
// Type-level exercises: each alias fixes the error and/or value parameter of `IO[E, A]`.
object zio_type {
// Placeholder type for exercises that have not been filled in yet.
type ??? = Nothing
//
// EXERCISE 1
//
// Write the type of `IO` values that can fail with an `Exception`, or
// may produce an `A`.
//
type Exceptional[A] = IO[Exception, A]
//
// EXERCISE 2
//
// Write the type of `IO` values that can fail with a `Throwable`, or
// may produce an `A`.
//
type Task[A] = IO[Throwable, A]
//
// EXERCISE 3
//
// Write the type of `IO` values that cannot fail, but may produce an `A`.
//
// The error type is `Nothing`, which has no values.
type Infallible[A] = IO[Nothing, A]
//
// EXERCISE 4
//
// Write the type of `IO` values that cannot produce a value, but may fail
// with an `E`.
//
// The value type is `Nothing`, which has no values.
type Unproductive[E] = IO[E, Nothing]
//
// EXERCISE 5
//
// Write the type of `IO` values that cannot fail or produce a value.
//
type Unending = IO[Nothing, Nothing]
}
// Exercises on lifting plain values and errors into `IO`.
object zio_values {
//
// EXERCISE 1
//
// Using the `IO.now` method, lift the integer `2` into a strictly-evaluated
// `IO`.
//
val ioInteger: IO[Nothing, Int] = IO.now(2)
//
// EXERCISE 2
//
// Using the `IO.point` method, lift the string "Functional Scala" into a
// lazily-evaluated `IO`.
//
val ioString: IO[Nothing, String] = IO.point("Functional Scala")
//
// EXERCISE 3
//
// Use the `IO.fail` method to lift the string "Bad Input" into a failed
// `IO`.
//
val failedInput: IO[String, Nothing] = IO.fail("Bad Input")
}
object zio_composition {
// a sign that something is missing
// Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
implicit class FixMe[A](a: A) {
  def ?[B]: Nothing = ???
}
//
// EXERCISE 1
//
// Map the `IO[Nothing, Int]` into an `IO[Nothing, String]` by converting the
// integer into its string rendering using the `map` method of the `IO`
// object.
//
// Answer: `map` renders the successful Int as a String. The previous `?` placeholder
// evaluated `???` and threw as soon as this object was initialised.
(IO.point(42).map(_.toString): IO[Nothing, String])
//
// EXERCISE 2
//
// Map the `IO[Int, Nothing]` into an `IO[String, Nothing]` by converting the
// integer error into its string rendering using the `leftMap` method of the
// `IO` object.
//
// Answer: `leftMap` renders the Int error as a String. The previous `?` placeholder
// evaluated `???` and threw as soon as this object was initialised.
(IO.fail(42).leftMap(_.toString): IO[String, Nothing])
//
// EXERCISE 3
//
// Using the `flatMap` and `map` methods of `IO`, add `ioX` and `ioY`
// together.
//
val ioX: IO[Nothing, Int] = IO.point(42)
val ioY: IO[Nothing, Int] = IO.point(58)
// Runs `ioX`, then `ioY`, and adds the two results. The previous `flatMap(???)` threw
// NotImplementedError while this object was being initialised.
val ioXPlusY: IO[Nothing, Int] = ioX.flatMap(x => ioY.map(y => x + y))
//
// EXERCISE 4
//
// Using the `flatMap` method of `IO`, implement `ifThenElse`.
//
/** Runs `bool`, then continues with `ifTrue` or `ifFalse` depending on the result. */
def ifThenElse[E, A](bool: IO[E, Boolean])(
  ifTrue: IO[E, A], ifFalse: IO[E, A]): IO[E, A] =
  bool.flatMap(flag => if (flag) ifTrue else ifFalse)

// Example usage: the condition is true, so this yields "It's true!".
val exampleIf = ifThenElse(IO.point(true))(IO.point("It's true!"), IO.point("It's false!"))
//
// EXERCISE 5
//
// Translate the following program, which uses for-comprehensions, to its
// equivalent chain of `flatMap`'s, followed by a final `map`.
//
// For-comprehension form. The resulting IO value is not bound to anything; it is shown for comparison only.
for {
v1 <- IO.point(42)
v2 <- IO.point(58)
} yield "The total is: " + (v1 + v2).toString
// Answer: the same program written as a `flatMap` followed by a final `map`.
IO.point(42).flatMap { v1 =>
IO.point(58).map { v2 =>
"The total is: " + (v1 + v2).toString
}
}
//
// EXERCISE 6
//
// Rewrite the following procedural program, which uses conditionals, into its
// ZIO equivalent.
//
/**
 * Procedural reference: reads one byte; a negative byte is returned as `Left`.
 * Otherwise three more bytes are read and all four are combined, first byte lowest,
 * by shifting each subsequent byte a further 8 bits left.
 */
def decode1(read: () => Byte): Either[Byte, Int] = {
  val first = read()
  if (first >= 0) {
    // The reads must happen in this order: each call consumes the next byte.
    val second = read().toInt << 8
    val third  = read().toInt << 16
    val fourth = read().toInt << 24
    Right(first.toInt + second + third + fourth)
  } else Left(first)
}
/** ZIO counterpart of `decode1`: `read` is run once, or four times when the first byte is non-negative. */
def decode2[E](read: IO[E, Byte]): IO[E, Either[Byte, Int]] =
  read.flatMap { first =>
    if (first < 0) IO.point(Left(first))
    else
      for {
        second <- read
        third  <- read
        fourth <- read
      } yield Right(first.toInt + (second.toInt << 8) + (third.toInt << 16) + (fourth.toInt << 24))
  }
//
// EXERCISE 7
//
// Rewrite the following procedural program, which uses conditionals, into its
// ZIO equivalent.
//
/**
 * Procedural reference: asks whether the user wants to enter a name and, if the
 * answer starts with "y" (case-insensitive), reads and returns the name.
 */
def getName1(print: String => Unit, read: () => String): Option[String] = {
  print("Do you want to enter your name?")
  val firstLetter = read().toLowerCase.take(1)
  if (firstLetter == "y") Some(read()) else None
}
/**
 * ZIO counterpart of `getName1`.
 *
 * @param print effect that shows the prompt
 * @param read  effect that reads one line of input
 * @return `Some(name)` when the answer starts with "y" (case-insensitive), otherwise `None`
 */
def getName2[E](print: String => IO[E, String], read: IO[E, String]): IO[E, Option[String]] =
  for {
    _ <- print("Do you want to enter your name?")
    input <- read
    // Normalise exactly as `getName1` does, so "Yes", "Y" and "yes" are all accepted.
    res <- input.toLowerCase.take(1) match {
      case "y" => read.map(userInput => Some(userInput))
      case _ => IO.point(None)
    }
  } yield res
//
// EXERCISE 8
//
// Translate the following loop into its ZIO equivalent.
//
/** Procedural reference: invokes `action` over and over; only returns if `action` throws. */
def forever1(action: () => Unit): Unit = {
  @tailrec
  def loop(): Unit = {
    action()
    loop()
  }
  loop()
}
// ZIO counterpart of `forever1`. The result type is `IO[Nothing, Nothing]`: it neither fails nor produces a value.
def forever2[A](action: IO[Nothing, A]): IO[Nothing, Nothing] =
// execute the action, discard the result and repeat
// The recursive call sits inside the lambda, so it is only made once `action` has produced a value.
action.flatMap(_ => forever2(action))
//
// EXERCISE 9
//
// Translate the following loop into its ZIO equivalent.
//
/** Procedural reference: invokes `action` exactly `n` times; does nothing when `n <= 0`. */
def repeatN1(n: Int, action: () => Unit): Unit =
  (1 to n).foreach(_ => action())
/** ZIO counterpart of `repeatN1`: runs `action` `n` times in sequence; a no-op when `n <= 0`. */
def repeatN2[E](n: Int, action: IO[E, Unit]): IO[E, Unit] =
  if (n > 0) action.flatMap(_ => repeatN2(n - 1, action))
  else IO.now(())
//
// EXERCISE 10
//
// Translate the following expression into its `flatMap` equivalent.
//
// Given expression: run both, keep the right-hand result.
IO.point(42) *> IO.point(19)
// Answer: the left result is ignored by the lambda.
IO.point(42).flatMap(_ => IO.point(19))
//
// EXERCISE 11
//
// Translate the following expression into its `flatMap` equivalent.
//
// Given expression: run both, keep the left-hand result.
IO.point(42) <* IO.point(19)
// Answer: the right-hand effect is still run, but its value (`nineteen`) is discarded.
IO.point(42).flatMap { fortyTwo =>
IO.point(19).map { nineteen =>
fortyTwo
}
}
//
// EXERCISE 12
//
// Translate the following expression into an equivalent expression using
// the `map` and `flatMap` methods of the `IO` object.
//
(IO.point(42) <* IO.point(19)) *> IO.point(1)
// Answer, step 1: the parenthesised `<*` part.
val p1: IO[Nothing, Int] =
IO.point(42).flatMap { fortyTwo =>
IO.point(19).map { nineteen =>
fortyTwo
}
}
// Answer, step 2: the trailing `*> IO.point(1)`.
p1.flatMap(_ => IO.point(1))
}
object zio_failure {
  // Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
  implicit class FixMe[A](a: A) {
    def ?[B]: Nothing = ???
  }

  //
  // EXERCISE 1
  //
  // Using the `IO.fail` method, create an `IO[String, Int]` value that
  // represents a failure with a user-readable string error message.
  //
  val stringFailure1: IO[String, Int] = IO.fail[String]("oh noes")

  //
  // EXERCISE 2
  //
  // Using the `IO.fail` method, create an `IO[Int, String]` value that
  // represents a failure with an integer error code.
  //
  val intFailure: IO[Int, String] = IO.fail(1)

  //
  // EXERCISE 3
  //
  // Transform the error of `intFailure` into its string representation using
  // the `leftMap` method of `IO`.
  //
  val stringFailure2: IO[String, String] = intFailure.leftMap(_.toString)

  //
  // EXERCISE 4
  //
  // Translate the following exception-throwing program into its ZIO equivalent.
  //
  /** Procedural reference: throws when `i` is outside `[0, a.length)`. */
  def accessArr1[A](i: Int, a: Array[A]): A =
    if (0 <= i && i < a.length) a(i)
    else throw new IndexOutOfBoundsException("The index " + i + " is out of bounds [0, " + a.length + ")")

  /** ZIO version: the out-of-bounds case becomes a typed failure instead of a thrown exception. */
  def accessArr2[A](i: Int, a: Array[A]): IO[IndexOutOfBoundsException, A] =
    if (0 <= i && i < a.length) IO.point(a(i))
    else IO.fail(new IndexOutOfBoundsException("The index " + i + " is out of bounds [0, " + a.length + ")"))

  //
  // EXERCISE 5
  //
  // Translate the following ZIO program into its exception-throwing equivalent.
  //
  trait DenomIsZero
  object DenomIsZero extends DenomIsZero

  /** ZIO version: fails with `DenomIsZero` when the denominator is zero. */
  def divide1(n: Int, d: Int): IO[DenomIsZero, Int] =
    if (d != 0) IO.now(n / d)
    else IO.fail(DenomIsZero)

  /** Exception-throwing version of `divide1`. */
  def divide2(n: Int, d: Int): Int =
    if (d != 0) n / d
    else throw new Exception("Denominator is zero")

  //
  // EXERCISE 6
  //
  // Recover from a division by zero error by returning `-1`.
  //
  val recovered1: IO[Nothing, Int] =
    divide1(100, 0).attempt.map(outcome => outcome.fold(_ => -1, value => value))

  //
  // EXERCISE 7
  //
  // Recover from a division by zero error by using `redeem`.
  //
  val recovered2: IO[Nothing, Int] =
    divide1(100, 0).redeem(
      succ = value => IO.now(value),
      err = _ => IO.now(-1)
    )

  //
  // EXERCISE 8
  //
  // Use the `orElse` method of `IO` to try `firstChoice`, and fallback to
  // `secondChoice` only if `firstChoice` fails.
  //
  val firstChoice: IO[DenomIsZero, Int] = divide1(100, 0)
  val secondChoice: IO[Nothing, Int] = IO.now(400)
  val combined: IO[Nothing, Int] = firstChoice.orElse(secondChoice)
}
object zio_effects {
import scala.io.StdIn.readLine
import scala.io.Source
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}
// Placeholder type for exercises that have not been filled in yet.
type ??? = Nothing

// Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
implicit class FixMe[A](a: A) {
  def ?[B]: Nothing = ???
}
//
// EXERCISE 1
//
// Using the `IO.sync` method, wrap Scala's `println` method to import it into
// the world of pure functional programming.
//
/** Describes printing `line` to standard output; nothing is printed until the effect is run. */
def putStrLn(line: String): IO[Nothing, Unit] =
  IO.sync { println(line) }

//
// EXERCISE 2
//
// Using the `IO.sync` method, wrap Scala's `readLine` method to import it
// into the world of pure functional programming.
//
/** Describes reading one line from standard input. */
val getStrLn: IO[Nothing, String] =
  IO.sync { readLine() }
//
// EXERCISE 3
//
// Using the `IO.syncException` method, wrap Scala's `getLines` method to
// import it into the world of pure functional programming.
//
/**
 * Reads every line of `file`, surfacing any `Exception` in the error channel.
 * The underlying `Source` is always closed, whether reading succeeds or fails.
 */
def readFile(file: File): IO[Exception, List[String]] =
  IO.syncException {
    val source = Source.fromFile(file)
    // `getLines` is lazy, so the list must be materialised before the handle is closed.
    try source.getLines().toList
    finally source.close()
  }
// Exercise 3.5
// Use IO.syncThrowable (more for Scala code) to wrap Scala's getLines
/**
 * Reads every line of `file`, surfacing any `Throwable` in the error channel.
 * The underlying `Source` is always closed, whether reading succeeds or fails.
 */
def readFileT(file: File): IO[Throwable, List[String]] =
  IO.syncThrowable {
    val source = Source.fromFile(file)
    // `getLines` is lazy, so the list must be materialised before the handle is closed.
    try source.getLines().toList
    finally source.close()
  }
// Exercise 3.75
/**
 * Reads every line of `file`, turning only `java.io.IOException` into a typed error.
 * The underlying `Source` is always closed, whether reading succeeds or fails.
 */
def readFileIOE(file: File): IO[java.io.IOException, List[String]] =
  // look at a subset of errors, catch those and turn those Throwables into specific E's in IO[E, A]
  IO.syncCatch {
    val source = Source.fromFile(file)
    // `getLines` is lazy, so the list must be materialised before the handle is closed.
    try source.getLines().toList
    finally source.close()
  } {
    case e: java.io.IOException => e
  }
//
// EXERCISE 4
//
// Identify the correct method and error type to import `System.nanoTime`
// safely into the world of pure functional programming.
// Calvin: nanoTime cannot fail
/** Describes reading `System.nanoTime()`; the clock is sampled each time the effect is run. */
def nanoTime: IO[Nothing, Long] =
  IO.sync { System.nanoTime() }
//
// EXERCISE 5
//
// Identify the correct method, error, and value type to import `System.exit`
// safely into the world of pure functional programming.
//
// Describes calling `System.exit(code)`; a `SecurityException` thrown by the call becomes the typed error.
// The trailing `flatMap` replaces the `Unit` result with `IO.terminate` so the value type can be `Nothing`.
def sysExit(code: Int): IO[SecurityException, Nothing] = IO.syncCatch(System.exit(code)) {
case e : SecurityException => e
}.flatMap(_ => IO.terminate)
//
// EXERCISE 6
//
// Identify the correct method, error, and value type to import
// `Array.update` safely into the world of pure functional programming.
//
/**
 * Describes replacing `a(i)` with `f(a(i))` in place.
 * An `ArrayIndexOutOfBoundsException` raised by the access becomes the typed error.
 */
def arrayUpdate[A](a: Array[A], i: Int, f: A => A): IO[ArrayIndexOutOfBoundsException, Unit] =
  IO.syncCatch {
    val replacement = f(a(i))
    a(i) = replacement
  } {
    case e: ArrayIndexOutOfBoundsException => e
  }
//
// EXERCISE 7
//
// Use the `IO.async` method to implement the following `sleep` method, and
// choose the correct error type.
//
// Single-threaded scheduler that fires the wake-up callbacks for `sleep`.
// NOTE(review): this executor is never shut down in the visible code -- confirm whether that is intended.
val scheduledExecutor = Executors.newScheduledThreadPool(1)
// Asynchronous sleep: schedules a task that completes the callback with Unit after `l` units of `u`.
// The error type is `Nothing` because the callback is only ever completed successfully.
def sleep(l: Long, u: TimeUnit): IO[Nothing, Unit] = IO.async { callback: Callback[Nothing, Unit] =>
scheduledExecutor.schedule(
new Runnable {
def run(): Unit = callback(ExitResult.Completed(()))
}, l, u
)
}
//
// EXERCISE 8
//
// Translate the following procedural program into ZIO.
//
/**
 * Procedural reference: picks a random number, asks for a guess and reports the outcome.
 * A non-integer answer restarts the game, which also draws a new number.
 */
def playGame1(): Unit = {
  val number = scala.util.Random.nextInt(5)
  println("Enter a number between 0 - 5: ")
  scala.util.Try(scala.io.StdIn.readLine().toInt) match {
    case scala.util.Failure(_) =>
      println("You didn't enter an integer!")
      playGame1()
    case scala.util.Success(guess) =>
      if (guess == number) println("You guessed right! The number was " + number)
      else println("You guessed wrong! The number was " + number)
  }
}
/**
 * ZIO counterpart of `playGame1`: picks a random number, asks for a guess and
 * prints whether it was right. A non-integer answer restarts the game.
 */
def playGame2: IO[Exception, Unit] = {
  // Prints the verdict. The message must go through `println`: wrapping the bare string
  // in `IO.sync` only builds the string and discards it, so nothing would be shown.
  def guess(providedGuess: Int, randomNumber: Int): IO[Nothing, Unit] =
    if (providedGuess == randomNumber) IO.sync(println(s"You guessed right! The number was $randomNumber"))
    else IO.sync(println(s"You guessed wrong! The number was $randomNumber"))

  for {
    randomNumber <- IO.sync(scala.util.Random.nextInt(5))
    _ <- IO.sync(println("Enter a number between 0 - 5: "))
    stringInt <- IO.sync(readLine())
    optInt = scala.util.Try(stringInt.toInt).toOption
    // Invalid input: report it and start over, which also draws a new number.
    _ <- optInt.fold(
      IO.sync(println("You didn't enter an integer!")) *> playGame2)(
      validInt => guess(validInt, randomNumber)
    )
  } yield ()
}
}
object zio_concurrency {
// Placeholder type for exercises that have not been filled in yet.
type ??? = Nothing

// Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
implicit class FixMe[A](a: A) {
  def ?[B]: Nothing = ???
}
//
// EXERCISE 1
//
// Race `leftContestent1` and `rightContestent1` using the `race` method of
// `IO` to see which one finishes first with a successful value.
//
// The left contestant never completes.
val leftContestent1 = IO.never
val rightContestent1 = putStrLn("Hello World")
val raced1 = leftContestent1 race rightContestent1
// rightContestent1 wins
//
// EXERCISE 2
//
// Race `leftContestent2` and `rightContestent2` using the `race` method of
// `IO` to see which one finishes first with a successful value.
//
// The left contestant fails immediately; the right one prints after a 10 ms sleep.
val leftContestent2: IO[Exception, Nothing] = IO.fail(new Exception("Uh oh!"))
val rightContestent2: IO[Exception, Unit] = IO.sleep(10.milliseconds) *> putStrLn("Hello World")
val raced2: IO[Exception, Unit] = leftContestent2 race rightContestent2
// right wins because left fails
//
// EXERCISE 3
//
// Compute `leftWork1` and `rightWork1` in parallel using the `par` method of
// `IO`.
//
val leftWork1: IO[Nothing, Int] = fibonacci(10)
val rightWork1: IO[Nothing, Int] = fibonacci(10)
// Yields both results as a tuple.
val par1: IO[Nothing, (Int, Int)] = leftWork1.par(rightWork1)
//
// EXERCISE 4
//
// Compute all values `workers` in parallel using `IO.parAll`.
//
// One fibonacci computation per n in 1..10.
val workers: List[IO[Nothing, Int]] =
(1 to 10).toList.map(fibonacci(_))
val workersInParallel: IO[Nothing, List[Int]] = IO.parAll(workers)
//
// EXERCISE 5
//
// Implement `myPar` by forking `left` and `right`, and then joining them
// and yielding a tuple of their results.
//
/** Forks `left` and `right`, then joins the left fiber followed by the right one and pairs the results. */
def myPar[E, A, B](left: IO[E, A], right: IO[E, B]): IO[E, (A, B)] =
  left.fork.flatMap { leftFiber =>
    right.fork.flatMap { rightFiber =>
      leftFiber.join.flatMap { a =>
        rightFiber.join.map(b => (a, b))
      }
    }
  }
//
// EXERCISE 6
//
// Use the `IO.supervise` method to ensure that when the main fiber exits,
// all fibers forked within it will be terminated cleanly.
//
/**
 * Forks a long-running computation under `IO.supervise`.
 * The supervised effect is now the value bound to `supervisedExample`; previously
 * `IO.supervise(...)` was a separate statement whose result was thrown away, so the
 * val itself was unsupervised.
 */
val supervisedExample: IO[Nothing, Unit] =
  IO.supervise(
    for {
      _ <- fibonacci(10000).fork
    } yield ()
  )
//
// EXERCISE 7
//
// Use the `interrupt` method of the `Fiber` object to cancel the long-running
// `fiber`.
//
/** Forks a long-running computation and interrupts its fiber straight away. */
val interrupted1: IO[Nothing, Unit] =
  fibonacci(10000).fork.flatMap { fiber =>
    fiber.interrupt.map(_ => ())
  }
//
// EXERCISE 8
//
// Use the `zipWith` method of the `Fiber` object to combine `fiber1` and
// `fiber2` into a single fiber (by summing the results), so they can be
// interrupted together.
//
/** Forks two computations, combines their fibers with `zipWith` (summing), and interrupts the combined fiber. */
val interrupted2: IO[Nothing, Unit] =
  for {
    fiber1 <- fibonacci(10).fork
    fiber2 <- fibonacci(20).fork
    // Combining fibers is a pure operation, so a plain value binding is enough.
    both = fiber1.zipWith(fiber2)(_ + _)
    _ <- both.interrupt
  } yield ()
//
// EXERCISE 9
//
// io.timeout[Option[A]](None)(Some(_))(60.seconds)
/**
 * Describes the naive recursive computation of the n-th Fibonacci number.
 *
 * @param n index into the sequence; values `<= 1` are returned unchanged
 * @return the Fibonacci number, or `-1` if the whole computation exceeds 60 seconds
 */
def fibonacci(n: Int): IO[Nothing, Int] = {
  // Each level is wrapped in a `flatMap`, so the recursion unfolds only while the effect
  // runs. Building the tree eagerly made `fibonacci(10000)` do exponential work at the
  // point where the value was merely constructed.
  def go(k: Int): IO[Nothing, Int] =
    IO.point(k).flatMap { i =>
      if (i <= 1) IO.now(i)
      else go(i - 1).seqWith[Nothing, Int, Int](go(i - 2))(_ + _)
    }

  // One timeout around the whole computation. A timeout at every recursion level would
  // let `-1` sentinels from inner levels be added into the result.
  go(n).timeout[Int](z = -1)(f = identity)(duration = 60.seconds)
}
// Exercise 10, use IO.parTraverse to compute fibonacci numbers of the list of integers in parallel
// Inputs whose Fibonacci numbers are computed in parallel below.
val fibsToCompute = List(1, 2, 3, 4, 5, 6, 7)

/** One `fibonacci` computation per element, run with `IO.parTraverse`. */
val inParallel: IO[Nothing, List[Int]] =
  IO.parTraverse(fibsToCompute)(n => fibonacci(n))
}
object zio_resources {
import java.io.{File, FileInputStream}
/** Thin effectful wrapper over a `FileInputStream`; instances are created through `InputStream.openFile`. */
class InputStream private (is: FileInputStream) {
  /** Reads the next byte, or `None` once the underlying stream reports a negative value. */
  def read: IO[Exception, Option[Byte]] =
    IO.syncException(is.read()).map { raw =>
      if (raw >= 0) Some(raw.toByte) else None
    }

  /** Closes the underlying stream. */
  def close: IO[Exception, Unit] =
    IO.syncException { is.close() }
}

object InputStream {
  /** Opens `file` for reading; a failure to open surfaces as an `Exception` in the error channel. */
  def openFile(file: File): IO[Exception, InputStream] =
    IO.syncException { new InputStream(new FileInputStream(file)) }
}
//
// EXERCISE 1
//
// Rewrite the following procedural program to ZIO, using `IO.fail` and the
// `bracket` method of the `IO` object.
//
/** Procedural reference: always throws, but prints a message on the way out first. */
def tryCatch1(): Unit =
  try {
    throw new Exception("Uh oh")
  } finally {
    println("On the way out...")
  }
/**
 * ZIO counterpart of `tryCatch1`: fails with "Uh oh" and always prints "On the way out...".
 * The failing effect is the `use` step of the bracket. When it was the acquire step,
 * the bracket never acquired anything, so the release (the "finally") did not run.
 */
val tryCatch2: IO[Exception, Unit] =
  IO.now(())
    .bracket(_ => IO.sync(println("On the way out...")))(use = _ =>
      (IO.fail(new Exception("Uh oh")): IO[Exception, Unit]))
//
// EXERCISE 2
//
// Rewrite the `readFile1` function to use `bracket` so resources can be
// safely cleaned up in the event of errors, defects, or interruption.
//
// Exercise baseline: reads every byte of `file` into a list.
// The stream opened here is never closed in this version; `readFile2` is the bracket-based rewrite.
def readFile1(file: File): IO[Exception, List[Byte]] = {
// Reads until `read` yields None; bytes are prepended and reversed at the end to restore file order.
def readAll(is: InputStream, acc: List[Byte]): IO[Exception, List[Byte]] =
is.read.flatMap {
case None => IO.now(acc.reverse)
case Some(byte) => readAll(is, byte :: acc)
}
for {
stream <- InputStream.openFile(file)
bytes <- readAll(stream, Nil)
} yield bytes
}
/** Reads every byte of `file`, using `bracket` so the stream is released after use. */
def readFile2(file: File): IO[Exception, List[Byte]] = {
  // Reads until the stream is exhausted; bytes are prepended, then reversed to restore file order.
  def drain(stream: InputStream, soFar: List[Byte]): IO[Exception, List[Byte]] =
    stream.read.flatMap {
      case Some(byte) => drain(stream, byte :: soFar)
      case None       => IO.now(soFar.reverse)
    }

  IO.bracket(
    acquire = InputStream.openFile(file))(
    // A failure while closing is deliberately swallowed via `attempt.void`.
    release = handle => handle.close.attempt.void)(
    use = (inputStream: InputStream) => drain(inputStream, Nil)
  )
}
//
// EXERCISE 3
//
// Implement the `tryCatchFinally` method using `bracket`.
//
/**
 * `try`/`catch`/`finally` expressed with `bracket`.
 *
 * @param try0     the effect to attempt
 * @param catch0   recovery for the errors it is defined on
 * @param finally0 finalizer, attached as the bracket's release step
 */
def tryCatchFinally[E, A]
  (try0: IO[E, A])
  (catch0: PartialFunction[E, IO[E, A]])
  (finally0: IO[Nothing, Unit]): IO[E, A] =
  // The guarded work is the `use` step of a trivially-acquired bracket. When `try0` was
  // the acquire step, an unrecovered failure meant nothing was acquired, so the release
  // (`finally0`) was skipped.
  IO.now(()).bracket(release = (_: Unit) => finally0)(use = (_: Unit) => try0.catchSome(catch0))
//
// EXERCISE 4
//
// Use the `bracket` method to rewrite the following snippet to ZIO.
//
// Procedural reference: reads the file into a list of bytes, returning Nil on IOException.
def readFileTCF1(file: File): List[Byte] = {
// Starts as null so the `finally` clause can tell whether the stream was ever opened.
var fis : FileInputStream = null
try {
fis = new FileInputStream(file)
// Buffer sized from the file length reported when this line runs.
val array: Array[Byte] = Array.ofDim[Byte](file.length.toInt)
// NOTE(review): the count returned by `read` is ignored -- confirm a single read always fills the buffer.
fis.read(array)
array.toList
} catch {
case e : java.io.IOException => Nil
} finally if (fis != null) fis.close()
}
/**
 * ZIO counterpart of `readFileTCF1`: reads all bytes of `file`, releases the stream
 * after use, and recovers from `java.io.IOException` by yielding `Nil`.
 */
def readFileTCF2(file: File): IO[Exception, List[Byte]] = {
  // Bytes are prepended and reversed once at the end, avoiding a list copy per byte.
  def readBytes(is: InputStream, acc: List[Byte]): IO[Exception, List[Byte]] =
    is.read.flatMap {
      case Some(byte) => readBytes(is, byte :: acc)
      case None       => IO.now(acc.reverse)
    }

  InputStream.openFile(file)
    .bracket(
      release = inputStream => inputStream.close.attempt.void)(
      use = is => readBytes(is, Nil)
    )
    // Mirrors the `catch { case e: IOException => Nil }` of the procedural version,
    // covering failures while opening as well as while reading.
    .catchSome { case _: java.io.IOException => IO.now(List.empty[Byte]) }
}
}
// Exercises on building `Schedule` values and using them with `repeat` / `retry`.
object zio_schedule {
// Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
implicit class FixMe[A](a: A) {
def ? = ???
}
//
// EXERCISE 1
//
// Using `Schedule.recurs`, create a schedule that recurs 5 times.
//
val fiveTimes: Schedule[Any, Int] = Schedule.recurs(5)
//
// EXERCISE 2
//
// Using the `repeat` method of the `IO` object, repeat printing "Hello World"
// five times to the console.
//
val repeated1 = putStrLn("Hello World").repeat(fiveTimes)
//
// EXERCISE 3
//
// Using `Schedule.spaced`, create a schedule that recurs forever every 1
// second.
//
val everySecond: Schedule[Any, Int] = Schedule.spaced(1.second)
//
// EXERCISE 4
//
// Using the `&&` method of the `Schedule` object, the `fiveTimes` schedule,
// and the `everySecond` schedule, create a schedule that repeats five times,
// every second.
//
val fiveTimesEverySecond = fiveTimes && everySecond
//
// EXERCISE 5
//
// Using the `repeat` method of the `IO` object, repeat the action
// putStrLn("Hi hi") using `fiveTimesEverySecond`.
//
val repeated2 = putStrLn("Hi hi").repeat(fiveTimesEverySecond)
//
// EXERCISE 6
//
// Using the `andThen` method of the `Schedule` object, the `fiveTimes`
// schedule, and the `everySecond` schedule, create a schedule that repeats
// five times rapidly, and then repeats every second forever.
//
val fiveTimesThenEverySecond = fiveTimes andThen everySecond
//
// EXERCISE 7
//
// Using the `retry` method of the `IO` object, retry the following error
// a total of five times.
//
val error1 = IO.fail("Uh oh!")
val retried5 = error1.retry(Schedule.recurs(5))
//
// EXERCISE 8
//
// Using the `||` method of the `Schedule` object, the `fiveTimes` schedule,
// and the `everySecond` schedule, create a schedule that repeats the minimum
// of five times and every second.
//
val fiveTimesOrEverySecond = fiveTimes || everySecond
//
// EXERCISE 9
//
// Produce a jittered schedule that first does exponential spacing (starting
// from 10 milliseconds), but then after the spacing reaches 60 seconds,
// switches over to fixed spacing of 60 seconds between recurrences, but will
// only do that for up to 100 times, and produce a list of the results.
//
def mySchedule[A]: Schedule[A, List[A]] = {
// FYI: Schedule has an Applicative typeclass
// Phase 1: exponential spacing starting at 10 milliseconds, kept only while the
// value passed to `whileValue` stays below 60 seconds.
val part1 = Schedule.exponential(10.milliseconds).whileValue(currentDuration => currentDuration < 60.seconds)
// Phase 2: fixed spacing of 60 seconds between recurrences,
// but will only do that for up to 100 times,
val part2 = Schedule.fixed(60.seconds) && Schedule.recurs(100)
// Run phase 1, then phase 2, with `jittered` applied to the combined schedule.
val combined = (part1 andThen part2).jittered
// collect values from the input
val identitySchedule: Schedule[A, List[A]] = Schedule.identity[A].collect
// `*>` combines the two schedules and keeps the output of the right-hand one (the collected inputs).
combined *> identitySchedule
}
}
// Exercises on converting between ZIO and `scala.concurrent.Future`, and on using cats-effect data types with ZIO.
object zio_interop {
// Placeholder syntax: `expr.?` marks an unanswered exercise and throws when evaluated.
implicit class FixMe[A](a: A) {
def ? = ???
}
import scala.concurrent.Future
import scalaz.zio.interop.future._
import scala.concurrent.ExecutionContext.Implicits.global
//
// EXERCISE 1
//
// Use `IO.fromFuture` method to convert the following `Future` into an `IO`.
//
// The future is wrapped in a function so it is only created when the function is called.
val future1 = () => Future.successful("Hello World")
val io1 = IO.fromFuture(future1)(global)
//
// EXERCISE 2
//
// Use the `toFuture` method on `IO` to convert the following `io` to `Future`.
//
val io2: IO[Throwable, Int] = IO.point(42)
// The conversion is itself an effect: it yields an `IO` that produces the `Future`.
val future2: IO[Nothing, Future[Int]] = io2.toFuture
//
// EXERCISE 3
//
// Use the `Fiber.fromFuture` method to convert the following `Future` into
// a `Fiber`.
//
val future3 = () => Future.failed[Int](new Error("Uh ohs!"))
val fiber1: Fiber[Throwable, Int] = Fiber.fromFuture(future3())(global)
// Use the following to make ScalaZ ZIO work with Cats Effect typeclasses
import scalaz.zio.interop.Task
import scalaz.zio.interop.catz._
import cats.effect.concurrent.Ref
//
// EXERCISE 4
//
// The following example uses the `Ref` from `cats-effect`, demonstrating how
// `cats-effect` structures work with ZIO.
//
// A worker prints the counter it reads, increments the shared counter, and prints the value returned by `modify`.
class Worker(number: Int, ref: Ref[Task, Int]) {
def work: Task[Unit] =
for {
c1 <- ref.get
_ <- putStrLn(s"#$number >> $c1")
// NOTE(review): presumably cats-effect `modify` takes state => (newState, result), so c2 is the pre-increment value -- confirm.
c2 <- ref.modify(x => (x + 1, x))
_ <- putStrLn(s"#$number >> $c2")
} yield ()
}
// Note the Ref is from Cats Effect not from ZIO here
// Three workers share one Ref; all are forked together and then joined.
val program: Task[Unit] =
for {
ref <- Ref.of[Task, Int](0)
w1 = new Worker(1, ref)
w2 = new Worker(2, ref)
w3 = new Worker(3, ref)
f <- IO.forkAll(List(w1.work, w2.work, w3.work))
_ <- f.join
} yield ()
}
object zio_ref {
  // EXERCISE 1
  // Use Ref.apply to create a Ref that is initially 0.
  // A Ref plays the role of a `var`; this is only a description, nothing has been created yet.
  val makeZero: IO[Nothing, Ref[Int]] = Ref(0)

  // EXERCISE 2
  // Use get and set to change the value to be 10 greater than the initial value.
  val incrementedBy10: IO[Nothing, Int] =
    for {
      counter <- makeZero
      current <- counter.get
      bumped = current + 10
      _ <- counter.set(bumped)
    } yield bumped

  // EXERCISE 3
  // Use update instead of get and set.
  val atomicallyIncrementedBy10: IO[Nothing, Int] =
    for {
      counter <- makeZero
      updated <- counter.update(_ + 10)
    } yield updated

  // EXERCISE 4
  // Use modify to atomically increment the value but return the old value.
  val modifyIncrementedBy10: IO[Nothing, Int] =
    for {
      counter <- makeZero
      // The tuple is (value to return, value to store): the old value is returned.
      previous <- counter.modify(existing => (existing, existing + 10))
    } yield previous
}
// Exercises on `Promise`: creating, completing, failing, interrupting, and handing values between fibers.
object zio_promise {
// EXERCISE 1
// Using `make` of `Promise`, construct a promise that cannot fail but can be completed with an integer
val makeIntPromise: IO[Nothing, Promise[Nothing, Int]] = Promise.make[Nothing, Int] // description only
// EXERCISE 2
// Use the `complete` method of `Promise` to complete a Promise constructed with `makeIntPromise`
val completed1: IO[Nothing, Boolean] = for {
promise <- makeIntPromise
completedSuccessfully <- promise.complete(42): IO[Nothing, Boolean]
} yield completedSuccessfully
// EXERCISE 3
// Using the `error` method of Promise, try to complete a Promise made with makeIntPromise. Explain your findings.
// val errored1: IO[Nothing, Boolean] =
// for {
// promise <- makeIntPromise
// completed <- promise.error(/* you don't have values of type Nothing so you cannot construct this program */)
// } yield completed
// EXERCISE 4
// Use the `error` method of Promise to complete a new Promise that can fail for any Error
val errored2: IO[Nothing, Boolean] =
for {
promise <- Promise.make[Error, String]
completed <- promise.error(new Error("boom!")): IO[Nothing, Boolean] // note we are explicitly typing this to be
// IO[Nothing, Boolean], it's actually
// IO[Error, Boolean]
// NOTE(review): the ascription above compiles as IO[Nothing, Boolean]; confirm the claim about IO[Error, Boolean].
} yield completed
// EXERCISE 5
// Using interrupt of Promise, complete a new promise you construct with Promise.make which can fail for any Error or produce a String
val interrupted: IO[Nothing, Boolean] = for {
promise <- Promise.make[Error, String]
completed <- promise.interrupt: IO[Nothing, Boolean] // doesn't actually complete the promise so completed = false (NOTE(review): not verifiable from this file -- confirm)
} yield completed
// EXERCISE 6
// Using get of a Promise, retrieve a value computed from inside another Fiber
val handoff1: IO[Nothing, Int] =
for {
promise <- Promise.make[Nothing, Int]
_ <- promise.complete(42).delay(10.milliseconds).fork // complete the promise after 10 milliseconds with value 42 on another fiber
value <- promise.get
} yield value
// EXERCISE 7
// Similar to 6, but the promise is completed with an error from the other fiber
val handoff2: IO[Error, Int] =
for {
promise <- Promise.make[Error, Int]
_ <- promise.error(new Error("Uh oh!")).delay(10.milliseconds).fork
value <- promise.get // this will fail using the E in IO[E=Error, A=Int], get suspends until a value/error is present
} yield value
// EXERCISE 8
// Retrieve a value from a promise that was interrupted in another fiber
val handoff3: IO[Error, Int] =
for {
promise <- Promise.make[Error, Int]
_ <- promise.interrupt.delay(10.milliseconds).fork
value <- promise.get: IO[Error, Int]
} yield value
}
object zio_queue {
  // EXERCISE 1
  // Use Queue.bounded to create a queue with capacity 10
  val makeQueue: IO[Nothing, Queue[Int]] = Queue.bounded(10)

  // EXERCISE 2
  // Use offer to place a value 42 into the queue
  val offered1: IO[Nothing, Unit] =
    for {
      queue <- makeQueue
      _     <- queue.offer(42)
    } yield ()

  // EXERCISE 3
  // Use take to take a value from the queue
  val taken1: IO[Nothing, Int] =
    for {
      queue <- makeQueue
      _     <- queue.offer(42)
      value <- queue.take
    } yield value

  // EXERCISE 4
  // In one fiber, place 2 values into a queue and in the main fiber, read 2 values from the queue
  val offeredTaken1: IO[Nothing, (Int, Int)] =
    for {
      queue <- makeQueue
      _     <- (queue.offer(42) *> queue.offer(42)).fork
      v1    <- queue.take: IO[Nothing, Int]
      v2    <- queue.take: IO[Nothing, Int]
    } yield (v1, v2)

  // EXERCISE 5
  // Read infinitely from the queue in a child fiber and, in the main fiber, write 100 values into the queue
  val infiniteReader1: IO[Nothing, List[Int]] = {
    // Offers `element` to `queue` `times` times, one offer after another, and returns the offered
    // elements as a list. A non-positive `times` offers nothing and returns Nil.
    def repeatedlyOffer[A](element: A, times: Int, queue: Queue[A]): IO[Nothing, List[A]] =
      if (times <= 0) IO.point(Nil)
      else
        for {
          _    <- queue.offer(element)
          rest <- repeatedlyOffer(element, times - 1, queue)
          // every entry is `element`, so prepending yields the same list as appending
          // without re-walking the list on every step
        } yield element :: rest

    for {
      queue <- makeQueue
      // child fiber: take forever, printing each element (print failures are ignored via attempt.void)
      _     <- (queue.take.flatMap(element => putStrLn(s"Taking $element").attempt.void).forever: IO[Nothing, Nothing]).fork
      vs    <- repeatedlyOffer(element = 42, times = 100, queue = queue): IO[Nothing, List[Int]]
    } yield vs
  }

  // EXERCISE 6
  // Using Queue, Ref and Promise, implement an Actor like construct that can atomically update values of a counter.
  // The produced function sends an amount to the actor and returns the counter's value after that amount was added.
  val makeCounter: IO[Nothing, Int => IO[Nothing, Int]] =
    for {
      counter <- Ref(0)
      queue   <- Queue.bounded[(Int, Promise[Nothing, Int])](100)
      // actor fiber: consume from the queue, add the amount to the counter and then complete the promise
      // with the UPDATED counter value to signal that the message has been processed
      _ <- queue.take
            .flatMap {
              case (amount, reply) =>
                counter
                  .update(existing => existing + amount)
                  .flatMap(updated => reply.complete(updated))
            }
            .forever
            .fork: IO[Nothing, Fiber[Nothing, Nothing]]
    } yield {
      // we yield a function for a way to send messages to the actor, let it do the work and get the updated value
      // only the actor deals with the Ref and communicates back with us (this piece of code below) the updated Ref value
      // using the Promise
      amount: Int =>
        for {
          promise  <- Promise.make[Nothing, Int]
          _        <- queue.offer((amount, promise))
          newValue <- promise.get // the new value of the counter, supplied by the "actor" fiber
        } yield newValue
    }

  // create a producer
  val counterExample: IO[Nothing, Int] =
    for {
      counterFn <- makeCounter
      // 100 batches run in parallel thanks to parAll; each batch sends the amounts 0 to 100 to the counter
      _         <- IO.parAll(List.fill(100)(IO.traverse(0 to 100)(counterFn)))
      // adding 0 leaves the counter unchanged and reads back its current total
      // (100 batches * 5050 per batch = 505000 once every batch has been processed)
      value     <- counterFn(0)
    } yield value
}
object zio_rts {
  // Exercise 1
  // Create a new runtime system that extends scalaz.zio.RTS
  val MyRTS: RTS = new RTS {}

  // Exercise 2
  // Run the following IO using MyRTS (executed when this object is initialised)
  MyRTS.unsafeRun(putStrLn("Hello world")): Unit

  // Exercise 3
  // Use unsafeRunSync so it doesn't throw exceptions; the outcome comes back as an ExitResult value instead
  MyRTS.unsafeRunSync(putStrLn("Hello world")): ExitResult[java.io.IOException, Unit]

  // Exercise 4
  // Implement a purely functional application using ZIO's App
  object MyApp extends scalaz.zio.App {
    // Greets, reads a name from the console and greets by name.
    // Any failure of the console interaction maps to exit status 1, success maps to 0.
    override def run(args: List[String]): IO[Nothing, MyApp.ExitStatus] = {
      val conversation =
        putStrLn("Hello there!")
          .flatMap(_ => getStrLn)
          .flatMap(name => putStrLn(s"Hello $name"))

      conversation.redeemPure(
        err = _ => ExitStatus.ExitNow(1),
        succ = _ => ExitStatus.ExitNow(0)
      )
    }
  }
}
// Placeholder for the advanced exercises; it declares no members yet, only the notes below.
object zio_advanced {
// TODO: change the thread pool of the RTS
// TODO: make it so that Fibers will yield after every operation
}
@calvinlfer
Copy link
Author

Please don't hesitate to point out mistakes in the implementation and post your solution here 😄

@calvinlfer
Copy link
Author

calvinlfer commented Oct 4, 2018

Revisiting Schedule

import scalaz._
import Scalaz._
import scalaz.zio._
import scalaz.zio.console._
import scala.concurrent.duration._

// https://static.javadoc.io/org.scalaz/scalaz-zio_2.12/0.2.9/scalaz/zio/Schedule$.html
// Demo application: repeats a console effect under a composite schedule, prints the values the
// schedule collected, and exits with status 0 on success or 1 if any console operation failed.
object ScheduleMePlease extends App {
  override def run(args: List[String]): IO[Nothing, ScheduleMePlease.ExitStatus] = {
    // the incoming delay is ignored; every recurrence is delayed by a fixed 500ms
    def `recur 10 times with a 500ms delay between recurrences`[A]: Schedule[A, Int] =
      Schedule.recurs(10).delayed(_ => 500.milliseconds)

    // && returns a new schedule that continues only as long as both schedules continue,
    // using the maximum of the delays of the two schedules.
    def `recur 10 times with a 1s delay between recurrences`[A]: Schedule[A, (Int, Int)] =
      (`recur 10 times with a 500ms delay between recurrences`: Schedule[A, Int]) && (Schedule.spaced(1.second): Schedule[A, Int])

    // Exponential back-off starting at `start` until the delay reaches `max`, followed by
    // `nrOfTriesAfterMaxHit` recurrences spaced `max` apart; the whole thing is jittered and
    // every input seen by the schedule is collected into the resulting list.
    def customSchedule[A](start: Duration = 10.milliseconds, max: Duration = 60.seconds, nrOfTriesAfterMaxHit: Int = 100): Schedule[A, List[A]] =
      (Schedule.exponential(start).whileValue(duration => duration < max) /* apply the exponential schedule as long as the duration is less than max, if it's more then this schedule will stop */
        andThen (Schedule.spaced(max) && Schedule.recurs(nrOfTriesAfterMaxHit)) /* then apply a fixed spacing schedule of max duration for nrOfTriesAfterMaxHit recurrences */).jittered /* jitter all */ *>
        Schedule.identity[A].collect /* collect the input from the preceding IO as a list for every invocation of this schedule */

    putStrLn("Hello world!")
      .map(_ => 1) // this is the preceding input IO that our custom schedule will collect the data, the IO outputs Int so that is the input to our Schedule
      .repeat(customSchedule(start = 1.milli, max = 2.seconds, nrOfTriesAfterMaxHit = 10): Schedule[Int, List[Int]])
      .flatMap((i: List[Int]) => putStrLn(s"data after schedule: $i"))
      // report failure (1) only when the program actually failed; a successful run exits with 0
      .redeemPure(
        err = _ => ExitStatus.ExitNow(1),
        succ = _ => ExitStatus.ExitNow(0)
      )
  }
}

@calvinlfer
Copy link
Author

calvinlfer commented Oct 4, 2018

Creating our own Schedule from scratch

    // Hand-written schedule: its input is the Int produced by the repeated IO[E, Int] and it emits a String.
    // The internal state is a comma-separated log of the inputs seen so far, seeded with "Initial State: ".
    //  - input <= 65: ask for another execution after 500ms, append the input to the log and emit
    //    "premature-finish" (per the original note, this is what comes out when the schedule is combined
    //    with a shorter one and therefore never reaches its own `done` decision)
    //  - input > 65: stop without delay and emit the log accumulated so far
    def superCustomSchedule: Schedule[Int /* Input */, String /* Emit */] =
      Schedule.apply[String /* Internal State */, Int /* Input */, String /* Emit */](
        IO.point("Initial State: "),
        (input: Int, seenSoFar: String) => {
          val keepGoing = input <= 65
          if (keepGoing)
            IO.point(
              // want another execution to happen
              Decision.cont(
                d = 500.milliseconds /* delay before the next execution */,
                a = s"$seenSoFar,$input" /* internal state */,
                b = "premature-finish" /* value to emit if this schedule is not run to completion */
              )
            )
          else
            IO.point(
              // do not want another execution to happen
              Decision.done(
                d = 0.milliseconds /* delay */,
                a = "" /* internal state */,
                b = seenSoFar /* value to emit */
              )
            )
        }
      )

    // Print a greeting, draw a random Int in [0, 70) and repeat that under superCustomSchedule,
    // capped by Schedule.recurs(3); then print the String the schedule emitted.
    putStrLn("Hello world!")
      .flatMap(_ => IO.sync(Random.nextInt(70)))
      .repeat(superCustomSchedule <* Schedule.recurs(3)) // <* is like && but takes the output of the left scheduler (superCustomSchedule)
      .flatMap((i: String) => putStrLn(s"data after schedule: $i"))
      // report failure (1) only when the program actually failed; a successful run exits with 0
      .redeemPure(
        err = _ => ExitStatus.ExitNow(1),
        succ = _ => ExitStatus.ExitNow(0)
      )

@calvinlfer
Copy link
Author

Regarding https://gist.github.com/calvinlfer/197d3f4a7aa1365293946da93a24d484#file-day-3-scala-L1283, the work I'm referencing is the completion of the Promise with the new counter value.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment