Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created January 1, 2023 17:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/a279eeb098e00413c16a21c18c5f2f1f to your computer and use it in GitHub Desktop.
Save djspiewak/a279eeb098e00413c16a21c18c5f2f1f to your computer and use it in GitHub Desktop.
/**
 * Runs every effect in `fas` on its own fiber and produces the first value
 * that any of them completes successfully with.
 *
 * Once a result is available (or every fiber has finished), all fibers are
 * canceled before this effect completes.
 *
 * @param fas the effects to race
 * @return `Right(a)` for a successful value if at least one effect succeeded;
 *         otherwise `Left(errors)` holding the errors that were raised. The
 *         chain is empty when `fas` is empty or when every effect was
 *         canceled rather than failing.
 */
def raceSuccessAll[F[_], G[_], E, A](
    fas: G[F[A]])(
    implicit F: GenConcurrent[F, E],
    G: Traverse[G])
    : F[Either[Chain[E], A]] = {
  val permits = fas.size.toInt

  if (permits < 1) {
    // CountDownLatch cannot be built with a non-positive count; with nothing
    // to race there is neither a success nor any error to report.
    Chain.empty[E].asLeft[A].pure[F]
  } else {
    F uncancelable { poll =>
      for {
        complete <- CountDownLatch[F](permits)
        success <- F.deferred[A]
        errors <- F.ref[Chain[E]](Chain.nil)

        fibers <- fas traverse { fa =>
          val staged = fa guaranteeCase { oc =>
            // `match` binds looser than infix operators, so the outcome
            // handling is named separately instead of being chained inline.
            val record: F[Unit] = oc match {
              case Outcome.Succeeded(ifa) =>
                ifa.flatMap(success.complete(_)).void
              case Outcome.Errored(e) =>
                errors.update(_ :+ e)
              case Outcome.Canceled() =>
                F.unit
            }

            // Record the outcome *before* counting down: the waiter may wake
            // as soon as the latch hits zero and must observe every result.
            record *> complete.release
          }

          staged.start
        }

        // wait for either the first success, or all fibers to complete
        // regardless of which case we end up in, cancel everything when we're done
        _ <- poll(F.race(complete.await, success.get).guarantee(fibers.parTraverse_(_.cancel)))

        maybeA <- success.tryGet

        // Bound with `<-` (not yielded) so the result is F[Either[...]]
        // rather than a nested F[F[Either[...]]].
        result <- maybeA match {
          case Some(a) => a.asRight[Chain[E]].pure[F]
          case None => errors.get.map(_.asLeft[A])
        }
      } yield result
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment