Created
October 29, 2016 13:10
-
-
Save sortega/739aaa2ad8ea1f2b5c3b58b4873904d2 to your computer and use it in GitHub Desktop.
FS2 intro worksheet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Paths | |
import scala.concurrent.Future | |
import scala.util.control.NoStackTrace | |
import fs2._ | |
import scalaz._ | |
import Scalaz._ | |
import com.jobandtalent.commons.streams.{FileStreams, StreamCombinators} | |
//// Why? | |
// | |
//1: Int | |
//Future.successful(1): Future[Int] | |
//Stream.emits(Seq(1, 2, 3)): Stream[Task, Int] | |
// | |
// | |
//// Basics | |
// | |
//val s0 = Stream.empty | |
//val s1 = Stream(1, 2, 3) | |
//val s2 = Stream.emit(1) | |
// | |
//s1.map(_ * 2).toList | |
//s2.filter(_ % 2 == 1).toList | |
// | |
//Stream.range(0, 5).intersperse(42).toList | |
// | |
//s1.repeat.take(20).toList | |
// Effects: Task[A] = () => Future[A] | |
/** Builds a task that, every time it is run, prints `name` and then yields it.
  *
  * The body is wrapped in `Task.delay`, so nothing is printed when the task
  * is merely constructed.
  *
  * @param name text to print and return
  * @return a task producing `name`
  */
def effect(name: String): Task[String] =
  Task.delay {
    println(name)
    name
  }
// One pure element followed by one element obtained by evaluating an effect.
val effectful: Stream[Task, String] =
  Stream("bambi") ++ Stream.eval(effect("drop database"))
//effectful.toList
// Three different interpreters over the same stream description.
effectful.runLog.unsafeRun() // collect every emitted element
effectful.run.unsafeRun() // run only for the effects
effectful.runFold(0)((count, _) => count + 1).unsafeRun() // count the elements
// Error handling | |
// A stream that fails on purpose; NoStackTrace suppresses the stack trace.
val fail: Stream[Task, Int] =
  Stream.fail[Task](new Exception("on purpose") with NoStackTrace)
fail.run.unsafeAttemptValue()
// Recover by switching to a replacement stream chosen by the error type.
val handled = fail.onError {
  case _: NumberFormatException => Stream(3)
  case _: Exception             => Stream(42, 13, 15)
}
handled.runLog.unsafeRun()
// `attempt` surfaces failures as Left values; `collect` keeps only the Right ones.
val foo = (Stream(0) ++ fail ++ Stream(1, 2, 3))
  .attempt
  .collect {
    case Right(event) => event
    // case Left(_) => Stream.empty
  }
  .runLog
  .unsafeRun()
// RESOURCE SAFETY | |
/** Runs `block` between a "create container" effect and a "stop container" effect.
  *
  * The acquire/use/release wiring is delegated to `Stream.bracket`; the value
  * yielded by the acquire effect is what `block` receives.
  *
  * @param block builds the stream to run while the container is held
  */
def withContainer[A](block: String => Stream[Task, A]) = {
  val acquire = effect("create container")
  // The release step ignores the resource and discards the printed name.
  val release = (_: String) => effect("stop container").map(_ => ())
  Stream.bracket(acquire)(block, release)
}
// Bracketed stream that completes normally.
withContainer { _ =>
  Stream(1, 2, 3)
}.runLog.unsafeRun()
// Bracketed stream that fails midway; the error is returned as a value.
withContainer { _ =>
  Stream(1) ++ fail ++ Stream(3)
}.runLog.unsafeAttemptRun()
// COMPOSITION! | |
// Source === Stream | |
// Pipe[F, I, O] = Stream[F, I] => Stream[F, O] | |
/** A pipe running in `Task` that turns a stream of `In` into a stream of `Out`. */
type Enricher[In, Out] = Pipe[Task, In, Out]
// Sink[F, I] = Pipe[F, I, Unit] | |
//// IO package | |
// Reads the current user's ~/.zshrc in 1024-byte chunks and keeps the first two
// elements produced by FileStreams.nonEmptyLines (project helper; presumably it
// splits the bytes into non-empty text lines -- confirm against its definition).
// The path is derived from the `user.home` system property rather than being
// hard-coded to a single user's home directory.
io.file.readAll[Task](Paths.get(System.getProperty("user.home"), ".zshrc"), chunkSize = 1024)
  .throughPure(FileStreams.nonEmptyLines)
  .take(2)
  .runLog
  .unsafeRun() // explicit parentheses: side-effecting call, as everywhere else in this file
//// Concurrent primitives: see runUntilClosed for an example | |
// | |
//// READ MORE AT https://github.com/functional-streams-for-scala/fs2/blob/series/0.9/docs/guide.md | |
// Standard-library lazy stream of 0, 1, 2, ...
val naturals = scala.Stream.from(0)
// Forcing the first 200 mapped elements reaches 99, where the mapping function
// throws, so this expression ends with an exception instead of a Vector.
naturals.map { n =>
  if (n == 99) throw new Exception("jaja")
  else n + 1
}.take(200).toVector
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment