Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Last active September 7, 2023 07:09
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gvolpe/0f6868f846aedefaf3c6540d9e8d3559 to your computer and use it in GitHub Desktop.
Save gvolpe/0f6868f846aedefaf3c6540d9e8d3559 to your computer and use it in GitHub Desktop.
Java Future to Cats Effect F[_]
import java.util.concurrent.Future
import cats.effect.{Effect, Sync, Timer}
import cats.syntax.all._
import fs2._
import fs2.async.mutable.Signal
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/**
  * Lifts a `java.util.concurrent.Future` into `F` by polling `isDone` on an fs2 stream.
  *
  * A `Signal[F, Boolean]` records whether the future has completed; the polling
  * stream is interrupted as soon as that signal holds `true`.
  *
  * @tparam F effect type; needs an `Effect` instance, plus an fs2 `Scheduler` and an
  *           `ExecutionContext` for the ticking / interruptible stream
  */
class JFuture[F[_]](implicit F: Effect[F], S: Scheduler, ec: ExecutionContext) {

  // Delay between two consecutive `isDone` checks while the future is still pending.
  private val PollInterval = 40.millis

  /**
    * Evaluates `fa` to obtain the future, waits until it reports completion and
    * then reads its value.
    *
    * @param fa     effect producing the Java future
    * @param signal completion flag; set to the latest `isDone` reading on every tick
    * @return the value returned by `future.get()`; anything `get()` throws is
    *         captured by `F.delay`
    */
  private def jFutureToF[A](fa: F[Future[A]])(signal: Signal[F, Boolean]): F[A] = {

    // Ticks every `PollInterval`, publishing `isDone` to the signal. The ticker never
    // ends on its own, so no `.repeat` is needed; `interruptWhen` stops it once the
    // signal holds `true`.
    def poll(future: Future[A]): F[Unit] =
      S.awakeEvery[F](PollInterval)
        .evalMap(_ => F.delay(future.isDone).flatMap(done => signal.set(done)))
        .interruptWhen(signal)
        .compile
        .drain

    // Check once up front so a future that is already complete does not pay a full
    // `PollInterval` of latency before its first check (mirrors `JFuture.to`).
    def check(future: Future[A]): F[Unit] =
      F.delay(future.isDone).flatMap { done =>
        if (done) F.unit else poll(future)
      }

    for {
      future <- fa
      _      <- check(future)
      // Completion has been observed above, so this read happens after `isDone` was true.
      result <- F.delay(future.get())
    } yield result
  }

  /**
    * Converts an effect producing a Java future into an effect producing its result.
    *
    * @param fa effect producing the Java future
    * @return the future's value once `isDone` has been observed
    */
  def from[A](fa: F[Future[A]]): F[A] =
    for {
      signal <- async.signalOf[F, Boolean](false)
      result <- jFutureToF[A](fa)(signal)
    } yield result
}
object JFuture {

  /**
    * Lifts a `java.util.concurrent.Future` into `F` by repeatedly checking `isDone`,
    * sleeping `pollInterval` between checks via the supplied `Timer`.
    *
    * The first check happens immediately after `fa` yields the future; the sleep is
    * only performed while the future reports that it is not done.
    *
    * @param fa           effect producing the Java future
    * @param pollInterval pause between two `isDone` checks (50 ms by default)
    * @return the value returned by the future's `get`; anything `get` throws is
    *         captured by `F.delay`
    */
  def to[F[_], A](fa: F[Future[A]], pollInterval: FiniteDuration = 50.millis)
                 (implicit F: Sync[F], T: Timer[F]): F[A] = {

    // Re-entered after every sleep; the recursive call sits inside a `flatMap`
    // continuation, so it is only built when the sleep has finished.
    def awaitCompletion(pending: Future[A]): F[A] =
      F.flatMap(F.delay(pending.isDone)) {
        case true  => F.delay(pending.get)
        case false => F.flatMap(T.sleep(pollInterval))(_ => awaitCompletion(pending))
      }

    F.flatMap(fa)(awaitCompletion)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment