public
Last active

Future select

  • Download Gist
gistfile1.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
package object futurism {
import scala.language.higherKinds
import scala.collection.generic.CanBuildFrom
import scala.collection.{TraversableOnce => Once}
import scala.concurrent._
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal
 
// TraversableOnce.filter results in an iterator that !isTraversableAgain
implicit class NaiveSelectable(val co: Future.type) extends AnyVal {
def select[A](fs: Once[Future[A]])(implicit xc: ExecutionContext): Future[(Try[A], Once[Future[A]])] = {
val p = Promise[(Try[A], Once[Future[A]])]
def finishing(f: Future[A])(v: Try[A]) { if (!p.isCompleted) p trySuccess ((v, fs filter (_ ne f))) }
fs foreach (f => f onComplete finishing(f) _)
println("Selectably...")
p.future
}
}
implicit class Selectable(val co: Future.type) extends AnyVal {
def select[A, B <: Future[A], M[+B] <: TraversableOnce[B]](fs: M[Future[A]])
(implicit cbf: CanBuildFrom[M[B],B,M[B]], xc: ExecutionContext):
Future[(Try[A], M[Future[A]])] = {
val p = Promise[(Try[A], M[B])]()
def finishing(f: Future[A])(v: Try[A]) {
if (!p.isCompleted) p tryComplete {
val b = cbf()
for (x <- fs; if x ne f) b += x.asInstanceOf[B]
Success((v, b.result))
}
}
fs foreach (f => f onComplete finishing(f) _)
println("Selectably...")
p.future
}
}
/** "Select" off the first future to be satisfied. Return this as a
* result, with the remainder of the Futures as a sequence.
*
* @param fs a scala.collection.Seq
*/
implicit class Klangable(val co: Future.type) /*extends AnyVal*/ {
import scala.annotation._
def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
@tailrec def stripe(p: Promise[(Try[A], Seq[Future[A]])],
heads: Seq[Future[A]],
elem: Future[A],
tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) }
if (tail.isEmpty) p.future
else stripe(p, heads :+ elem, tail.head, tail.tail)
}
 
println("Klang, klang, klang, went the trolley.")
if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
}
}
 
implicit class EitherFuture[A](val f: Future[A]) extends AnyVal {
def packaged(implicit xc: ExecutionContext): Future[Either[Throwable, A]] = {
val p = Promise[Either[Throwable, A]]
f.onComplete{
case res =>
try {
res match {
case Failure(t) => p success Left(t)
case Success(v) => p success Right(v)
}
} catch {
case NonFatal(t) => p failure t
}
}(xc)
p.future
}
}
}
package futuring {
import futurism.Selectable
//import futurism.Klangable
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object Test extends App {
val fs = List(future(5), future(17), future(11))
val (res, rest) = Await.result(Future select fs, Duration.Inf)
println(s"result $res, rest $rest")
val all = Await.result(Future.sequence(rest), Duration.Inf)
println(all mkString ",")
}
object Crosson extends App {
import scala.util.{Try, Success, Failure}
import java.util.concurrent.{CountDownLatch => Latch}
 
def sleepily(inS: Int) = future {
try Thread.sleep(inS * 1000)
catch {
case _: InterruptedException => println(s"Interrupted conjuring a $inS")
}
inS
}
val workToDo = List(4, 10, 2, 8, 1)
val workToDoFutures = workToDo map sleepily
val done = new Latch(workToDo.size)
 
//def getResults[T](futures:TraversableOnce[Future[T]]) {
def getResults[T](futures: Seq[Future[T]]) {
def finish() = done.countDown()
val nextOne = Future select futures
nextOne onComplete {
case Success((result, remains)) =>
println(s"just for visual debug purposes ${result}")
if (remains.size > 0) getResults(remains)
finish()
case Failure(e) => e.printStackTrace()
finish()
}
}
getResults(workToDoFutures)
done.await(/*timeout*/)
}
object Crosson0 extends App {
import scala.util.{Try, Success, Failure}
import java.util.concurrent.{CountDownLatch => Latch}
def sleep(inS: Int) = { Thread.sleep(inS * 1000); inS }
val workToDo = List(4, 10, 2, 8, 1)
val workToDoFutures = workToDo.map { t => future { sleep(t) } }
 
def getResults[T](futures:TraversableOnce[Future[T]]) {
val nextOne = Future select futures
nextOne.onComplete { result =>
result match {
case Success((result, remains)) =>
println(s"just for visual debug purposes ${result}")
getResults(remains)
case Failure(ex) => ex.printStackTrace()
}
}
}
getResults(workToDoFutures)
}
}

Selectable unfortunately subtly breaks the contract.

import futurism.Selectable
val f = Future successful "foo"
val fs = Vector.fill(100)(f)
Future select fs map println // What do you expect to get printed

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.