Created
November 26, 2018 03:11
-
-
Save kiambogo/e80bf8e5ceed63b637a428557f1784bb to your computer and use it in GitHub Desktop.
fs2 chunkN vs groupWithin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`co.fs2::fs2-core:1.0.0` | |
import fs2.{Stream, Pipe} | |
import fs2.concurrent.Queue | |
import cats.effect.{IO, ContextShift, Timer, Concurrent} | |
import scala.util.Random | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
implicit val ec: ExecutionContext = ExecutionContext.global | |
implicit val timer: Timer[IO] = IO.timer(ec) | |
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec) | |
def build(implicit F: Concurrent[IO]) = { | |
for { | |
queue <- Stream.eval(Queue.unbounded[IO, Int](F)) | |
s <- Stream( | |
Stream.eval(IO(Random.nextInt)).repeat.take(18).evalMap(i => queue.enqueue1(i)).drain, | |
queue.dequeue.chunkN(10).evalMap(i => IO(println(i))).drain).parJoinUnbounded | |
} yield s | |
} | |
// Blocks current thread | |
build.compile.toList.unsafeRunSync | |
/* Output | |
* @ build.compile.toList.unsafeRunSync | |
* Chunk(-391833074, 614565068, 1936167563, -2119471686, -1857543124, -1900916042, 1353128895, 1889653573, 500392992, 2118924531) | |
* ... | |
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`co.fs2::fs2-core:1.0.0` | |
import fs2.{Stream, Pipe} | |
import fs2.concurrent.Queue | |
import cats.effect.{IO, ContextShift, Timer, Concurrent} | |
import scala.util.Random | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
implicit val ec: ExecutionContext = ExecutionContext.global | |
implicit val timer: Timer[IO] = IO.timer(ec) | |
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec) | |
def build(implicit F: Concurrent[IO]) = { | |
for { | |
queue <- Stream.eval(Queue.unbounded[IO, Int](F)) | |
s <- Stream( | |
Stream.eval(IO(Random.nextInt)).repeat.take(18).evalMap(i => queue.enqueue1(i)).drain, | |
queue.dequeue.groupWithin(10, 10.seconds).evalMap(i => IO(println(i))).drain).parJoinUnbounded | |
} yield s | |
} | |
// Blocks current thread | |
build.compile.toList.unsafeRunSync | |
/* Output | |
* @ build.compile.toList.unsafeRunSync | |
* Chunk(-1133522839, 1008897302, 940406286, -2062433713, -1028396178, -1171614910, 1890998151, 274740986, -1472404521, 1045040472) | |
* ...after 10 seconds... | |
* Chunk(-1330500851, -502124516, 1118085056, 1144331273, 1422450486, -1756001900, -308979842, 1989353154) | |
* ...end of output | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment