Create a gist now

Instantly share code, notes, and snippets.

Future select
package object futurism {
import scala.language.higherKinds
import scala.collection.generic.CanBuildFrom
import scala.collection.{TraversableOnce => Once}
import scala.concurrent._
import scala.util.{Try, Success, Failure}
import scala.util.control.NonFatal
// TraversableOnce.filter results in an iterator that !isTraversableAgain
implicit class NaiveSelectable(val co: Future.type) extends AnyVal {
def select[A](fs: Once[Future[A]])(implicit xc: ExecutionContext): Future[(Try[A], Once[Future[A]])] = {
val p = Promise[(Try[A], Once[Future[A]])]
def finishing(f: Future[A])(v: Try[A]) { if (!p.isCompleted) p trySuccess ((v, fs filter (_ ne f))) }
fs foreach (f => f onComplete finishing(f) _)
println("Selectably...")
p.future
}
}
implicit class Selectable(val co: Future.type) extends AnyVal {
def select[A, B <: Future[A], M[+B] <: TraversableOnce[B]](fs: M[Future[A]])
(implicit cbf: CanBuildFrom[M[B],B,M[B]], xc: ExecutionContext):
Future[(Try[A], M[Future[A]])] = {
val p = Promise[(Try[A], M[B])]()
def finishing(f: Future[A])(v: Try[A]) {
if (!p.isCompleted) p tryComplete {
val b = cbf()
for (x <- fs; if x ne f) b += x.asInstanceOf[B]
Success((v, b.result))
}
}
fs foreach (f => f onComplete finishing(f) _)
println("Selectably...")
p.future
}
}
/** "Select" off the first future to be satisfied. Return this as a
* result, with the remainder of the Futures as a sequence.
*
* @param fs a scala.collection.Seq
*/
implicit class Klangable(val co: Future.type) /*extends AnyVal*/ {
import scala.annotation._
def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
@tailrec def stripe(p: Promise[(Try[A], Seq[Future[A]])],
heads: Seq[Future[A]],
elem: Future[A],
tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = {
elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) }
if (tail.isEmpty) p.future
else stripe(p, heads :+ elem, tail.head, tail.tail)
}
println("Klang, klang, klang, went the trolley.")
if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail)
}
}
implicit class EitherFuture[A](val f: Future[A]) extends AnyVal {
def packaged(implicit xc: ExecutionContext): Future[Either[Throwable, A]] = {
val p = Promise[Either[Throwable, A]]
f.onComplete{
case res =>
try {
res match {
case Failure(t) => p success Left(t)
case Success(v) => p success Right(v)
}
} catch {
case NonFatal(t) => p failure t
}
}(xc)
p.future
}
}
}
package futuring {
import futurism.Selectable
//import futurism.Klangable
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object Test extends App {
val fs = List(future(5), future(17), future(11))
val (res, rest) = Await.result(Future select fs, Duration.Inf)
println(s"result $res, rest $rest")
val all = Await.result(Future.sequence(rest), Duration.Inf)
println(all mkString ",")
}
object Crosson extends App {
import scala.util.{Try, Success, Failure}
import java.util.concurrent.{CountDownLatch => Latch}
def sleepily(inS: Int) = future {
try Thread.sleep(inS * 1000)
catch {
case _: InterruptedException => println(s"Interrupted conjuring a $inS")
}
inS
}
val workToDo = List(4, 10, 2, 8, 1)
val workToDoFutures = workToDo map sleepily
val done = new Latch(workToDo.size)
//def getResults[T](futures:TraversableOnce[Future[T]]) {
def getResults[T](futures: Seq[Future[T]]) {
def finish() = done.countDown()
val nextOne = Future select futures
nextOne onComplete {
case Success((result, remains)) =>
println(s"just for visual debug purposes ${result}")
if (remains.size > 0) getResults(remains)
finish()
case Failure(e) => e.printStackTrace()
finish()
}
}
getResults(workToDoFutures)
done.await(/*timeout*/)
}
object Crosson0 extends App {
import scala.util.{Try, Success, Failure}
import java.util.concurrent.{CountDownLatch => Latch}
def sleep(inS: Int) = { Thread.sleep(inS * 1000); inS }
val workToDo = List(4, 10, 2, 8, 1)
val workToDoFutures = workToDo.map { t => future { sleep(t) } }
def getResults[T](futures:TraversableOnce[Future[T]]) {
val nextOne = Future select futures
nextOne.onComplete { result =>
result match {
case Success((result, remains)) =>
println(s"just for visual debug purposes ${result}")
getResults(remains)
case Failure(ex) => ex.printStackTrace()
}
}
}
getResults(workToDoFutures)
}
}
@viktorklang

Selectable unfortunately subtly breaks the contract.

import futurism.Selectable
val f = Future successful "foo"
val fs = Vector.fill(100)(f)
Future select fs map println // What do you expect to get printed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment