Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active December 11, 2015 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/4559994 to your computer and use it in GitHub Desktop.
Save dacr/4559994 to your computer and use it in GitHub Desktop.
Scala 2.10.0 script example illustrating how to get the results of futures as soon as they become available.
#!/bin/sh
exec scala -nocompdaemon -usejavacp -savecompiled "$0" "$@"
!#
/* will output :
END of the script reached
OK - slept 1s
OK - slept 2s
OK - slept 4s
OK - slept 8s
NOK - Error raised
NOK - Error raised
OK - slept 15s
*/
import scala.concurrent._
import scala.concurrent.duration._
import duration._
import scala.util._
import scala.annotation.tailrec
/**
 * "Select" off the first future to be satisfied.
 *
 * The returned future is completed with the outcome (`Success` or `Failure`)
 * of whichever input future finishes first, paired with all the other input
 * futures in their original order.
 *
 * @param fs the futures to race
 * @param ec execution context on which the completion callbacks are run
 * @return a future of (first outcome, remaining futures); it is failed with an
 *         `IllegalArgumentException` when `fs` is empty
 * @author Victor Klang (https://gist.github.com/4488970)
 */
def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = {
  if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!"))
  else {
    // Snapshot into a Vector: indexed access is cheap, and we no longer depend
    // on the builder API of whatever Seq implementation the caller passed in.
    val all = fs.toVector
    val winner = Promise[(Try[A], Seq[Future[A]])]()
    all.indices.foreach { i =>
      all(i).onComplete { res =>
        // The isCompleted check only spares the losers from building the
        // remainder; trySuccess is what guarantees a single winner.
        if (!winner.isCompleted) {
          val others: Seq[Future[A]] = all.take(i) ++ all.drop(i + 1)
          winner.trySuccess((res, others))
        }
      }
    }
    winner.future
  }
}
/**
 * Applies a function to the outcome of each future as soon as that future
 * completes, i.e. in completion order rather than in sequence order.
 *
 * The call returns immediately; `whatToDo` is invoked later from completion
 * callbacks. Nothing happens when `futures` is empty.
 *
 * @param futures  sequence of futures to watch
 * @param whatToDo function applied to each outcome (`Success` or `Failure`)
 *                 as soon as it becomes available
 * @param ec       execution context on which the callbacks are run
 * @author David Crosson
 */
def asapFuturesProcess[T](futures: Seq[Future[T]])(whatToDo: (Try[T]) => Unit)(implicit ec: ExecutionContext): Unit = {
  if (futures.nonEmpty) {
    select(futures).onComplete {
      case Success((outcome, remains)) =>
        // Keep draining the remaining futures even if the callback throws for
        // this outcome; the exception itself still propagates to the
        // execution context.
        try whatToDo(outcome)
        finally asapFuturesProcess(remains)(whatToDo)
      case Failure(ex) =>
        // Only reached if `select` itself fails; report it like any other outcome.
        whatToDo(Failure[T](ex))
    }
  }
}
// NOW Let's test it
import java.util.concurrent.{Executors,ThreadPoolExecutor,TimeUnit}
// Execution context backed by a cached thread pool. When the pool is a
// ThreadPoolExecutor its idle-thread keep-alive is lowered to 1 second
// (the default value is 60s) to allow a quick exit from this script.
implicit val customEC = {
  val pool = Executors.newCachedThreadPool()
  pool match {
    case tpe: ThreadPoolExecutor => tpe.setKeepAliveTime(1, TimeUnit.SECONDS)
    case _ => () // any other executor implementation is used as-is
  }
  ExecutionContext.fromExecutorService(pool)
}
/**
 * A sleep method that simulates errors.
 *
 * Blocks the calling thread for `inS` seconds, then either fails or reports
 * how long it slept.
 *
 * @param inS number of seconds to sleep
 * @return the message "slept <inS>s"
 * @throws RuntimeException ("Error raised") after sleeping, when `inS` is
 *         strictly between 10 and 15
 */
def sleep(inS: Int): String = {
  // Multiply as Long: `inS * 1000` in Int arithmetic overflows for large values
  // before being widened to the Long that Thread.sleep expects.
  Thread.sleep(inS * 1000L)
  if (inS > 10 && inS < 15) throw new RuntimeException("Error raised")
  s"slept ${inS}s"
}
// Durations (in seconds) of the simulated jobs; `sleep` raises an error for 11 and 13.
val workToDo = List(4, 11, 2, 8, 1, 13, 15)
// All jobs are started right away. `Future { ... }` is used instead of the
// lowercase `future { ... }` helper, which is deprecated since Scala 2.11.
val workToDoFutures = workToDo.map { t => Future { sleep(t) } }
// Print each outcome as soon as it is available, in completion order.
asapFuturesProcess(workToDoFutures) {
  case Success(msg) => println(s"OK - ${msg}")
  case Failure(err) => println(s"NOK - ${err.getMessage}")
}
// Printed first: the call above only registers callbacks and returns immediately.
println("END of the script reached")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment