Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active May 24, 2019 01:34
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save gvolpe/aeae4921689cbf3ad72b44f6159ab3d0 to your computer and use it in GitHub Desktop.
Sharding values using Fs2
import cats.effect._
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2._
import fs2.concurrent.Queue
import scala.util.control.NoStackTrace
/** Demo: distributes the values of an `Int` stream over a fixed number of
  * shards, each backed by its own bounded queue, and prints every value
  * together with the shard it was routed to.
  */
object ShardingDemo extends IOApp {

  /** Suspends printing `a` (via `println`) in `IO`. */
  def putStrLn[A](a: A): IO[Unit] = IO(println(a))

  /** Raised if a value is routed to a shard that has no queue. Kept as a
    * defensive fallback; routing with `floorMod` over a positive shard count
    * always yields an existing shard.
    */
  case object Fail extends NoStackTrace {
    override def toString(): String = "Should not occur"
  }

  /** Routes every element `n` of `source` to shard `floorMod(n, shards)` and
    * prints it from that shard's consumer.
    *
    * One queue and one long-lived consumer are created per shard. The
    * producer feeds the queues in the background and, once `source` is
    * exhausted, enqueues `None` on every queue so each consumer terminates
    * after draining what is left. The consumers are the foreground stream, so
    * the result completes only after every enqueued value has been printed.
    *
    * @param shards number of shards; must be positive
    * @param source values to distribute
    * @return a stream emitting one `Unit` per printed value; fails with
    *         `IllegalArgumentException` when `shards` is not positive, and
    *         propagates any failure of `source`
    */
  def sharded(shards: Int)(source: Stream[IO, Int]): Stream[IO, Unit] =
    if (shards <= 0)
      Stream.raiseError[IO](
        new IllegalArgumentException(s"shards must be positive, got $shards")
      )
    else
      Stream.eval(Queue.bounded[IO, Option[Int]](500).replicateA(shards)).flatMap { queues =>
        // The shard table never changes after creation, so a plain immutable Map suffices.
        val byShard: Map[Int, Queue[IO, Option[Int]]] =
          List.range(0, shards).zip(queues).toMap

        // floorMod keeps the shard index in [0, shards) for negative values as well.
        def shardOf(n: Int): Int = Math.floorMod(n, shards)

        // One consumer per shard; each stops when it dequeues `None`.
        val consumers: Stream[IO, Unit] =
          Stream
            .emits(byShard.toList)
            .covary[IO]
            .map {
              case (shard, q) =>
                q.dequeue.unNoneTerminate
                  .evalMap(x => putStrLn(s"Shard: $shard, Value: $x"))
            }
            .parJoinUnbounded

        // Enqueue each value on its shard, then signal termination to every consumer.
        val producer: Stream[IO, Unit] =
          source.evalMap { n =>
            byShard.get(shardOf(n)).fold(IO.raiseError[Unit](Fail))(_.enqueue1(Some(n)))
          } ++ Stream.eval(queues.traverse_(_.enqueue1(None))).drain

        // Consumers run in the foreground so nothing is interrupted before it is printed;
        // a producer failure still interrupts them and is propagated.
        consumers.concurrently(producer)
      }

  /** Shards the values 1 to 10 over 5 shards and exits successfully. */
  def run(args: List[String]): IO[ExitCode] =
    sharded(5)(Stream.range[IO](1, 11)).compile.drain.as(ExitCode.Success)
}
@gvolpe
Copy link
Author

gvolpe commented May 23, 2019

Note: Instead of List.range(0, shards).zip(queues) one could use queues.zipWithIndex.map(_.swap), but I find the former more readable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment