public
Created

Future Composition with Scala and Akka

  • Download Gist
LogSearch.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits._
import scala.util.{ Success, Failure }
import akka.actor._
import akka.pattern.{ after, ask, pipe }
import akka.util.Timeout
 
object LogSearch extends App {
 
println("Starting actor system")
val system = ActorSystem("futures")
 
println("Starting log search")
try {
// timeout for each search task
val fallbackTimeout = 2 seconds
 
// timeout use with akka.patterns.ask
implicit val timeout = new Timeout(5 seconds)
 
require(fallbackTimeout < timeout.duration)
 
// Create SearchActor
val search = system.actorOf(Props[LogSearchActor])
 
// Test worktimes for search
val worktimes = List(1000, 1500, 1200, 800, 2000, 600, 3500, 8000, 250)
 
// Asking for results
val futureResults = (search ? Search(worktimes, fallbackTimeout))
// Cast to correct type
.mapTo[List[String]]
// In case something went wrong
.recover {
case e: TimeoutException => List("timeout")
case e: Exception => List(e getMessage)
}
// Callback (non-blocking)
.onComplete {
case Success(results) =>
println(":: Results ::")
results foreach (r => println(s" $r"))
system shutdown ()
case Failure(t) =>
t printStackTrace ()
system shutdown ()
}
 
} catch {
case t: Throwable =>
t printStackTrace ()
system shutdown ()
}
 
// Await end of programm
system awaitTermination (20 seconds)
}
 
class LogSearchActor extends Actor {
 
def receive = {
case Search(worktimes, timeout) =>
// Doing all the work in one actor using futures
val searchFutures = worktimes map { worktime =>
val searchFuture = search(worktime)
val fallback = after(timeout, context.system.scheduler) { Future successful s"$worktime ms > $timeout" }
Future firstCompletedOf Seq(searchFuture, fallback)
}
 
// Pipe future results to sender
(Future sequence searchFutures) pipeTo sender
case _ =>
}
 
def search(worktime: Int): Future[String] = future {
Thread sleep worktime
s"found something in $worktime ms"
}
}
 
case class Search(worktime: List[Int], timeout: FiniteDuration)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.