Skip to content

Instantly share code, notes, and snippets.

@Baccata
Forked from Daenyth/JavaFuture.scala
Created July 5, 2018 15:24
Show Gist options
  • Save Baccata/b63f6176ca4f90670d69115d703f67a5 to your computer and use it in GitHub Desktop.
Save Baccata/b63f6176ca4f90670d69115d703f67a5 to your computer and use it in GitHub Desktop.
Convert Java futures to cats-effect IO
package teikametrics
import cats.effect.{Sync, Timer}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.blocking
import cats.syntax.all._
object JavaFuture {

  /** Short alias so call sites can write `JFuture` instead of the fully qualified Java type. */
  type JFuture[A] = java.util.concurrent.Future[A]

  /** Lifts a Java future into `F` by polling it, pausing `pollInterval` between checks.
    *
    * The pause is performed with `timer.sleep` (an asynchronous sleep), so waiting for the
    * future does not hold a thread. `get` is only invoked after `isDone` has reported `true`.
    *
    * `java.util.concurrent.Future` exposes nothing but blocking (`get`) and polling (`isDone`),
    * which is why completion is detected by polling here.
    *
    * @param fa           effect that produces the Java future to wait on
    * @param pollInterval delay between two consecutive `isDone` checks
    * @tparam F effect type; needs `Sync` for suspension and a `Timer` for sleeping
    * @tparam A type of the value the future yields
    * @return an effect that finishes with whatever `get` returns (or throws) once the future is done
    */
  def toIO[F[_], A](fa: F[JFuture[A]], pollInterval: FiniteDuration)(
      implicit F: Sync[F],
      timer: Timer[F]
  ): F[A] = {
    // Written with explicit flatMap (no trailing map) so each poll is a plain monadic tail call.
    def awaitDone(future: JFuture[A]): F[A] =
      F.flatMap(F.delay(future.isDone)) {
        // `blocking(...)` is deliberately not used: it could cause a thread hop, and since the
        // future is already done, `get` should return very quickly.
        case true  => F.delay(future.get)
        case false => F.flatMap(timer.sleep(pollInterval))(_ => awaitDone(future))
      }

    F.flatMap(fa)(awaitDone)
  }

  /** Lifts a Java future into `F` by blocking on `get` after shifting via `timer.shift`.
    *
    * This blocks the thread the ExecutionContext runs the continuation on, but the call is
    * wrapped in `scala.concurrent.blocking` so the ExecutionContext is informed (BlockContext).
    *
    * `java.util.concurrent.Future` exposes nothing but blocking (`get`) and polling (`isDone`);
    * this overload takes the blocking route.
    *
    * @param fa effect that produces the Java future to wait on
    * @tparam F effect type; needs `Sync` for suspension and a `Timer` for the shift
    * @tparam A type of the value the future yields
    * @return an effect that finishes with whatever `get` returns (or throws)
    */
  def toIO[F[_], A](fa: F[JFuture[A]])(
      implicit F: Sync[F],
      timer: Timer[F]
  ): F[A] =
    for {
      future <- fa
      _      <- timer.shift
      result <- F.delay(blocking(future.get))
    } yield result
}
package teikametrics
import java.util.concurrent.FutureTask
import cats.effect.IO
import org.scalatest.{FlatSpec, Matchers}
import teikametrics.JavaFuture._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
/** Behavioural checks for the polling variant of `JavaFuture.toIO`, specialised to `IO`. */
class JavaFutureTest extends FlatSpec with Matchers {

  // Implicit ExecutionContext available to the `toIO` call below.
  implicit def ec: ExecutionContext = global

  /** Wraps a lazily built Java future in `IO` and converts it with a 10 ms poll interval. */
  def jfio[A](jf: => JFuture[A]): IO[A] =
    toIO[IO, A](IO(jf), 10.milliseconds)

  /** Builds a `FutureTask` around `body` and runs it on the calling thread, so the returned
    * future is already finished (with a value, or with the exception `body` threw).
    */
  private def alreadyRun[A](body: => A): JFuture[A] = {
    val task = new FutureTask[A](() => body)
    task.run()
    task
  }

  behavior of "JavaFuture.toIO"

  it should "return the value" in {
    val converted = jfio(alreadyRun[Int](1))
    converted.unsafeRunSync() shouldEqual 1
  }

  it should "not stack overflow" in {
    // The task is never run, so the future never completes and the poll loop spins until timeout.
    val neverDone = jfio(new FutureTask[Int](() => 1))
    neverDone.unsafeRunTimed(1.second) shouldBe None
  }

  it should "propagate errors" in {
    val failing = jfio(alreadyRun[Int](throw new Exception("foo")))
    failing.attempt.unsafeRunSync() shouldBe 'left
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment