Skip to content

Instantly share code, notes, and snippets.

@kiambogo
Created November 26, 2018 03:11
Show Gist options
  • Save kiambogo/e80bf8e5ceed63b637a428557f1784bb to your computer and use it in GitHub Desktop.
Save kiambogo/e80bf8e5ceed63b637a428557f1784bb to your computer and use it in GitHub Desktop.
fs2 chunkN vs groupWithin
import $ivy.`co.fs2::fs2-core:1.0.0`
import fs2.{Stream, Pipe}
import fs2.concurrent.Queue
import cats.effect.{IO, ContextShift, Timer, Concurrent}
import scala.util.Random
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val timer: Timer[IO] = IO.timer(ec)
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
def build(implicit F: Concurrent[IO]) = {
for {
queue <- Stream.eval(Queue.unbounded[IO, Int](F))
s <- Stream(
Stream.eval(IO(Random.nextInt)).repeat.take(18).evalMap(i => queue.enqueue1(i)).drain,
queue.dequeue.chunkN(10).evalMap(i => IO(println(i))).drain).parJoinUnbounded
} yield s
}
// Blocks current thread
build.compile.toList.unsafeRunSync
/* Output
* @ build.compile.toList.unsafeRunSync
* Chunk(-391833074, 614565068, 1936167563, -2119471686, -1857543124, -1900916042, 1353128895, 1889653573, 500392992, 2118924531)
* ...
*/
import $ivy.`co.fs2::fs2-core:1.0.0`
import fs2.{Stream, Pipe}
import fs2.concurrent.Queue
import cats.effect.{IO, ContextShift, Timer, Concurrent}
import scala.util.Random
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val timer: Timer[IO] = IO.timer(ec)
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
def build(implicit F: Concurrent[IO]) = {
for {
queue <- Stream.eval(Queue.unbounded[IO, Int](F))
s <- Stream(
Stream.eval(IO(Random.nextInt)).repeat.take(18).evalMap(i => queue.enqueue1(i)).drain,
queue.dequeue.groupWithin(10, 10.seconds).evalMap(i => IO(println(i))).drain).parJoinUnbounded
} yield s
}
// Blocks current thread
build.compile.toList.unsafeRunSync
/* Output
* @ build.compile.toList.unsafeRunSync
* Chunk(-1133522839, 1008897302, 940406286, -2062433713, -1028396178, -1171614910, 1890998151, 274740986, -1472404521, 1045040472)
* ...after 10 seconds...
* Chunk(-1330500851, -502124516, 1118085056, 1144331273, 1422450486, -1756001900, -308979842, 1989353154)
* ...end of output
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment