Skip to content

Instantly share code, notes, and snippets.

@anthonynsimon
Created February 9, 2018 16:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anthonynsimon/d35cd7f5d145c5b85963666d4ee5dc4c to your computer and use it in GitHub Desktop.
Save anthonynsimon/d35cd7f5d145c5b85963666d4ee5dc4c to your computer and use it in GitHub Desktop.
Concurrency
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
/**
 * Read-only view of a computation whose result may not be available yet.
 *
 * The result, once present, is a `Try[A]`: `Success` carrying the value or
 * `Failure` carrying the error. The notes below about concrete behaviour
 * describe the `Promise` implementation found in this file; the trait itself
 * only fixes the signatures.
 *
 * @tparam A type of the value produced on success
 */
trait Eventual[A] {

  /** Whether a result is already available. */
  def isCompleted: Boolean

  /**
   * Non-blocking peek at the result.
   *
   * `Promise` returns a `Failure` when asked before completion instead of
   * waiting for the result.
   */
  def get: Try[A]

  /**
   * Registers `f` to be called with the result.
   *
   * @param f callback receiving the outcome of the computation
   */
  def onComplete(f: Try[A] => Unit): Unit

  /**
   * Derives an eventual holding the successful value transformed by `f`.
   *
   * @param f   transformation applied to a successful value
   * @tparam B  type of the transformed value
   */
  def map[B](f: A => B): Eventual[B]

  /**
   * Derives an eventual by chaining a follow-up computation built from the
   * successful value.
   *
   * @param f   builds the follow-up eventual from a successful value
   * @tparam B  type produced by the follow-up eventual
   */
  def flatMap[B](f: A => Eventual[B]): Eventual[B]
}
/**
 * Writable side of an [[Eventual]]: a single-assignment cell that is completed
 * via [[complete]] and notifies the callbacks registered via [[onComplete]].
 *
 * The whole state lives in one `AtomicReference` holding either the callbacks
 * still waiting (`Left`) or the final result (`Right`). Every transition is a
 * compare-and-set, so registering a callback and completing the promise may
 * race from different threads without a callback being lost or run twice.
 *
 * @tparam A type of the value produced on success
 */
class Promise[A] extends Eventual[A] {

  private type Callback = Try[A] => Unit

  // Left(callbacks): not completed yet; callbacks are stored newest-first.
  // Right(result):   completed; this state is never left again.
  private val state: AtomicReference[Either[List[Callback], Try[A]]] =
    new AtomicReference[Either[List[Callback], Try[A]]](Left(Nil))

  /**
   * Non-blocking peek at the result.
   *
   * @return the result if the promise is completed, otherwise a `Failure`
   *         signalling that nothing is available yet
   */
  def get: Try[A] = state.get() match {
    case Right(result) => result
    case Left(_)       => Failure(new Exception("Not completed, but tried to pull"))
  }

  /**
   * Registers `f` to be called exactly once with the result.
   *
   * If the promise is already completed, `f` runs immediately on the calling
   * thread; otherwise it runs on the thread that calls [[complete]].
   *
   * @param f callback receiving the result
   */
  def onComplete(f: Try[A] => Unit): Unit = register(f)

  // CAS loop: either observe the final result and invoke `f` right away, or
  // atomically add `f` to the pending list (retrying if another thread won).
  @scala.annotation.tailrec
  private def register(f: Callback): Unit = state.get() match {
    case Right(result) => f(result)
    case current @ Left(callbacks) =>
      if (state.compareAndSet(current, Left(f :: callbacks))) ()
      else register(f)
  }

  /** Whether a result has been set. */
  def isCompleted: Boolean = state.get().isRight

  /**
   * Completes the promise with `a` and invokes the pending callbacks in
   * registration order on the calling thread.
   *
   * Only the first completion takes effect; later calls are ignored so the
   * result never changes and callbacks never fire a second time.
   *
   * @param a the result to publish
   */
  def complete(a: Try[A]): Unit = {
    @scala.annotation.tailrec
    def loop(): Unit = state.get() match {
      case Right(_) => () // already completed: first completion wins
      case current @ Left(callbacks) =>
        // The list is newest-first, so reverse it to honour registration order.
        if (state.compareAndSet(current, Right(a))) callbacks.reverse.foreach(cb => cb(a))
        else loop()
    }
    loop()
  }

  /**
   * Eventual holding `f` applied to the successful value.
   *
   * A failure of this promise, or a non-fatal exception thrown by `f`, ends
   * up as the `Failure` of the returned eventual.
   *
   * @param f transformation applied to a successful value
   */
  def map[B](f: A => B): Eventual[B] = {
    val p = new Promise[B]
    // Try#map already captures non-fatal exceptions thrown by `f`.
    onComplete(result => p.complete(result.map(f)))
    p
  }

  /**
   * Eventual completed with the outcome of the eventual that `f` builds from
   * the successful value.
   *
   * A failure of this promise, or a non-fatal exception thrown by `f`,
   * short-circuits: the returned eventual fails with that error.
   *
   * @param f builds the follow-up eventual from a successful value
   */
  def flatMap[B](f: A => Eventual[B]): Eventual[B] = {
    val p = new Promise[B]
    onComplete {
      case Success(x) =>
        Try(f(x)) match {
          case Success(next) => next.onComplete(result => p.complete(result))
          case Failure(err)  => p.complete(Failure(err))
        }
      case Failure(err) => p.complete(Failure(err))
    }
    p
  }

  /** This promise seen through its read-only [[Eventual]] interface. */
  def eventual: Eventual[A] = this
}
/** Constructors and blocking helpers for [[Eventual]]. */
object Eventual {
  import java.util.concurrent.locks.LockSupport
  import scala.annotation.tailrec

  // Pause between completion checks while a thread waits (100 microseconds).
  // Parking instead of spinning keeps a waiting thread from pinning a core.
  private val PollIntervalNanos: Long = 100000L

  /** An eventual that is already completed with `result`. */
  private def completedWith[A](result: Try[A]): Eventual[A] = {
    val p = new Promise[A]
    p.complete(result)
    p
  }

  /** An already-completed eventual holding the value `a`. */
  def successful[A](a: A): Eventual[A] = completedWith(Success(a))

  /** An already-completed eventual holding the error `err`. */
  def failed[A](err: Throwable): Eventual[A] = completedWith(Failure(err))

  /**
   * Runs `f` on `executionContext` and exposes its outcome as an eventual.
   *
   * A non-fatal exception thrown by `f` becomes the `Failure` of the result.
   *
   * @param f                the computation to run
   * @param executionContext where the computation is executed
   */
  def unit[A](f: () => A)(implicit executionContext: ExecutionContext): Eventual[A] = {
    val p = new Promise[A]
    executionContext.execute(() => p.complete(Try(f())))
    p
  }

  /** An already-completed eventual holding the given `Try`. */
  def fromTry[A](f: Try[A]): Eventual[A] = completedWith(f)

  /**
   * Blocks the calling thread until `eventual` is completed.
   *
   * The wait polls `isCompleted`, parking briefly between checks.
   */
  def ready(eventual: Eventual[_]): Unit =
    while (!eventual.isCompleted) LockSupport.parkNanos(PollIntervalNanos)

  /**
   * Blocks the calling thread until every element of `eventuals` is
   * completed. Returns immediately for an empty sequence.
   */
  def ready(eventuals: Seq[Eventual[_]]): Unit =
    while (!eventuals.forall(_.isCompleted)) LockSupport.parkNanos(PollIntervalNanos)

  /**
   * Blocks the calling thread until at least one of `eventuals` is completed
   * and returns that (already completed) eventual. When several are completed
   * at the time of a check, the earliest one in sequence order is returned.
   *
   * Works with any [[Eventual]] implementation. An empty sequence can never
   * produce a result, so it yields a failed eventual instead of waiting
   * forever.
   *
   * @param eventuals the candidates to wait on
   */
  def first[A](eventuals: Seq[Eventual[A]]): Eventual[A] = {
    @tailrec
    def awaitWinner(): Eventual[A] = eventuals.find(_.isCompleted) match {
      case Some(winner) => winner
      case None =>
        LockSupport.parkNanos(PollIntervalNanos)
        awaitWinner()
    }

    if (eventuals.isEmpty)
      failed(new NoSuchElementException("Eventual.first requires at least one eventual"))
    else awaitWinner()
  }
}
/**
 * Demo entry point: builds one pipeline that fails fast and one that succeeds
 * after a delay, prints each outcome as it arrives, then waits for both.
 */
object Main {

  // An explicit main method is used instead of the `App` trait to avoid its
  // initialisation-order pitfalls.
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global

    val start = System.currentTimeMillis()

    // Fails inside flatMap; the trailing map is never applied to a value.
    val p1 =
      Eventual
        .unit(() => 17)
        .map(x => x)
        .flatMap { x =>
          // `if (true)` keeps the final expression reachable for the compiler,
          // so the lambda still type-checks as Int => Eventual[Int].
          if (true) throw new Exception("short circuit exception")
          Eventual.successful(x)
        }
        .map(x => x + 100000)

    // Succeeds after roughly three seconds: (4 + 10) + 100.
    val p2 =
      Eventual
        .unit { () =>
          Thread.sleep(3000)
          4
        }
        .map(x => x + 10)
        .flatMap(x => Eventual.successful(100 + x))

    p1.onComplete(x => println(s"completed: $x"))
    p2.onComplete(x => println(s"completed: $x"))

    val first = Eventual.first(Seq(p1, p2))
    first.onComplete(x => println(s"First result: $x"))

    Eventual.ready(Seq(p1, p2))
    println(s"Done after ${System.currentTimeMillis() - start} ms")
    // Explicit tuple: same output as before without relying on auto-tupling.
    println((p1.get, p2.get))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment