Skip to content

Instantly share code, notes, and snippets.

@BalmungSan
Created June 25, 2019 21:15
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save BalmungSan/d4a5d524cab529e18fbf05f100ec3296 to your computer and use it in GitHub Desktop.
Save BalmungSan/d4a5d524cab529e18fbf05f100ec3296 to your computer and use it in GitHub Desktop.
10 code snippets to introducing cats.effect.IO & fs2.Stream
// IO: A Monad for side-effects.
import $ivy.`org.typelevel::cats-effect:1.3.1`
import cats.effect.IO
import scala.concurrent.ExecutionContext
implicit val IOTimer = IO.timer(ExecutionContext.global)
implicit val IOShift = IO.contextShift(ExecutionContext.global)
// ----------------------------------------------
// Program 1: Printing to console.
val ioa = IO { println("Hello, World!") }
val program1: IO[Unit] =
for {
_ <- ioa
_ <- ioa
} yield ()
program1.unsafeRunSync()
// ----------------------------------------------
// Program 2: Reading from console.
def putStrlLn(value: String) = IO { println(value) }
val readLn = IO(scala.io.StdIn.readLine)
val program2 = for {
_ <- putStrlLn("What's your name?")
n <- readLn
_ <- putStrlLn(s"Hello, $n!")
} yield ()
program2.unsafeRunSync()
// ----------------------------------------------
// Program 3: Async & Cancellabele operations.
import cats.effect.{SyncIO, CancelToken}
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.Executors
import scala.concurrent.duration._ // Provides duration units.
def delayedTick(d: FiniteDuration)
(sc: ScheduledExecutorService): SyncIO[CancelToken[IO]] = {
val tick: IO[Unit] = IO.cancelable { cb =>
val r = new Runnable { def run() = cb(Right(())) }
val f = sc.schedule(r, d.length, d.unit)
// Returning the cancellation token needed to
// cancel the scheduling and release resources early
IO {
println("Canceled!")
f.cancel(false)
}
}
tick.runCancelable(_ => IO { println("Tick!") })
}
val sc = Executors.newSingleThreadScheduledExecutor()
val program3 = delayedTick(10 seconds)(sc)
val token = program3.unsafeRunSync()
token.unsafeRunSync()
// ----------------------------------------------
// Program 4: Parallel operations.
import cats.syntax.apply._ // Provides the *> operator.
import cats.syntax.parallel._ // Provided the parMapN & parTraverse extension methods.
val ioA = IO.sleep(1 second) *> IO(println("Running ioA"))
val ioB = IO.sleep(1 second) *> IO(println("Running ioB"))
val ioC = IO.sleep(1 second) *> IO(println("Running ioC"))
val program4_1 = (ioA, ioB, ioC).parMapN { (_, _, _) => () }
program4_1.unsafeRunSync()
import cats.data.NonEmptyList
val program4_2 = NonEmptyList.of(1, 2, 3).parTraverse { i =>
IO.sleep(1 second) *> IO {
println(i)
(i * 5) + 3
}
}
program4_2.unsafeRunSync()
// ----------------------------------------------
// Program 5: Error handling and resource managment.
import cats.effect.Resource
import scala.io.Source
def fahrenheitToCelsius(f: Double): Double =
(f - 32.0) * (5.0 / 9.0)
def parser(data: IO[List[String]]): IO[List[Double]] = data.map { lines =>
for {
line <- lines
if (!line.trim.isEmpty && !line.startsWith("//"))
fahrenheit = line.toDouble
celsius = fahrenheitToCelsius(fahrenheit)
} yield celsius
}
// File at: https://github.com/functional-streams-for-scala/fs2/blob/series/1.1/testdata/fahrenheit.txt
val file = Resource.fromAutoCloseable(IO(Source.fromFile("fahrenheit.txt")))
val parsed = parser(
data = file.use(source => IO(source.getLines.toList))
)
val program5 = parsed.attempt.flatMap {
case Right(data) =>
IO {
println(s"Output: ${data.take(5).mkString("[", ", ", ", ...]")}")
}
case Left(ex) =>
IO {
println(s"Error: ${ex.getMessage}")
}
}
program5.unsafeRunSync()
// ----------------------------------------------
// Stream: A Monad for effectual data streams.
import $ivy.`co.fs2::fs2-core:1.0.5`
import $ivy.`co.fs2::fs2-io:1.0.5`
import fs2.Stream
// ----------------------------------------------
// Program 6: Streaming I/O.
import fs2.{io, text}
import java.nio.file.Paths
val blockingExecutionContext =
Resource.make(
IO(
ExecutionContext.fromExecutorService(
Executors.newFixedThreadPool(2)
)
)
) { ec => IO(ec.shutdown()) }
val converter =
Stream.resource(blockingExecutionContext).flatMap { blockingEC =>
io.file.readAll[IO](Paths.get("fahrenheit.txt"), blockingEC, 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(line => !line.trim.isEmpty && !line.startsWith("//"))
.map(line => fahrenheitToCelsius(line.toDouble).toString)
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("celsius.txt"), blockingEC))
}
val program6 = converter.compile.drain
program6.unsafeRunSync()
// ----------------------------------------------
// Program 7: Concurrency.
def greet(from: String): IO[Unit] = IO {
println(s"Hello from: ${from}")
}
def timedGreets(from: String, times: Int): Stream[IO, Unit] = {
val greetStream = Stream.repeatEval(greet(from)).take(times)
val tickStream = Stream.repeatEval(IO.sleep(1 second))
greetStream interleave tickStream
}
val a = timedGreets(from = "A", times = 5)
val b = timedGreets(from = "B", times = 3)
val serial = a ++ b
val program7_1 = serial.compile.drain
program7_1.unsafeRunSync()
val concurrent = a merge b
val program7_2 = concurrent.compile.drain
program7_2.unsafeRunSync()
// ----------------------------------------------
// Program 8: Interruptions.
final case object Err extends Throwable("Err")
val program8_1 = (Stream(1) ++ (throw Err)).take(1).toList
val program8_2 = (Stream(1) ++ Stream.raiseError[IO](Err)).take(2).compile.toList
program8_2.unsafeRunSync()
val program8_3 =
Stream(1, 2, 3)
.covary[IO]
.onFinalize(IO(println("finalized!")))
.take(1)
.compile
.toList
program8_3.unsafeRunSync()
val program8_4 =
Stream(1, 2, 3)
.append(Stream.raiseError[IO](Err))
.append(Stream(4, 5, 6))
.onFinalize(IO(println("finalized!")))
.take(5)
.compile
.toList
program8_4.unsafeRunSync()
val program8_5 =
(Stream(1, 2) ++ Stream.raiseError[IO](Err) ++ Stream(3)).mask.compile.toList
program8_5.unsafeRunSync()
// ----------------------------------------------
// Program 9: Error handling.
final case object Fatal extends Throwable("Fatal")
val program9_1 =
Stream(1, 2, 3)
.append(Stream.raiseError[IO](Err))
.append(Stream(4, 5, 6))
.attempt
.map {
case Right(i) => s"${i}!"
case Left(_) => ""
}.compile
.toList
program9_1.unsafeRunSync()
val program9_2 =
Stream(1, 2, 3)
.append(Stream.raiseError[IO](Fatal))
.attempt
.map {
case Right(i) => Right(s"${i}!")
case Left(Err) => Right("")
case Left(ex) => Left(ex)
}.rethrow
.compile
.toList
program9_2.unsafeRunSync()
val program9_3 =
Stream(1, 2, 3)
.append(Stream.raiseError[IO](Fatal))
.handleErrorWith {
case Err => Stream(0)
case ex => Stream.eval_(IO(println(s"Errro: ${ex.getMessage}")))
}.append(Stream(4, 5, 6))
.compile
.toList
program9_3.unsafeRunSync()
// ----------------------------------------------
// Program 10: Control Flow.
import scala.io.StdIn
import scala.util.Try
val program10_1 =
Stream.unfold(0)(i => if (i <= 5) Some(i -> (i + 1)) else None).toList
val readNumberFromConsole: IO[Option[Double]] = IO {
Option(StdIn.readLine()).flatMap { input =>
Try(input.toDouble).toOption
}
}
val consoleNumbers: Stream[IO, Double] =
Stream.repeatEval(readNumberFromConsole).unNoneTerminate
val program10_2 = consoleNumbers.compile.toList
program10_2.unsafeRunSync()
// ----------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment