Skip to content

Instantly share code, notes, and snippets.

@sortega
Created November 4, 2016 16:37
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sortega/2fcfbc9e7b365ae40707fd95b71ff079 to your computer and use it in GitHub Desktop.
Save sortega/2fcfbc9e7b365ae40707fd95b71ff079 to your computer and use it in GitHub Desktop.
FS2 part 2
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.Random
import fs2._
/** Shared helpers for the FS2 demo: implicit runtime plumbing, a randomly
  * delayed task and a couple of sinks.
  */
package object demo {

  // Implicit definitions carry explicit types so implicit resolution never
  // depends on inferring the type of the right-hand side.
  implicit val strategy: Strategy = Strategy.fromExecutionContext(ExecutionContext.global)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)

  /** Builds a task that waits a random amount of time and then produces a random `Int`.
    *
    * Every run of the task samples a fresh delay in `[0, maxDuration)` (millisecond
    * resolution), waits for it, prints `<current millis>> <name>` and yields `Random.nextInt()`.
    *
    * @param name        label printed when the delayed computation fires
    * @param maxDuration exclusive upper bound for the random delay
    * @return a task that yields a random integer after the random delay
    */
  def task(name: String, maxDuration: FiniteDuration): Task[Int] = {
    val randomDelay: Task[FiniteDuration] = Task.delay {
      // Random.nextInt(bound) rejects bound <= 0, and toMillis.toInt can overflow
      // for very long durations; clamp the bound into [1, Int.MaxValue] so that
      // sub-millisecond or huge durations no longer make the task fail.
      val boundMillis = maxDuration.toMillis.max(1L).min(Int.MaxValue.toLong).toInt
      Random.nextInt(boundMillis).millis
    }
    randomDelay.flatMap { duration =>
      Task.schedule({
        println(s"${System.currentTimeMillis()}> $name")
        Random.nextInt()
      }, duration)
    }
  }

  /** Wraps printing `message` to standard output in a task; nothing is printed until the task runs. */
  def log(message: Any): Task[Unit] = Task.delay(println(message))

  /** Sink that prints every incoming integer via [[log]]. */
  val loggingSink: Sink[Task, Int] = _.evalMap(log)

  /** Sink that, for every incoming integer, schedules printing `Computing on <value>` after `delay`.
    *
    * @param delay how long each element's print is postponed
    */
  def computingSink(delay: FiniteDuration): Sink[Task, Int] =
    _.evalMap { value =>
      Task.schedule(println(s"Computing on $value"), delay)
    }
}
import demo._
import scala.concurrent.duration._
import fs2._
import com.jobandtalent.commons.streams.StreamCombinators
/////////////////////////////
// Making new combinators
/** Emits at most the first `n` elements of `stream`, pulling them one at a time.
  *
  * A non-positive `n` yields an empty stream; a source shorter than `n`
  * is passed through in full.
  */
def takeN[F[_], A](stream: Stream[F, A], n: Int): Stream[F, A] = {
  // Recursive pull: `remaining` counts how many elements may still be emitted.
  def loop(remaining: Int)(handle: Handle[F, A]): Pull[F, A, Unit] =
    remaining match {
      case left if left <= 0 => Pull.done
      case left =>
        handle.receive1Option {
          case Some((head, rest)) => Pull.output1(head) >> loop(left - 1)(rest)
          case None               => Pull.done // source ran out before the quota was reached
        }
    }

  stream.pull(loop(n))
}
/////////////////////////////
// Concurrency
// A task: takes time, every time a different result
// `a` waits a random time below one second, prints a timestamped "A" line
// and yields a random Int (see `demo.task`).
val a = task("A", 1.second)
// A source of computations: evaluates `a` over and over; `repeat` makes the stream unbounded.
val as = Stream.eval(a).repeat
// Run the stream only for its side effects, waiting at most 3 seconds.
// NOTE(review): `as` never finishes on its own, so this call presumably ends by
// hitting the time limit; confirm how `unsafeRunFor` reports that.
as.run.unsafeRunFor(3.seconds)
// How to run in parallel? Merge a second, slower source ("B", up to 2 seconds per element)
// with the first one and watch the printed lines of both interleave.
val bs = Stream.eval(task("B", 2.seconds)).repeat
as.merge(bs).run.unsafeRunFor(5.seconds)
// How to run many, many of them in parallel? Put the streams inside an outer stream
// and join it; the first argument of `concurrent.join` is the concurrency limit.
val cs = Stream.eval(task("C", 1.5.seconds)).repeat
// Limit of 3 with three inner streams; the inner streams are unbounded, hence the time limit.
concurrent.join(3)(Stream(as, bs, cs)).run.unsafeRunFor(5.seconds)
// Limit of 2 with three inner streams; each inner stream is cut to 10 elements,
// so the joined stream is finite and a plain `unsafeRun` is used.
concurrent.join(2)(Stream(as, bs, cs).map(_.take(10))).run.unsafeRun
// "Tee" for streams: `observe` feeds the elements to a sink while the stream
// still emits them; `runLog` collects the five emitted values.
as.take(5).observe(loggingSink).runLog.unsafeRun
// Same, but with a sink that takes one second per element.
// NOTE(review): presumably the slow sink holds back the observed stream here; confirm.
as.take(5).observe(computingSink(1.second)).runLog.unsafeRun
// Asynchronous variant. NOTE(review): the second argument (100) is presumably the
// maximum number of elements queued for the sink; confirm against the fs2 docs.
as.take(5).observeAsync(computingSink(1.second), 100).runLog.unsafeRun
// Topics
// Publish/subscribe demo: creates a topic of strings whose initial value is "hello",
// then merges one publisher with two subscribers into a single stream.
// The stream is only described here; nothing runs until it is executed.
val pubsub = Stream.eval(async.topic[Task, String]("hello")).flatMap { topic =>
  // Publishes three strings into the topic; its own output is discarded.
  val publisher = Stream("a", "b", "b").to(topic.publish).drain
  // Prints every message it receives; its own output is discarded.
  val loggingSubscriber = topic.subscribe(100).evalMap(log).drain
  // Emits the length of every message it receives; this is the only visible output.
  val lengthSubscriber = topic.subscribe(100).map(_.length)
  publisher.merge(loggingSubscriber).merge(lengthSubscriber)
}
// Signals
// Eta-expands `runUntilClosed` into a function value, presumably to inspect its
// signature in a worksheet/REPL.
// NOTE(review): `StreamCombinators` is imported from com.jobandtalent.commons.streams
// and is not defined in this file, so what the combinator does cannot be told from here.
StreamCombinators.runUntilClosed _
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment