Skip to content

Instantly share code, notes, and snippets.

@sortega
Created October 29, 2016 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sortega/739aaa2ad8ea1f2b5c3b58b4873904d2 to your computer and use it in GitHub Desktop.
Save sortega/739aaa2ad8ea1f2b5c3b58b4873904d2 to your computer and use it in GitHub Desktop.
FS2 intro worksheet
import java.nio.file.Paths
import scala.concurrent.Future
import scala.util.control.NoStackTrace
import fs2._
import scalaz._
import Scalaz._
import com.jobandtalent.commons.streams.{FileStreams, StreamCombinators}
//// Why?
//
//1: Int
//Future.successful(1): Future[Int]
//Stream.emits(Seq(1, 2, 3)): Stream[Task, Int]
//
//
//// Basics
//
//val s0 = Stream.empty
//val s1 = Stream(1, 2, 3)
//val s2 = Stream.emit(1)
//
//s1.map(_ * 2).toList
//s2.filter(_ % 2 == 1).toList
//
//Stream.range(0, 5).intersperse(42).toList
//
//s1.repeat.take(20).toList
// Effects: Task[A] = () => Future[A]
def effect(name: String): Task[String] = Task.delay {
println(name)
name
}
val effectful: Stream[Task, String] =
Stream("bambi") ++ Stream.eval(effect("drop database"))
//effectful.toList
effectful.runLog.unsafeRun()
effectful.run.unsafeRun()
effectful.runFold(0)((accum, _) => accum + 1).unsafeRun()
// Error handling
val fail: Stream[Task, Int] = Stream.fail[Task](
new Exception("on purpose") with NoStackTrace)
fail.run.unsafeAttemptValue()
val handled = fail.onError {
case _: NumberFormatException => Stream(3)
case _: Exception => Stream(42, 13, 15)
}
handled.runLog.unsafeRun()
val foo = (Stream(0) ++ fail ++ Stream(1,2,3)).attempt.collect {
case Right(event) => event
// case Left(_) => Stream.empty
}.runLog.unsafeRun()
// RESOURCE SAFETY
def withContainer[A](block: String => Stream[Task, A]) =
Stream.bracket(effect("create container"))(block, _ => effect("stop container").map(_ => ()))
withContainer { container =>
Stream(1, 2, 3)
}.runLog.unsafeRun()
withContainer { container =>
Stream(1) ++ fail ++ Stream(3)
}.runLog.unsafeAttemptRun()
// COMPOSITION!
// Source === Stream
// Pipe[F, I, O] = Stream[F, I] => Stream[F, O]
type Enricher[P, R] = Pipe[Task, P, R]
// Sink[F, I] = Pipe[F, I, Unit]
//// IO package
io.file.readAll[Task](Paths.get("/Users/sortega/.zshrc"), chunkSize = 1024)
.throughPure(FileStreams.nonEmptyLines)
.take(2)
.runLog
.unsafeRun
//// Concurrent primitives: see runUntilClosed for an example
//
//// READ MORE AT https://github.com/functional-streams-for-scala/fs2/blob/series/0.9/docs/guide.md
val naturals = scala.Stream.iterate(0)(_ + 1)
naturals.map {
case 99 => throw new Exception("jaja")
case otherwise => otherwise + 1
}.take(200).toVector
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment