Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Last active August 26, 2021 13:32
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Daenyth/47a19a8f632e00a136f8647f3d9b5994 to your computer and use it in GitHub Desktop.
Save Daenyth/47a19a8f632e00a136f8647f3d9b5994 to your computer and use it in GitHub Desktop.
Convert Java futures to cats-effect IO / cats-effect 1.0
package teikametrics
import cats.effect.implicits._
import cats.effect.{ContextShift, ExitCase, Sync, Timer}
import cats.syntax.all._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
object JavaFuture {
// Alias for more friendly imports
type JFuture[A] = java.util.concurrent.Future[A]
/** Convert a java future to an effectful value, using async sleep polling to get the result.
*
* Because this uses async sleep, it won't block a thread.
*
* Java future only offers us blocking and polling as the interface
*
* @param mayInterruptThreadOnCancel This is passed as an argument to [[JFuture#cancel]]
* in the case the returned `F[A]` is cancelled.
*/
def toIO[F[_], A](
fa: F[JFuture[A]],
pollInterval: FiniteDuration,
mayInterruptThreadOnCancel: Boolean = true
)(
implicit F: Sync[F],
timer: Timer[F]
): F[A] = {
def loop(jf: JFuture[A]): F[A] =
F.delay(jf.isDone).flatMap { isDone =>
// Not ContextShift.evalOn(blockingPool) here because isDone==true, so this should be very fast to return.
if (isDone) F.delay(jf.get)
else timer.sleep(pollInterval) *> loop(jf)
}
fa.flatMap { jf =>
loop(jf)
.guaranteeCase {
case ExitCase.Canceled =>
F.delay(jf.cancel(mayInterruptThreadOnCancel)).void
case _ => F.unit
}
}
}
/** Convert a java future to an effectful value, blocking a thread on the specified ExecutionContext
* to get the result.
* Java future only offers us blocking and polling as the interface */
def toIO[F[_], A](
fa: F[JFuture[A]],
blockingExecutionContext: ExecutionContext
)(
implicit F: Sync[F],
CS: ContextShift[F]
): F[A] =
fa.flatMap { jf =>
CS.evalOn(blockingExecutionContext)(F.delay(jf.get))
}
}
package teikametrics
import java.util.concurrent.FutureTask
import cats.effect.IO
import org.scalatest.{FlatSpec, Matchers}
import teikametrics.JavaFuture._
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.global
import scala.concurrent.duration._
class JavaFutureTest extends FlatSpec with Matchers with ContextShiftTest {
implicit def ec: ExecutionContext = global
def jfio[A](jf: => JFuture[A]): IO[A] = toIO[IO, A](IO(jf), 10.milliseconds)
behavior of "JavaFuture.toIO"
it should "return the value" in {
val io = jfio {
val t = new FutureTask[Int](() => 1)
t.run()
t
}
io.unsafeRunSync() shouldEqual 1
}
it should "not stack overflow" in {
val io = jfio { new FutureTask[Int](() => 1) /* not running */ }
io.unsafeRunTimed(1.second) shouldBe None
}
it should "propagate errors" in {
val io = jfio {
val t = new FutureTask[Int](() => throw new Exception("foo"))
t.run()
t
}
io.attempt.unsafeRunSync() shouldBe 'left
}
}
@realvictorprm
Copy link

Can you add a license to this 😄 ?

@Daenyth
Copy link
Author

Daenyth commented Aug 22, 2020

I hereby license this post under MIT license. Go nuts :)

@realvictorprm
Copy link

Many thanks! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment