@derekjw
Created April 23, 2018 10:08
ordered mapAsync for fs2
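import cats.implicits._
import cats.effect.Effect
import fs2.{Pipe, Stream}
import fs2.async.Promise
import fs2.async.mutable.Queue
import scala.concurrent.ExecutionContext

// Applies f to up to `parallelism` elements concurrently while emitting results in input order.
def mapAsync[F[_]: Effect, A, B](parallelism: Int)(f: A => F[B])(implicit executionContext: ExecutionContext): Pipe[F, A, B] = { stream =>
  // The queue holds one pending result per in-flight element, in input order; None marks end of input.
  Stream.eval(Queue.bounded[F, Option[F[Either[Throwable, B]]]](parallelism)).flatMap { q =>
    q.dequeueAvailable
      .unNoneTerminate
      .evalMap(identity) // wait for the next result in input order
      .rethrow           // re-raise a failure captured by attempt
      .concurrently {
        stream.evalMap { a =>
          Promise.empty[F, Either[Throwable, B]].flatMap { promise =>
            // Running sb evaluates f(a) and completes the promise with its outcome.
            val sb = Stream.eval(f(a).attempt).evalMap(promise.complete)
            // Enqueue the promise's read side before the work runs, which fixes the output position.
            q.enqueue1(Some(promise.get)).as(sb)
          }
        }.join(parallelism).drain ++ Stream.eval(q.enqueue1(None))
      }
  }
}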
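
The ordering in the pipe above appears to come from one idea: a placeholder for each result is queued in input order before the work finishes, and the consumer reads the placeholders back in that same order. The sketch below isolates that idea using only the Scala standard library. It is not part of the gist and does not use fs2; `OrderedSketch` and `orderedResults` are hypothetical names, and unlike the pipe it starts all tasks at once rather than bounding parallelism.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object OrderedSketch {
  // Hypothetical helper for illustration only; not an fs2 API.
  def orderedResults[A, B](inputs: List[A])(f: A => Future[B]): List[B] = {
    // One promise per input, created in input order (this plays the role of the queue).
    val slots: List[(A, Promise[B])] = inputs.map(a => (a, Promise[B]()))
    // Start the work; each task completes its own slot whenever it finishes.
    slots.foreach { case (a, p) => p.completeWith(f(a)) }
    // Read the slots back in input order, waiting on each in turn.
    slots.map { case (_, p) => Await.result(p.future, 5.seconds) }
  }

  def main(args: Array[String]): Unit = {
    // Earlier elements sleep longer, so they tend to finish later than the ones after them.
    val result = orderedResults(List(3, 2, 1)) { n =>
      Future { Thread.sleep(n * 50L); n * 10 }
    }
    println(result) // List(30, 20, 10)
  }
}
```

Whatever order the tasks complete in, the result list follows the input order, because each result is read from the slot that was reserved for it up front.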