Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
examples of common concurrency patterns that you can achieve with the scala.concurrent package
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import java.util.{Timer, TimerTask}
/** Shared helpers used by the concurrency-pattern examples in this file. */
object Util {
  /** Common supertype of the example responses. */
  sealed trait BaseResponse
  case class Response1(res: Int) extends BaseResponse
  case class Response2(res: String) extends BaseResponse

  // One shared timer thread fires every timeout. It is created as a daemon so an idle
  // timer never keeps the JVM alive after the rest of the program has finished.
  // In real life, use an Akka scheduler or a Netty HashedWheelTimer, because they are
  // more thread-efficient.
  private val timeoutTimer = new Timer("timeout-timer", true)

  /** Creates a future that never succeeds and fails with a `TimeoutException`
    * once `timeout` has elapsed.
    *
    * @param timeout how long to wait before failing. A negative finite duration or
    *                `Duration.MinusInf` fails as soon as possible; any other
    *                non-finite duration (e.g. `Duration.Inf`) never fires, so the
    *                returned future never completes.
    * @tparam T the success type of the returned future; it exists only so the
    *           result can be raced against a `Future[T]`
    * @return a future that can only ever complete with a failure
    */
  def timeoutFuture[T](timeout: Duration): Future[T] = {
    val prom = Promise[T]()
    val timeoutTask = new TimerTask {
      override def run(): Unit = {
        // tryFailure: completing twice must never throw on the shared timer thread,
        // because an exception escaping a TimerTask cancels the whole Timer
        prom.tryFailure(new TimeoutException(s"timed out after $timeout"))
        ()
      }
    }
    timeout match {
      case finite: FiniteDuration =>
        // Timer.schedule rejects negative delays, so clamp them to "fire now"
        timeoutTimer.schedule(timeoutTask, math.max(0L, finite.toMillis))
      case Duration.MinusInf =>
        // already expired
        timeoutTask.run()
      case _ =>
        // infinite/undefined timeout: toMillis would throw, and there is nothing to schedule
        ()
    }
    prom.future
  }

  private val random = new Random(System.currentTimeMillis())
  private val possibleSleepMS = Vector(100L, 200L, 150L, 250L)

  /** Creates a future that sleeps for a pseudo-randomly chosen time (one of
    * `possibleSleepMS`, in milliseconds) and then evaluates `t`.
    *
    * Both steps run on a thread of the implicit execution context in scope (this
    * file imports `ExecutionContext.Implicits.global`). The sleep is wrapped in
    * `blocking` so the execution context is told the thread is about to block.
    *
    * @param t by-name computation evaluated after the sleep
    * @return a future holding the value of `t`, or failed with whatever `t` threw
    */
  def sleepAndThen[T](t: => T): Future[T] =
    Future {
      // nextInt(bound) is always within [0, bound); no abs/modulo arithmetic needed
      val sleepLength = possibleSleepMS(random.nextInt(possibleSleepMS.length))
      blocking {
        Thread.sleep(sleepLength)
      }
      t
    }
}
// Fan out / fan in:
// start several independent background jobs, each in its own future, then merge
// them into a single future that completes once the slowest job has completed.
def fanOut = {
  import Util._
  // all three jobs are started here, before anything waits on them
  val jobs: List[Future[BaseResponse]] = List(
    sleepAndThen(Response1(1)),
    sleepAndThen(Response2("one")),
    sleepAndThen(Response1(123))
  )
  Future.sequence(jobs).map { responses =>
    // every job has finished; `responses` holds the results in the order of `jobs`
    ()
  }
}
// Timeout pattern:
// race a request against a future that fails after one second, and continue with
// whichever of the two completes first.
def timeout = {
  import Util._
  val pending = sleepAndThen(Response1(1))
  val deadline = timeoutFuture[Response1](1.second)
  Future.firstCompletedOf(List(pending, deadline)).map { _ =>
    // reached only when the request finished before the deadline fired
    ()
  }
}
// Similar to timeouts - cancelling:
// if you have an expensive operation, or maybe even one that runs forever,
// you can cancel it so it doesn't take up resources after we no longer need it.
//
// Returns a future that completes once the server loop has acknowledged the
// cancellation.
def cancel = {
  import Util._
  // completed by us to start the cancellation process
  val cancelVar = Promise[Unit]()
  // completed by the cancelee to acknowledge that it is fully done
  val cancelledVar = Promise[Unit]()
  // the handler for each request in our simple server
  def acceptRequest = ()
  // start the server
  Future {
    // The loop occupies its thread until cancelled. Marking it `blocking` lets the
    // execution context compensate, so the canceller below can still get a thread
    // even when the pool is very small.
    blocking {
      while (!cancelVar.isCompleted) {
        acceptRequest
      }
    }
    cancelledVar.success(())
  }
  // stop the server after a while
  sleepAndThen {
    cancelVar.success(())
    if (!cancelledVar.isCompleted) {
      println("still waiting for the server to finish")
    }
  }.flatMap { _ =>
    // compose on the acknowledgement instead of spinning on isCompleted,
    // which would burn a pool thread and flood stdout
    cancelledVar.future
  }
}
// First completed:
// issue the same request to several servers and continue with whichever answer
// arrives first. Combined with timeouts and cancellation this gives tight control
// over a call.
def firstCompleted = {
  import Util._
  // List.fill re-evaluates its argument for every element, so three requests are started
  val inFlight = List.fill(3)(sleepAndThen(Response1(1)))
  val fastest: Future[Response1] = Future.firstCompletedOf(inFlight)
  fastest.map { _ =>
    // continue with the response from the fastest server
    ()
  }
}
// Channels:
// allow *asynchronous* communication between two concurrently executing processes:
// a value travels from sender to receiver and an acknowledgement travels back.
// (scala.concurrent ships a synchronous relative, SyncChannel.)
def channels = {
  import Util._
  // A channel that carries exactly one message. Several of these can be composed
  // into a queue to send multiple messages.
  class OneTimeChannel[T] {
    // completed by the sender with the message
    private val sendProm = Promise[T]()
    // completed on the receiving side to acknowledge delivery
    private val recvProm = Promise[Unit]()

    // Publishes `t` and returns a future that completes once a receiver has taken it.
    // This is a one-time channel: a second send throws IllegalStateException.
    def send(t: T): Future[Unit] = {
      sendProm.success(t)
      recvProm.future
    }

    // Returns a future holding the sent value; the sender is acknowledged when the
    // value is delivered. trySuccess keeps repeated recv calls working: with
    // `success`, a second recv would fail its future with "Promise already
    // completed" even though the value had been delivered.
    def recv: Future[T] = {
      sendProm.future.map { t =>
        recvProm.trySuccess(())
        t
      }
    }
  }
  val chan = new OneTimeChannel[Int]
  // sender
  sleepAndThen {
    val sent: Future[Unit] = chan.send(123)
    sent.map { _ =>
      // do stuff after we know the value has been received
      ()
    }
  }
  // receiver
  sleepAndThen {
    val receivedFuture: Future[Int] = chan.recv
    receivedFuture.map { received =>
      // do stuff with the received value
      ()
    }
  }
}
@arschles
Copy link
Author

arschles commented Oct 4, 2013

@wpalmeri
Copy link

wpalmeri commented Oct 5, 2013

your timeoutFuture fails if the timeout is reached. for a timer that just returns Unit upon the timeout being reached, see Futures.alarm https://github.com/scala/scala/blob/v2.10.2/src/actors/scala/actors/Future.scala#L131

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment