Last active
December 11, 2015 06:29
-
-
Save dacr/4559994 to your computer and use it in GitHub Desktop.
scala 2.10.0, script example to illustrate how to get futures results as soon as they become available.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/sh | |
exec scala -nocompdaemon -usejavacp -savecompiled "$0" "$@" | |
!# | |
/* will output : | |
END of the script reached | |
OK - slept 1s | |
OK - slept 2s | |
OK - slept 4s | |
OK - slept 8s | |
NOK - Error raised | |
NOK - Error raised | |
OK - slept 15s | |
*/ | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import duration._ | |
import scala.util._ | |
import scala.annotation.tailrec | |
/** | |
* "Select" off the first future to be satisfied. Return this as a | |
* result, with the remainder of the Futures as a sequence. | |
* | |
* @param fs a scala.collection.Seq | |
* @author Victor Klang (https://gist.github.com/4488970) | |
*/ | |
def select[A](fs: Seq[Future[A]])(implicit ec: ExecutionContext): Future[(Try[A], Seq[Future[A]])] = { | |
@tailrec | |
def stripe(p: Promise[(Try[A], Seq[Future[A]])], | |
heads: Seq[Future[A]], | |
elem: Future[A], | |
tail: Seq[Future[A]]): Future[(Try[A], Seq[Future[A]])] = { | |
elem onComplete { res => if (!p.isCompleted) p.trySuccess((res, heads ++ tail)) } | |
if (tail.isEmpty) p.future | |
else stripe(p, heads :+ elem, tail.head, tail.tail) | |
} | |
if (fs.isEmpty) Future.failed(new IllegalArgumentException("empty future list!")) | |
else stripe(Promise(), fs.genericBuilder[Future[A]].result, fs.head, fs.tail) | |
} | |
/** | |
* apply a function on a sequence of futures as soon as a future completes | |
* | |
* @param futures sequences of futures | |
* @param whatToDo function to apply on each result as soon as it becomes available | |
* @author David Crosson | |
*/ | |
def asapFuturesProcess[T](futures: Seq[Future[T]])(whatToDo: (Try[T])=>Unit) { | |
if (!futures.isEmpty) { | |
val nextOne = select(futures) | |
nextOne.onComplete { result => | |
result match { | |
case Success((result, remains)) => | |
whatToDo(result) | |
asapFuturesProcess(remains)(whatToDo) | |
case Failure(ex) => | |
whatToDo(Failure[T](ex)) | |
} | |
} | |
} | |
} | |
// NOW Let's test it | |
import java.util.concurrent.{Executors,ThreadPoolExecutor,TimeUnit} | |
implicit val customEC = ExecutionContext.fromExecutorService( | |
Executors.newCachedThreadPool() match { | |
case e:ThreadPoolExecutor => | |
//Too allow a quick exit from this script | |
// because default value is 60s | |
e.setKeepAliveTime(1, TimeUnit.SECONDS) | |
e | |
case x => x | |
} | |
) | |
// A sleep method that simulates errors | |
def sleep(inS: Int):String = { | |
Thread.sleep(inS * 1000) | |
if (inS > 10 && inS < 15) throw new RuntimeException("Error raised") | |
s"slept ${inS}s" | |
} | |
val workToDo = List(4, 11, 2, 8, 1, 13, 15) | |
val workToDoFutures = workToDo.map { t => future { sleep(t) } } | |
asapFuturesProcess(workToDoFutures) { tt => | |
tt match { | |
case Success(msg) => println(s"OK - ${msg}") | |
case Failure(err) => println(s"NOK - ${err.getMessage}") | |
} | |
} | |
println("END of the script reached") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment