Skip to content

@som-snytt /gist:4504281
Last active

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Future select
package object futurism {
import scala.language.higherKinds
import scala.collection.generic.CanBuildFrom
import scala.collection.{TraversableOnce => Once}
import scala.concurrent._
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal
// TraversableOnce.filter yields a one-shot Iterator (!isTraversableAgain), and an Iterator
// input is exhausted by the first traversal. The futures are therefore snapshotted into a
// Vector once, and the remainder is derived from that snapshot.
/** Adds a simple `select` to the `Future` companion for any `TraversableOnce` of futures. */
implicit class NaiveSelectable(val co: Future.type) extends AnyVal {
  /** Completes with the result of the first future to finish, paired with the others.
    *
    * The remainder keeps the input order and omits only the winning *position*, so
    * duplicate references to the same future are preserved. It is backed by a `Vector`
    * and can be traversed repeatedly.
    *
    * If `fs` is empty no callback is ever registered and the returned future never
    * completes.
    *
    * @param fs the futures to race; traversed exactly once
    * @param xc execution context on which the completion callbacks run
    * @return a future of (first result, remaining futures)
    */
  def select[A](fs: Once[Future[A]])(implicit xc: ExecutionContext): Future[(Try[A], Once[Future[A]])] = {
    val p = Promise[(Try[A], Once[Future[A]])]()
    // Snapshot: `fs` may be an Iterator, which must not be traversed a second time.
    val all = fs.toVector
    // Callback for the future at index `winner`; only the first caller completes `p`.
    def finishing(winner: Int)(v: Try[A]): Unit =
      if (!p.isCompleted) p trySuccess ((v, (all take winner) ++ (all drop (winner + 1))))
    for (i <- all.indices) all(i).onComplete(finishing(i) _)
    println("Selectably...")
    p.future
  }
}
/** Adds a collection-preserving `select` to the `Future` companion. */
implicit class Selectable(val co: Future.type) extends AnyVal {
  /** Completes with the result of the first future to finish, paired with the others
    * rebuilt into a collection via `cbf`.
    *
    * The remainder keeps the input order and omits only the winning *position*: selecting
    * from `n` references to the same future leaves `n - 1` of them, not zero.
    *
    * If `fs` is empty no callback is ever registered and the returned future never
    * completes.
    *
    * @param fs  the futures to race; traversed exactly once
    * @param cbf builder factory used to construct the remainder collection
    * @param xc  execution context on which the completion callbacks run
    * @return a future of (first result, remaining futures)
    */
  def select[A, B <: Future[A], M[+B] <: TraversableOnce[B]](fs: M[Future[A]])
      (implicit cbf: CanBuildFrom[M[B], B, M[B]], xc: ExecutionContext):
      Future[(Try[A], M[Future[A]])] = {
    val p = Promise[(Try[A], M[B])]()
    // Snapshot so a one-shot input is traversed once and removal can be done by index.
    val all = fs.toVector
    // Callback for the future at index `winner`; only the first caller completes `p`.
    def finishing(winner: Int)(v: Try[A]): Unit =
      if (!p.isCompleted) p tryComplete {
        val b = cbf()
        // Unchecked cast: the elements are Future[A], while the builder is typed on B <: Future[A].
        for (i <- all.indices; if i != winner) b += all(i).asInstanceOf[B]
        Success((v, b.result()))
      }
    for (i <- all.indices) all(i).onComplete(finishing(i) _)
    println("Selectably...")
    p.future
  }
}
/** Adds a position-based `select` for `Seq`s of futures to the `Future` companion.
  *
  * NOTE: `extends AnyVal` is commented out in the original; the reason is not recorded here.
  */
implicit class Klangable(val co: Future.type) /*extends AnyVal*/ {
  /** "Select" off the first future to be satisfied. Return this as a
    * result, with the remainder of the Futures as a sequence.
    *
    * The remainder is every element before the winner followed by every element after
    * it, so only the winning position is removed. An empty `fs` yields a future failed
    * with `IllegalArgumentException`.
    *
    * @param fs a scala.collection.Seq
    * @param ec execution context on which the completion callbacks run
    * @return a future of (first result, remaining futures)
    */
  def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
    println("Klang, klang, klang, went the trolley.")
    if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
    else {
      val winner = Promise[(Try[A], Seq[Future[A]])]()
      // Elements already visited, accumulated in an empty collection of the same kind as `fs`.
      var before: Seq[Future[A]] = fs.genericBuilder[Future[A]].result()
      var pending: Seq[Future[A]] = fs
      while (pending.nonEmpty) {
        val current = pending.head
        // Stable per-iteration values for the callback to close over.
        val earlier = before
        val later = pending.tail
        current onComplete { outcome =>
          if (!winner.isCompleted) winner.trySuccess((outcome, earlier ++ later))
        }
        before = earlier :+ current
        pending = later
      }
      winner.future
    }
  }
}
/** Adds `packaged` to any `Future[A]`. */
implicit class EitherFuture[A](val f: Future[A]) extends AnyVal {
  /** Turns the outcome of `f` into a value: the returned future succeeds with
    * `Right(value)` when `f` succeeds and with `Left(throwable)` when `f` fails.
    *
    * @param xc execution context on which the completion callback runs
    * @return a future holding the outcome of `f` as an `Either`
    */
  def packaged(implicit xc: ExecutionContext): Future[Either[Throwable, A]] = {
    val promise = Promise[Either[Throwable, A]]()
    f.onComplete { outcome =>
      try {
        val packed: Either[Throwable, A] = outcome match {
          case Success(value) => Right(value)
          case Failure(cause) => Left(cause)
        }
        promise success packed
      } catch {
        // A non-fatal error raised while completing is routed into the promise as a failure.
        case NonFatal(t) => promise failure t
      }
    }(xc)
    promise.future
  }
}
}
package futuring {
import futurism.Selectable
//import futurism.Klangable
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
/** Smoke test: select the first of three futures, print it and the remainder, then wait
  * for the remainder and print its values.
  */
object Test extends App {
  // `Future.apply` replaces `scala.concurrent.future`, which is deprecated since Scala 2.11.
  val fs = List(Future(5), Future(17), Future(11))
  val (res, rest) = Await.result(Future select fs, Duration.Inf)
  println(s"result $res, rest $rest")
  val all = Await.result(Future.sequence(rest), Duration.Inf)
  println(all mkString ",")
}
/** Demo: start several sleeping futures, repeatedly `select` whichever finishes next and
  * print it, and keep the main thread alive on a latch until every result has been handled.
  */
object Crosson extends App {
  import scala.util.{Try, Success, Failure}
  import java.util.concurrent.{CountDownLatch => Latch}

  /** Starts a future that sleeps for `inS` seconds and then yields `inS`. */
  def sleepily(inS: Int): Future[Int] = Future {
    // Mark the sleep as blocking so the pool can compensate; otherwise how many sleeps run
    // concurrently (and hence the completion order) depends on the pool's parallelism.
    blocking {
      try Thread.sleep(inS * 1000)
      catch {
        case _: InterruptedException =>
          println(s"Interrupted conjuring a $inS")
          // Restore the interrupt flag so the pool thread can still observe it.
          Thread.currentThread().interrupt()
      }
    }
    inS
  }

  val workToDo = List(4, 10, 2, 8, 1)
  val workToDoFutures = workToDo map sleepily
  // One count per unit of work; each `getResults` round counts down once.
  val done = new Latch(workToDo.size)

  //def getResults[T](futures:TraversableOnce[Future[T]]) {
  /** Prints the next future to complete, then recurses on the non-empty remainder. */
  def getResults[T](futures: Seq[Future[T]]): Unit = {
    def finish() = done.countDown()
    val nextOne = Future select futures
    nextOne onComplete {
      case Success((result, remains)) =>
        println(s"just for visual debug purposes ${result}")
        if (remains.nonEmpty) getResults(remains)
        finish()
      case Failure(e) =>
        e.printStackTrace()
        finish()
    }
  }

  getResults(workToDoFutures)
  done.await(/*timeout*/)
}
/** Demo: like a latch-free first draft of the select loop, taking any `TraversableOnce`
  * of futures. The main thread now waits (bounded) for the results to be reported, since
  * the global execution context's daemon threads do not keep the JVM alive.
  */
object Crosson0 extends App {
  import scala.util.{Try, Success, Failure}
  import java.util.concurrent.{CountDownLatch => Latch, TimeUnit}

  /** Sleeps for `inS` seconds on the calling thread, then returns `inS`. */
  def sleep(inS: Int): Int = { Thread.sleep(inS * 1000); inS }

  val workToDo = List(4, 10, 2, 8, 1)
  // `blocking` lets the pool compensate for the sleeping threads so all sleeps overlap.
  val workToDoFutures = workToDo.map { t => Future { blocking { sleep(t) } } }
  // Counted down once per printed result.
  private val reported = new Latch(workToDo.size)

  /** Prints the next future to complete, then recurses on the remainder. */
  def getResults[T](futures: TraversableOnce[Future[T]]): Unit = {
    val nextOne = Future select futures
    nextOne.onComplete {
      case Success((result, remains)) =>
        println(s"just for visual debug purposes ${result}")
        reported.countDown()
        getResults(remains)
      case Failure(ex) => ex.printStackTrace()
    }
  }

  getResults(workToDoFutures)
  // Bounded wait: the sum of all sleeps is an upper bound even if they ran one at a time;
  // the extra seconds are slack. The bound also prevents hanging if some result never arrives.
  reported.await(workToDo.sum + 5L, TimeUnit.SECONDS)
}
}
@viktorklang

Selectable unfortunately subtly breaks the contract.

import futurism.Selectable
val f = Future successful "foo"
val fs = Vector.fill(100)(f)
Future select fs map println // What do you expect to get printed?
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.