Skip to content

Instantly share code, notes, and snippets.

@nartamonov
Created July 13, 2016 12:57
Show Gist options
  • Save nartamonov/bce0171a6eab0de32e610c48a7c74f75 to your computer and use it in GitHub Desktop.
Save nartamonov/bce0171a6eab0de32e610c48a7c74f75 to your computer and use it in GitHub Desktop.
Простая реализация монады Future и операций из стандартной библиотеки Scala
import java.util.concurrent.{Executor, Executors}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
/** Factory for futures that evaluate an expression on an `Executor`. */
object Future {

  /**
   * Wraps `expression` in a new `FutureImpl`.
   *
   * @param expression by-name computation handed to the `FutureImpl` constructor
   * @param executor   executor handed to the `FutureImpl` constructor
   * @tparam T         result type of the computation
   * @return the freshly constructed `FutureImpl`
   */
  def apply[T](expression: => T)(implicit executor: Executor): FutureImpl[T] =
    new FutureImpl[T](expression)
}
/**
 * A minimal asynchronous result of type `T`.
 *
 * `onComplete` is the only primitive. Every combinator returns a new `Future`
 * whose `onComplete` delegates to this one; nothing is cached, so the function
 * given to a combinator is applied again for each callback registered on the
 * derived future.
 *
 * @tparam T type of the successful result
 */
trait Future[T] {
  self =>

  /**
   * Registers `callback` to be handed the outcome of this future.
   *
   * @param callback receives `Success(value)` or `Failure(error)`
   * @param executor executor made available to the implementation
   */
  def onComplete(callback: Try[T] => Unit)(implicit executor: Executor): Unit

  /**
   * Transforms a successful result with `f`.
   *
   * The transformation goes through `Try.map`, so a non-fatal exception thrown
   * by `f` is delivered to the callback as a `Failure`.
   */
  def map[R](f: T => R): Future[R] = new Future[R] {
    def onComplete(callback: Try[R] => Unit)(implicit executor: Executor): Unit =
      self.onComplete(result => callback(result.map(f)))
  }

  /**
   * Chains a dependent asynchronous computation onto a successful result.
   *
   * A failure of this future is passed through unchanged. A non-fatal exception
   * thrown by `f` itself is delivered as a `Failure`, so the derived future
   * always completes instead of silently hanging.
   */
  def flatMap[R](f: T => Future[R]): Future[R] = new Future[R] {
    def onComplete(callback: Try[R] => Unit)(implicit executor: Executor): Unit =
      self.onComplete {
        case Success(v) =>
          // Building the next future may throw; capture that as a failed outcome.
          Try(f(v)) match {
            case Success(next)  => next.onComplete(callback)
            case Failure(error) => callback(Failure(error))
          }
        case Failure(error) => callback(Failure(error))
      }
  }

  /**
   * Falls back to `f` when this future fails.
   *
   * `f` is evaluated only after this future has failed. If the fallback also
   * fails, or the expression producing it throws a non-fatal exception, the
   * failure of this (first) future is reported.
   */
  def fallbackTo(f: => Future[T]): Future[T] = new Future[T] {
    def onComplete(callback: Try[T] => Unit)(implicit executor: Executor): Unit =
      self.onComplete {
        case ok @ Success(_) => callback(ok)
        case firstFailure @ Failure(_) =>
          // Evaluating the by-name fallback may itself throw.
          Try(f) match {
            case Success(alternative) =>
              alternative.onComplete {
                case ok @ Success(_) => callback(ok)
                case Failure(_)      => callback(firstFailure)
              }
            case Failure(_) => callback(firstFailure)
          }
      }
  }
}
/**
 * A `Future` that evaluates `expression` on `executor` as soon as it is constructed.
 *
 * The outcome and the not-yet-notified listeners live together in one immutable
 * `State` held by an `AtomicReference`; every change goes through a
 * compare-and-set loop, so no locks are used.
 *
 * @param expression by-name computation, evaluated once inside a task submitted to `executor`
 * @param executor   executor that runs the computation
 * @tparam T         result type of the computation
 */
final class FutureImpl[T](expression: => T)(implicit executor: Executor) extends Future[T] {

  /** Outcome (if already known) plus the listeners waiting for it. */
  private case class State(value: Option[Try[T]], completionListeners: Vector[Try[T] => Unit])

  // The reference itself is never reassigned; only its content is swapped atomically.
  private val state = new AtomicReference[State](State(None, Vector.empty))

  runAsync()

  /** @return `true` once the outcome of the computation has been stored */
  def isCompleted: Boolean = state.get().value.isDefined

  /**
   * Registers `callback` for the outcome of this future.
   *
   * The callback always runs as a task on the `executor` given to this call.
   * A non-fatal exception thrown by the callback is printed and swallowed.
   *
   * @param callback receives `Success(value)` or `Failure(error)`
   * @param executor executor on which the callback is run
   */
  def onComplete(callback: Try[T] => Unit)(implicit executor: Executor): Unit = {
    val guardedCallback = (x: Try[T]) =>
      execute(try { callback(x) } catch { case NonFatal(e) => e.printStackTrace() })

    // Keep the transformation pure: the CAS loop may apply it more than once.
    val previous = getAndTransform {
      case completed @ State(Some(_), _) => completed
      case pending =>
        pending.copy(completionListeners = pending.completionListeners :+ guardedCallback)
    }
    // Already completed when we looked: nothing was enqueued, so notify right away.
    previous.value.foreach(guardedCallback)
  }

  /** Submits the computation to the executor and notifies the listeners registered so far. */
  private def runAsync(): Unit = {
    execute {
      val outcome = Try(expression)
      // Publish the outcome and drop the listeners in one atomic step, so that
      // callbacks are not kept reachable after they have been notified.
      val previous = getAndTransform(_ => State(Some(outcome), Vector.empty))
      previous.completionListeners.foreach(listener => listener(outcome))
    }
  }

  /**
   * Atomically replaces the state with `f(state)`, retrying on contention.
   *
   * See https://alexn.org/blog/2013/05/07/towards-better-atomicreference-scala.html
   *
   * @param f transformation of the state; may be applied several times
   * @return the state that was replaced
   */
  @tailrec
  private def getAndTransform(f: State => State): State = {
    val oldValue = state.get()
    val newValue = f(oldValue)
    if (!state.compareAndSet(oldValue, newValue))
      getAndTransform(f)
    else
      oldValue
  }

  /** Wraps `expression` in a `Runnable` and submits it to `executor`. */
  protected def execute[R](expression: => R)(implicit executor: Executor): Unit = {
    executor.execute(new Runnable {
      override def run(): Unit = expression
    })
  }
}
/**
 * Builds a future that makes up to `times` attempts at `asyncComp`.
 *
 * The first attempt is created immediately; each further attempt is attached
 * with `fallbackTo`, which takes its argument by name.
 *
 * @param times     maximum number of attempts, at least 1
 * @param asyncComp by-name expression producing one attempt
 * @return the chain of attempts combined with `fallbackTo`
 * @throws IllegalArgumentException if `times` is less than 1
 */
def retry[T](times: Int)(asyncComp: => Future[T]): Future[T] = {
  require(times >= 1)
  val firstAttempt = asyncComp
  (1 until times).foldLeft(firstAttempt) { (attempts, _) =>
    attempts fallbackTo asyncComp
  }
}
// TESTING
// Demo: chain two futures on a single worker thread and print the outcome.
implicit val executor: java.util.concurrent.ExecutorService = Executors.newSingleThreadExecutor()
val c = for {
  a <- Future(10)
  b <- Future(a + 20)
} yield b
// Signalled by the callback so the script waits for the actual result instead of sleeping.
val finished = new java.util.concurrent.CountDownLatch(1)
c.onComplete { result =>
  println(result) // ==> Success(30)
  finished.countDown()
}
finished.await(5, java.util.concurrent.TimeUnit.SECONDS)
// Shut down only after the chain has finished: its callbacks submit further tasks,
// which an already shut-down executor would reject. Without the shutdown the
// executor's non-daemon worker thread keeps the JVM alive.
executor.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment