Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Examples of common concurrency patterns that you can achieve with the scala.concurrent package.
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.Random
import java.util.{Timer, TimerTask}
//shared helpers used by every example below: the response types, a timeout future and a
//"simulated slow request" future
object Util {
  //the responses that the simulated requests produce
  sealed trait BaseResponse
  final case class Response1(res: Int) extends BaseResponse
  final case class Response2(res: String) extends BaseResponse

  private val timeoutTimer = {
    //in real life, use an Akka scheduler or a Netty HashedWheelTimer, because they are more thread-efficient.
    //the timer thread is a daemon so that an idle timer never keeps the JVM from exiting
    new Timer("util-timeout-timer", true)
  }

  //creates a future that never succeeds: it fails with a TimeoutException once `timeout` has elapsed.
  //race it against a real request with Future.firstCompletedOf to put a deadline on that request.
  //  - a negative finite timeout (or Duration.MinusInf) is treated as already expired
  //  - any other non-finite timeout (Duration.Inf, Duration.Undefined) never fires
  def timeoutFuture[T](timeout: Duration): Future[T] = {
    val prom = Promise[T]()
    val timeoutTask = new TimerTask {
      //tryFailure never throws, so a misbehaving task cannot kill the shared timer thread
      override def run(): Unit = {
        prom.tryFailure(new TimeoutException(s"timed out after $timeout"))
        ()
      }
    }
    timeout match {
      case finite: FiniteDuration =>
        //Timer.schedule rejects negative delays, so clamp them to "fire right away"
        timeoutTimer.schedule(timeoutTask, math.max(0L, finite.toMillis))
      case Duration.MinusInf =>
        timeoutTask.run()
      case _ =>
        //toMillis is not defined for infinite durations; an infinite timeout simply never fires
        ()
    }
    prom.future
  }

  private val random = new Random(System.currentTimeMillis())
  private val possibleSleepMS = Vector(100L, 200L, 150L, 250L)

  //creates a future that sleeps for a pseudo-random amount of time (one of possibleSleepMS),
  //and then executes t. both actions happen on a thread of the global execution context;
  //`t` is by-name, so it is only evaluated after the sleep, and an exception thrown by it
  //fails the returned future
  def sleepAndThen[T](t: => T): Future[T] = {
    import scala.concurrent.ExecutionContext.Implicits.global
    Future {
      //nextInt(bound) is always within [0, bound); math.abs(nextInt) % n can be negative for Int.MinValue
      val sleepLength = possibleSleepMS(random.nextInt(possibleSleepMS.length))
      //tell the pool that this thread is about to block, so it can compensate
      blocking {
        Thread.sleep(sleepLength)
      }
      t
    }
  }
}
//fan out/fan in pattern:
//execute a bunch of independent background jobs, each in their own future.
//merge them into one future that will be complete when the slowest one is
//(or that fails as soon as one of the jobs is known to have failed).
//returns the merged future so that callers can keep composing on it.
def fanOut: Future[List[Util.BaseResponse]] = {
  import Util._
  import scala.concurrent.ExecutionContext.Implicits.global
  //all three futures are created (and therefore started) before they are merged, so they run concurrently
  val f1 = sleepAndThen(Response1(1))
  val f2 = sleepAndThen(Response2("one"))
  val f3 = sleepAndThen(Response1(123))
  val fannedOutFutures: List[Future[BaseResponse]] = f1 :: f2 :: f3 :: Nil
  val results: Future[List[BaseResponse]] = Future.sequence(fannedOutFutures)
  //a Future cannot be applied to a function literal; the callback has to be registered with a combinator
  results.foreach { resultList: List[BaseResponse] =>
    //after all the results come back, we can do stuff with them
    ()
  }
  results
}
//timeout pattern
//race a request against a timer future. whichever completes first decides the outcome:
//the request's response, or the TimeoutException that Util.timeoutFuture fails with.
//note that the losing request is NOT cancelled or retried - it just gets ignored.
//returns the raced future so that callers can keep composing on it.
def timeout: Future[Util.Response1] = {
  import Util._
  import scala.concurrent.ExecutionContext.Implicits.global
  val request = sleepAndThen(Response1(1))
  //named `deadline` rather than `timeout` so it does not shadow the enclosing method
  val deadline = timeoutFuture[Response1](1.second)
  val respFuture = Future.firstCompletedOf(request :: deadline :: Nil)
  //foreach only runs its callback on success
  respFuture.foreach { resp =>
    //only executes if the request didn't time out
    ()
  }
  respFuture
}
//similar to timeouts - cancelling
//if you have an expensive operation, or maybe even one that runs forever,
//you can cancel it so it doesn't take up resources after we don't need it.
//the cancellation is cooperative: the server polls cancelVar and acknowledges through cancelledVar.
//returns a future that completes once the server has acknowledged the cancellation.
def cancel: Future[Unit] = {
  import Util._
  import scala.concurrent.ExecutionContext.Implicits.global
  //the variable we set to start the cancellation process
  val cancelVar = Promise[Unit]()
  //the variable that the cancelee sets to ack that it's fully done
  val cancelledVar = Promise[Unit]()
  //the handler for each request in our simple server
  def acceptRequest = ()
  //start the server
  Future {
    //this loop occupies a pool thread until it is cancelled, so mark it as blocking
    blocking {
      while(!cancelVar.isCompleted) {
        acceptRequest
      }
    }
    //ack: the server loop has exited and will do no more work
    cancelledVar.success(())
  }
  //stop the server after a while
  sleepAndThen {
    cancelVar.success(())
    //busy-waiting keeps the example short; real code should compose on cancelledVar.future instead
    blocking {
      while(!cancelledVar.isCompleted) {
        println("still waiting for the server to finish")
      }
    }
  }
}
//execute the same request on multiple servers, and return the first that comes back.
//combined with timeouts and cancellation you can tightly control your call.
//"first" means first to complete, successfully or not: if the fastest request fails,
//the returned future fails too.
def firstCompleted: Future[Util.Response1] = {
  import Util._
  import scala.concurrent.ExecutionContext.Implicits.global
  //a def, not a val: every call starts a brand new request
  def makeRequest = sleepAndThen(Response1(1))
  val req1 = makeRequest
  val req2 = makeRequest
  val req3 = makeRequest
  val respFuture: Future[Response1] = Future.firstCompletedOf(req1 :: req2 :: req3 :: Nil)
  //a Future cannot be applied to a function literal; the callback has to be registered with a combinator
  respFuture.foreach { resp =>
    //continue with the response from the fastest server
    ()
  }
  respFuture
}
//allow *asynchronous* bidirectional communication between two concurrently executing processes.
//scala.concurrent has a synchronous version of this
//returns the future of the receiving process, which completes once that process has
//asked the channel for its value.
def channels: Future[Unit] = {
  import Util._
  import scala.concurrent.ExecutionContext.Implicits.global
  //a class that lets you send one message. you can compose these into a queue to send multiple messages
  class OneTimeChannel[T] {
    //completed by the sender with the message
    private val sendProm = Promise[T]()
    //completed by the receiver to ack that the message has been picked up
    private val recvProm = Promise[Unit]()
    //hands `t` to the channel. the returned future completes once a receiver has picked it up.
    //sending a second message throws IllegalStateException, because the promise is already completed
    def send(t: T): Future[Unit] = {
      sendProm.success(t)
      recvProm.future
    }
    //the returned future completes with the message once it has been sent.
    //trySuccess (rather than success) keeps repeated calls to recv from throwing
    def recv: Future[T] = {
      sendProm.future.map { t =>
        recvProm.trySuccess(())
        t
      }
    }
  }
  val chan = new OneTimeChannel[Int]
  //the sending process
  sleepAndThen {
    val sent: Future[Unit] = chan.send(123)
    sent.foreach { _ =>
      //do stuff after we know the value has been sent
      ()
    }
  }
  //the receiving process
  sleepAndThen {
    val receivedFuture: Future[Int] = chan.recv
    receivedFuture.foreach { received =>
      //do stuff with the received value
      ()
    }
  }
}
Copy link

arschles commented Oct 4, 2013

Copy link

wpalmeri commented Oct 5, 2013

Your timeoutFuture fails if the timeout is reached. For a timer that just returns Unit when the timeout is reached, see Futures.alarm.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment